feat: 添加人物信息、表达方式和表情包管理 API 路由

pull/1364/head
墨梓柒 2025-11-17 21:04:29 +08:00
parent 4a5ca048ad
commit 4e03cfac2a
No known key found for this signature in database
GPG Key ID: 4A65B9DBA35F7635
4 changed files with 1261 additions and 0 deletions

View File

@ -0,0 +1,483 @@
"""表情包管理 API 路由"""
from fastapi import APIRouter, HTTPException, Header, Query
from pydantic import BaseModel
from typing import Optional, List
from src.common.logger import get_logger
from src.common.database.database_model import Emoji
from .token_manager import get_token_manager
import json
import time
logger = get_logger("webui.emoji")
# 创建路由器
router = APIRouter(prefix="/emoji", tags=["Emoji"])
class EmojiResponse(BaseModel):
"""表情包响应"""
id: int
full_path: str
format: str
emoji_hash: str
description: str
query_count: int
is_registered: bool
is_banned: bool
emotion: Optional[List[str]] # 解析后的 JSON
record_time: float
register_time: Optional[float]
usage_count: int
last_used_time: Optional[float]
class EmojiListResponse(BaseModel):
"""表情包列表响应"""
success: bool
total: int
page: int
page_size: int
data: List[EmojiResponse]
class EmojiDetailResponse(BaseModel):
"""表情包详情响应"""
success: bool
data: EmojiResponse
class EmojiUpdateRequest(BaseModel):
"""表情包更新请求"""
description: Optional[str] = None
is_registered: Optional[bool] = None
is_banned: Optional[bool] = None
emotion: Optional[List[str]] = None
class EmojiUpdateResponse(BaseModel):
"""表情包更新响应"""
success: bool
message: str
data: Optional[EmojiResponse] = None
class EmojiDeleteResponse(BaseModel):
"""表情包删除响应"""
success: bool
message: str
def verify_auth_token(authorization: Optional[str]) -> bool:
"""验证认证 Token"""
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="未提供有效的认证信息")
token = authorization.replace("Bearer ", "")
token_manager = get_token_manager()
if not token_manager.verify_token(token):
raise HTTPException(status_code=401, detail="Token 无效或已过期")
return True
def parse_emotion(emotion_str: Optional[str]) -> Optional[List[str]]:
"""解析情感标签 JSON 字符串"""
if not emotion_str:
return None
try:
return json.loads(emotion_str)
except (json.JSONDecodeError, TypeError):
return None
def emoji_to_response(emoji: Emoji) -> EmojiResponse:
"""将 Emoji 模型转换为响应对象"""
return EmojiResponse(
id=emoji.id,
full_path=emoji.full_path,
format=emoji.format,
emoji_hash=emoji.emoji_hash,
description=emoji.description,
query_count=emoji.query_count,
is_registered=emoji.is_registered,
is_banned=emoji.is_banned,
emotion=parse_emotion(emoji.emotion),
record_time=emoji.record_time,
register_time=emoji.register_time,
usage_count=emoji.usage_count,
last_used_time=emoji.last_used_time,
)
@router.get("/list", response_model=EmojiListResponse)
async def get_emoji_list(
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页数量"),
search: Optional[str] = Query(None, description="搜索关键词"),
is_registered: Optional[bool] = Query(None, description="是否已注册筛选"),
is_banned: Optional[bool] = Query(None, description="是否被禁用筛选"),
format: Optional[str] = Query(None, description="格式筛选"),
authorization: Optional[str] = Header(None)
):
"""
获取表情包列表
Args:
page: 页码 ( 1 开始)
page_size: 每页数量 (1-100)
search: 搜索关键词 (匹配 description, emoji_hash)
is_registered: 是否已注册筛选
is_banned: 是否被禁用筛选
format: 格式筛选
authorization: Authorization header
Returns:
表情包列表
"""
try:
verify_auth_token(authorization)
# 构建查询
query = Emoji.select()
# 搜索过滤
if search:
query = query.where(
(Emoji.description.contains(search)) |
(Emoji.emoji_hash.contains(search))
)
# 注册状态过滤
if is_registered is not None:
query = query.where(Emoji.is_registered == is_registered)
# 禁用状态过滤
if is_banned is not None:
query = query.where(Emoji.is_banned == is_banned)
# 格式过滤
if format:
query = query.where(Emoji.format == format)
# 排序:使用次数倒序,然后按记录时间倒序
from peewee import Case
query = query.order_by(
Emoji.usage_count.desc(),
Case(None, [(Emoji.record_time.is_null(), 1)], 0),
Emoji.record_time.desc()
)
# 获取总数
total = query.count()
# 分页
offset = (page - 1) * page_size
emojis = query.offset(offset).limit(page_size)
# 转换为响应对象
data = [emoji_to_response(emoji) for emoji in emojis]
return EmojiListResponse(
success=True,
total=total,
page=page,
page_size=page_size,
data=data
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"获取表情包列表失败: {e}")
raise HTTPException(status_code=500, detail=f"获取表情包列表失败: {str(e)}") from e
@router.get("/{emoji_id}", response_model=EmojiDetailResponse)
async def get_emoji_detail(
emoji_id: int,
authorization: Optional[str] = Header(None)
):
"""
获取表情包详细信息
Args:
emoji_id: 表情包ID
authorization: Authorization header
Returns:
表情包详细信息
"""
try:
verify_auth_token(authorization)
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
return EmojiDetailResponse(
success=True,
data=emoji_to_response(emoji)
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"获取表情包详情失败: {e}")
raise HTTPException(status_code=500, detail=f"获取表情包详情失败: {str(e)}") from e
@router.patch("/{emoji_id}", response_model=EmojiUpdateResponse)
async def update_emoji(
emoji_id: int,
request: EmojiUpdateRequest,
authorization: Optional[str] = Header(None)
):
"""
增量更新表情包只更新提供的字段
Args:
emoji_id: 表情包ID
request: 更新请求只包含需要更新的字段
authorization: Authorization header
Returns:
更新结果
"""
try:
verify_auth_token(authorization)
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
# 只更新提供的字段
update_data = request.model_dump(exclude_unset=True)
if not update_data:
raise HTTPException(status_code=400, detail="未提供任何需要更新的字段")
# 处理情感标签(转换为 JSON
if 'emotion' in update_data:
if update_data['emotion'] is None:
update_data['emotion'] = None
else:
update_data['emotion'] = json.dumps(update_data['emotion'], ensure_ascii=False)
# 如果注册状态从 False 变为 True记录注册时间
if 'is_registered' in update_data and update_data['is_registered'] and not emoji.is_registered:
update_data['register_time'] = time.time()
# 执行更新
for field, value in update_data.items():
setattr(emoji, field, value)
emoji.save()
logger.info(f"表情包已更新: ID={emoji_id}, 字段: {list(update_data.keys())}")
return EmojiUpdateResponse(
success=True,
message=f"成功更新 {len(update_data)} 个字段",
data=emoji_to_response(emoji)
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"更新表情包失败: {e}")
raise HTTPException(status_code=500, detail=f"更新表情包失败: {str(e)}") from e
@router.delete("/{emoji_id}", response_model=EmojiDeleteResponse)
async def delete_emoji(
emoji_id: int,
authorization: Optional[str] = Header(None)
):
"""
删除表情包
Args:
emoji_id: 表情包ID
authorization: Authorization header
Returns:
删除结果
"""
try:
verify_auth_token(authorization)
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
# 记录删除信息
emoji_hash = emoji.emoji_hash
# 执行删除
emoji.delete_instance()
logger.info(f"表情包已删除: ID={emoji_id}, hash={emoji_hash}")
return EmojiDeleteResponse(
success=True,
message=f"成功删除表情包: {emoji_hash}"
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"删除表情包失败: {e}")
raise HTTPException(status_code=500, detail=f"删除表情包失败: {str(e)}") from e
@router.get("/stats/summary")
async def get_emoji_stats(
authorization: Optional[str] = Header(None)
):
"""
获取表情包统计数据
Args:
authorization: Authorization header
Returns:
统计数据
"""
try:
verify_auth_token(authorization)
total = Emoji.select().count()
registered = Emoji.select().where(Emoji.is_registered).count()
banned = Emoji.select().where(Emoji.is_banned).count()
# 按格式统计
formats = {}
for emoji in Emoji.select(Emoji.format):
fmt = emoji.format
formats[fmt] = formats.get(fmt, 0) + 1
# 获取最常用的表情包前10
top_used = Emoji.select().order_by(Emoji.usage_count.desc()).limit(10)
top_used_list = [
{
"id": emoji.id,
"emoji_hash": emoji.emoji_hash,
"description": emoji.description,
"usage_count": emoji.usage_count
}
for emoji in top_used
]
return {
"success": True,
"data": {
"total": total,
"registered": registered,
"banned": banned,
"unregistered": total - registered,
"formats": formats,
"top_used": top_used_list
}
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"获取统计数据失败: {e}")
raise HTTPException(status_code=500, detail=f"获取统计数据失败: {str(e)}") from e
@router.post("/{emoji_id}/register", response_model=EmojiUpdateResponse)
async def register_emoji(
emoji_id: int,
authorization: Optional[str] = Header(None)
):
"""
注册表情包快捷操作
Args:
emoji_id: 表情包ID
authorization: Authorization header
Returns:
更新结果
"""
try:
verify_auth_token(authorization)
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
if emoji.is_registered:
raise HTTPException(status_code=400, detail="该表情包已经注册")
if emoji.is_banned:
raise HTTPException(status_code=400, detail="该表情包已被禁用,无法注册")
# 注册表情包
emoji.is_registered = True
emoji.register_time = time.time()
emoji.save()
logger.info(f"表情包已注册: ID={emoji_id}")
return EmojiUpdateResponse(
success=True,
message="表情包注册成功",
data=emoji_to_response(emoji)
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"注册表情包失败: {e}")
raise HTTPException(status_code=500, detail=f"注册表情包失败: {str(e)}") from e
@router.post("/{emoji_id}/ban", response_model=EmojiUpdateResponse)
async def ban_emoji(
emoji_id: int,
authorization: Optional[str] = Header(None)
):
"""
禁用表情包快捷操作
Args:
emoji_id: 表情包ID
authorization: Authorization header
Returns:
更新结果
"""
try:
verify_auth_token(authorization)
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
# 禁用表情包(同时取消注册)
emoji.is_banned = True
emoji.is_registered = False
emoji.save()
logger.info(f"表情包已禁用: ID={emoji_id}")
return EmojiUpdateResponse(
success=True,
message="表情包禁用成功",
data=emoji_to_response(emoji)
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"禁用表情包失败: {e}")
raise HTTPException(status_code=500, detail=f"禁用表情包失败: {str(e)}") from e

View File

@ -0,0 +1,404 @@
"""表达方式管理 API 路由"""
from fastapi import APIRouter, HTTPException, Header, Query
from pydantic import BaseModel
from typing import Optional, List
from src.common.logger import get_logger
from src.common.database.database_model import Expression
from .token_manager import get_token_manager
import time
logger = get_logger("webui.expression")
# 创建路由器
router = APIRouter(prefix="/expression", tags=["Expression"])
class ExpressionResponse(BaseModel):
"""表达方式响应"""
id: int
situation: str
style: str
context: Optional[str]
up_content: Optional[str]
last_active_time: float
chat_id: str
create_date: Optional[float]
class ExpressionListResponse(BaseModel):
"""表达方式列表响应"""
success: bool
total: int
page: int
page_size: int
data: List[ExpressionResponse]
class ExpressionDetailResponse(BaseModel):
"""表达方式详情响应"""
success: bool
data: ExpressionResponse
class ExpressionCreateRequest(BaseModel):
"""表达方式创建请求"""
situation: str
style: str
context: Optional[str] = None
up_content: Optional[str] = None
chat_id: str
class ExpressionUpdateRequest(BaseModel):
"""表达方式更新请求"""
situation: Optional[str] = None
style: Optional[str] = None
context: Optional[str] = None
up_content: Optional[str] = None
chat_id: Optional[str] = None
class ExpressionUpdateResponse(BaseModel):
"""表达方式更新响应"""
success: bool
message: str
data: Optional[ExpressionResponse] = None
class ExpressionDeleteResponse(BaseModel):
"""表达方式删除响应"""
success: bool
message: str
class ExpressionCreateResponse(BaseModel):
"""表达方式创建响应"""
success: bool
message: str
data: ExpressionResponse
def verify_auth_token(authorization: Optional[str]) -> bool:
"""验证认证 Token"""
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="未提供有效的认证信息")
token = authorization.replace("Bearer ", "")
token_manager = get_token_manager()
if not token_manager.verify_token(token):
raise HTTPException(status_code=401, detail="Token 无效或已过期")
return True
def expression_to_response(expression: Expression) -> ExpressionResponse:
"""将 Expression 模型转换为响应对象"""
return ExpressionResponse(
id=expression.id,
situation=expression.situation,
style=expression.style,
context=expression.context,
up_content=expression.up_content,
last_active_time=expression.last_active_time,
chat_id=expression.chat_id,
create_date=expression.create_date,
)
@router.get("/list", response_model=ExpressionListResponse)
async def get_expression_list(
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页数量"),
search: Optional[str] = Query(None, description="搜索关键词"),
chat_id: Optional[str] = Query(None, description="聊天ID筛选"),
authorization: Optional[str] = Header(None)
):
"""
获取表达方式列表
Args:
page: 页码 ( 1 开始)
page_size: 每页数量 (1-100)
search: 搜索关键词 (匹配 situation, style, context)
chat_id: 聊天ID筛选
authorization: Authorization header
Returns:
表达方式列表
"""
try:
verify_auth_token(authorization)
# 构建查询
query = Expression.select()
# 搜索过滤
if search:
query = query.where(
(Expression.situation.contains(search)) |
(Expression.style.contains(search)) |
(Expression.context.contains(search))
)
# 聊天ID过滤
if chat_id:
query = query.where(Expression.chat_id == chat_id)
# 排序最后活跃时间倒序NULL 值放在最后)
from peewee import Case
query = query.order_by(
Case(None, [(Expression.last_active_time.is_null(), 1)], 0),
Expression.last_active_time.desc()
)
# 获取总数
total = query.count()
# 分页
offset = (page - 1) * page_size
expressions = query.offset(offset).limit(page_size)
# 转换为响应对象
data = [expression_to_response(expr) for expr in expressions]
return ExpressionListResponse(
success=True,
total=total,
page=page,
page_size=page_size,
data=data
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"获取表达方式列表失败: {e}")
raise HTTPException(status_code=500, detail=f"获取表达方式列表失败: {str(e)}") from e
@router.get("/{expression_id}", response_model=ExpressionDetailResponse)
async def get_expression_detail(
expression_id: int,
authorization: Optional[str] = Header(None)
):
"""
获取表达方式详细信息
Args:
expression_id: 表达方式ID
authorization: Authorization header
Returns:
表达方式详细信息
"""
try:
verify_auth_token(authorization)
expression = Expression.get_or_none(Expression.id == expression_id)
if not expression:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {expression_id} 的表达方式")
return ExpressionDetailResponse(
success=True,
data=expression_to_response(expression)
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"获取表达方式详情失败: {e}")
raise HTTPException(status_code=500, detail=f"获取表达方式详情失败: {str(e)}") from e
@router.post("/", response_model=ExpressionCreateResponse)
async def create_expression(
request: ExpressionCreateRequest,
authorization: Optional[str] = Header(None)
):
"""
创建新的表达方式
Args:
request: 创建请求
authorization: Authorization header
Returns:
创建结果
"""
try:
verify_auth_token(authorization)
current_time = time.time()
# 创建表达方式
expression = Expression.create(
situation=request.situation,
style=request.style,
context=request.context,
up_content=request.up_content,
chat_id=request.chat_id,
last_active_time=current_time,
create_date=current_time,
)
logger.info(f"表达方式已创建: ID={expression.id}, situation={request.situation}")
return ExpressionCreateResponse(
success=True,
message="表达方式创建成功",
data=expression_to_response(expression)
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"创建表达方式失败: {e}")
raise HTTPException(status_code=500, detail=f"创建表达方式失败: {str(e)}") from e
@router.patch("/{expression_id}", response_model=ExpressionUpdateResponse)
async def update_expression(
expression_id: int,
request: ExpressionUpdateRequest,
authorization: Optional[str] = Header(None)
):
"""
增量更新表达方式只更新提供的字段
Args:
expression_id: 表达方式ID
request: 更新请求只包含需要更新的字段
authorization: Authorization header
Returns:
更新结果
"""
try:
verify_auth_token(authorization)
expression = Expression.get_or_none(Expression.id == expression_id)
if not expression:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {expression_id} 的表达方式")
# 只更新提供的字段
update_data = request.model_dump(exclude_unset=True)
if not update_data:
raise HTTPException(status_code=400, detail="未提供任何需要更新的字段")
# 更新最后活跃时间
update_data['last_active_time'] = time.time()
# 执行更新
for field, value in update_data.items():
setattr(expression, field, value)
expression.save()
logger.info(f"表达方式已更新: ID={expression_id}, 字段: {list(update_data.keys())}")
return ExpressionUpdateResponse(
success=True,
message=f"成功更新 {len(update_data)} 个字段",
data=expression_to_response(expression)
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"更新表达方式失败: {e}")
raise HTTPException(status_code=500, detail=f"更新表达方式失败: {str(e)}") from e
@router.delete("/{expression_id}", response_model=ExpressionDeleteResponse)
async def delete_expression(
expression_id: int,
authorization: Optional[str] = Header(None)
):
"""
删除表达方式
Args:
expression_id: 表达方式ID
authorization: Authorization header
Returns:
删除结果
"""
try:
verify_auth_token(authorization)
expression = Expression.get_or_none(Expression.id == expression_id)
if not expression:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {expression_id} 的表达方式")
# 记录删除信息
situation = expression.situation
# 执行删除
expression.delete_instance()
logger.info(f"表达方式已删除: ID={expression_id}, situation={situation}")
return ExpressionDeleteResponse(
success=True,
message=f"成功删除表达方式: {situation}"
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"删除表达方式失败: {e}")
raise HTTPException(status_code=500, detail=f"删除表达方式失败: {str(e)}") from e
@router.get("/stats/summary")
async def get_expression_stats(
authorization: Optional[str] = Header(None)
):
"""
获取表达方式统计数据
Args:
authorization: Authorization header
Returns:
统计数据
"""
try:
verify_auth_token(authorization)
total = Expression.select().count()
# 按 chat_id 统计
chat_stats = {}
for expr in Expression.select(Expression.chat_id):
chat_id = expr.chat_id
chat_stats[chat_id] = chat_stats.get(chat_id, 0) + 1
# 获取最近创建的记录数7天内
seven_days_ago = time.time() - (7 * 24 * 60 * 60)
recent = Expression.select().where(
(Expression.create_date.is_null(False)) &
(Expression.create_date >= seven_days_ago)
).count()
return {
"success": True,
"data": {
"total": total,
"recent_7days": recent,
"chat_count": len(chat_stats),
"top_chats": dict(sorted(chat_stats.items(), key=lambda x: x[1], reverse=True)[:10])
}
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"获取统计数据失败: {e}")
raise HTTPException(status_code=500, detail=f"获取统计数据失败: {str(e)}") from e

View File

@ -0,0 +1,365 @@
"""人物信息管理 API 路由"""
from fastapi import APIRouter, HTTPException, Header, Query
from pydantic import BaseModel
from typing import Optional, List, Dict
from src.common.logger import get_logger
from src.common.database.database_model import PersonInfo
from .token_manager import get_token_manager
import json
import time
logger = get_logger("webui.person")
# 创建路由器
router = APIRouter(prefix="/person", tags=["Person"])
class PersonInfoResponse(BaseModel):
"""人物信息响应"""
id: int
is_known: bool
person_id: str
person_name: Optional[str]
name_reason: Optional[str]
platform: str
user_id: str
nickname: Optional[str]
group_nick_name: Optional[List[Dict[str, str]]] # 解析后的 JSON
memory_points: Optional[str]
know_times: Optional[float]
know_since: Optional[float]
last_know: Optional[float]
class PersonListResponse(BaseModel):
"""人物列表响应"""
success: bool
total: int
page: int
page_size: int
data: List[PersonInfoResponse]
class PersonDetailResponse(BaseModel):
"""人物详情响应"""
success: bool
data: PersonInfoResponse
class PersonUpdateRequest(BaseModel):
"""人物信息更新请求"""
person_name: Optional[str] = None
name_reason: Optional[str] = None
nickname: Optional[str] = None
memory_points: Optional[str] = None
is_known: Optional[bool] = None
class PersonUpdateResponse(BaseModel):
"""人物信息更新响应"""
success: bool
message: str
data: Optional[PersonInfoResponse] = None
class PersonDeleteResponse(BaseModel):
"""人物删除响应"""
success: bool
message: str
def verify_auth_token(authorization: Optional[str]) -> bool:
"""验证认证 Token"""
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="未提供有效的认证信息")
token = authorization.replace("Bearer ", "")
token_manager = get_token_manager()
if not token_manager.verify_token(token):
raise HTTPException(status_code=401, detail="Token 无效或已过期")
return True
def parse_group_nick_name(group_nick_name_str: Optional[str]) -> Optional[List[Dict[str, str]]]:
"""解析群昵称 JSON 字符串"""
if not group_nick_name_str:
return None
try:
return json.loads(group_nick_name_str)
except (json.JSONDecodeError, TypeError):
return None
def person_to_response(person: PersonInfo) -> PersonInfoResponse:
"""将 PersonInfo 模型转换为响应对象"""
return PersonInfoResponse(
id=person.id,
is_known=person.is_known,
person_id=person.person_id,
person_name=person.person_name,
name_reason=person.name_reason,
platform=person.platform,
user_id=person.user_id,
nickname=person.nickname,
group_nick_name=parse_group_nick_name(person.group_nick_name),
memory_points=person.memory_points,
know_times=person.know_times,
know_since=person.know_since,
last_know=person.last_know,
)
@router.get("/list", response_model=PersonListResponse)
async def get_person_list(
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页数量"),
search: Optional[str] = Query(None, description="搜索关键词"),
is_known: Optional[bool] = Query(None, description="是否已认识筛选"),
platform: Optional[str] = Query(None, description="平台筛选"),
authorization: Optional[str] = Header(None)
):
"""
获取人物信息列表
Args:
page: 页码 ( 1 开始)
page_size: 每页数量 (1-100)
search: 搜索关键词 (匹配 person_name, nickname, user_id)
is_known: 是否已认识筛选
platform: 平台筛选
authorization: Authorization header
Returns:
人物信息列表
"""
try:
verify_auth_token(authorization)
# 构建查询
query = PersonInfo.select()
# 搜索过滤
if search:
query = query.where(
(PersonInfo.person_name.contains(search)) |
(PersonInfo.nickname.contains(search)) |
(PersonInfo.user_id.contains(search))
)
# 已认识状态过滤
if is_known is not None:
query = query.where(PersonInfo.is_known == is_known)
# 平台过滤
if platform:
query = query.where(PersonInfo.platform == platform)
# 排序最后更新时间倒序NULL 值放在最后)
# Peewee 不支持 nulls_last使用 CASE WHEN 来实现
from peewee import Case
query = query.order_by(
Case(None, [(PersonInfo.last_know.is_null(), 1)], 0),
PersonInfo.last_know.desc()
)
# 获取总数
total = query.count()
# 分页
offset = (page - 1) * page_size
persons = query.offset(offset).limit(page_size)
# 转换为响应对象
data = [person_to_response(person) for person in persons]
return PersonListResponse(
success=True,
total=total,
page=page,
page_size=page_size,
data=data
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"获取人物列表失败: {e}")
raise HTTPException(status_code=500, detail=f"获取人物列表失败: {str(e)}") from e
@router.get("/{person_id}", response_model=PersonDetailResponse)
async def get_person_detail(
person_id: str,
authorization: Optional[str] = Header(None)
):
"""
获取人物详细信息
Args:
person_id: 人物唯一 ID
authorization: Authorization header
Returns:
人物详细信息
"""
try:
verify_auth_token(authorization)
person = PersonInfo.get_or_none(PersonInfo.person_id == person_id)
if not person:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息")
return PersonDetailResponse(
success=True,
data=person_to_response(person)
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"获取人物详情失败: {e}")
raise HTTPException(status_code=500, detail=f"获取人物详情失败: {str(e)}") from e
@router.patch("/{person_id}", response_model=PersonUpdateResponse)
async def update_person(
person_id: str,
request: PersonUpdateRequest,
authorization: Optional[str] = Header(None)
):
"""
增量更新人物信息只更新提供的字段
Args:
person_id: 人物唯一 ID
request: 更新请求只包含需要更新的字段
authorization: Authorization header
Returns:
更新结果
"""
try:
verify_auth_token(authorization)
person = PersonInfo.get_or_none(PersonInfo.person_id == person_id)
if not person:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息")
# 只更新提供的字段
update_data = request.model_dump(exclude_unset=True)
if not update_data:
raise HTTPException(status_code=400, detail="未提供任何需要更新的字段")
# 更新最后修改时间
update_data['last_know'] = time.time()
# 执行更新
for field, value in update_data.items():
setattr(person, field, value)
person.save()
logger.info(f"人物信息已更新: {person_id}, 字段: {list(update_data.keys())}")
return PersonUpdateResponse(
success=True,
message=f"成功更新 {len(update_data)} 个字段",
data=person_to_response(person)
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"更新人物信息失败: {e}")
raise HTTPException(status_code=500, detail=f"更新人物信息失败: {str(e)}") from e
@router.delete("/{person_id}", response_model=PersonDeleteResponse)
async def delete_person(
person_id: str,
authorization: Optional[str] = Header(None)
):
"""
删除人物信息
Args:
person_id: 人物唯一 ID
authorization: Authorization header
Returns:
删除结果
"""
try:
verify_auth_token(authorization)
person = PersonInfo.get_or_none(PersonInfo.person_id == person_id)
if not person:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息")
# 记录删除信息
person_name = person.person_name or person.nickname or person.user_id
# 执行删除
person.delete_instance()
logger.info(f"人物信息已删除: {person_id} ({person_name})")
return PersonDeleteResponse(
success=True,
message=f"成功删除人物信息: {person_name}"
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"删除人物信息失败: {e}")
raise HTTPException(status_code=500, detail=f"删除人物信息失败: {str(e)}") from e
@router.get("/stats/summary")
async def get_person_stats(
authorization: Optional[str] = Header(None)
):
"""
获取人物信息统计数据
Args:
authorization: Authorization header
Returns:
统计数据
"""
try:
verify_auth_token(authorization)
total = PersonInfo.select().count()
known = PersonInfo.select().where(PersonInfo.is_known).count()
unknown = total - known
# 按平台统计
platforms = {}
for person in PersonInfo.select(PersonInfo.platform):
platform = person.platform
platforms[platform] = platforms.get(platform, 0) + 1
return {
"success": True,
"data": {
"total": total,
"known": known,
"unknown": unknown,
"platforms": platforms
}
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"获取统计数据失败: {e}")
raise HTTPException(status_code=500, detail=f"获取统计数据失败: {str(e)}") from e

View File

@ -6,6 +6,9 @@ from src.common.logger import get_logger
from .token_manager import get_token_manager
from .config_routes import router as config_router
from .statistics_routes import router as statistics_router
from .person_routes import router as person_router
from .expression_routes import router as expression_router
from .emoji_routes import router as emoji_router
logger = get_logger("webui.api")
@ -16,6 +19,12 @@ router = APIRouter(prefix="/api/webui", tags=["WebUI"])
router.include_router(config_router)
# 注册统计数据路由
router.include_router(statistics_router)
# 注册人物信息管理路由
router.include_router(person_router)
# 注册表达方式管理路由
router.include_router(expression_router)
# 注册表情包管理路由
router.include_router(emoji_router)
class TokenVerifyRequest(BaseModel):