From 4e03cfac2a1b8c4e7afc170b0b88ad31446aa661 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A2=A8=E6=A2=93=E6=9F=92?= <1787882683@qq.com> Date: Mon, 17 Nov 2025 21:04:29 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=BA=BA=E7=89=A9?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E3=80=81=E8=A1=A8=E8=BE=BE=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=E5=92=8C=E8=A1=A8=E6=83=85=E5=8C=85=E7=AE=A1=E7=90=86=20API=20?= =?UTF-8?q?=E8=B7=AF=E7=94=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/webui/emoji_routes.py | 483 +++++++++++++++++++++++++++++++++ src/webui/expression_routes.py | 404 +++++++++++++++++++++++++++ src/webui/person_routes.py | 365 +++++++++++++++++++++++++ src/webui/routes.py | 9 + 4 files changed, 1261 insertions(+) create mode 100644 src/webui/emoji_routes.py create mode 100644 src/webui/expression_routes.py create mode 100644 src/webui/person_routes.py diff --git a/src/webui/emoji_routes.py b/src/webui/emoji_routes.py new file mode 100644 index 00000000..526bc3a9 --- /dev/null +++ b/src/webui/emoji_routes.py @@ -0,0 +1,483 @@ +"""表情包管理 API 路由""" +from fastapi import APIRouter, HTTPException, Header, Query +from pydantic import BaseModel +from typing import Optional, List +from src.common.logger import get_logger +from src.common.database.database_model import Emoji +from .token_manager import get_token_manager +import json +import time + +logger = get_logger("webui.emoji") + +# 创建路由器 +router = APIRouter(prefix="/emoji", tags=["Emoji"]) + + +class EmojiResponse(BaseModel): + """表情包响应""" + id: int + full_path: str + format: str + emoji_hash: str + description: str + query_count: int + is_registered: bool + is_banned: bool + emotion: Optional[List[str]] # 解析后的 JSON + record_time: float + register_time: Optional[float] + usage_count: int + last_used_time: Optional[float] + + +class EmojiListResponse(BaseModel): + """表情包列表响应""" + success: bool + total: int + page: int + page_size: int + data: List[EmojiResponse] + + +class EmojiDetailResponse(BaseModel): + """表情包详情响应""" + success: bool + data: EmojiResponse + + +class EmojiUpdateRequest(BaseModel): + """表情包更新请求""" + description: Optional[str] = None + is_registered: Optional[bool] = None + is_banned: Optional[bool] = None + emotion: Optional[List[str]] = None + + +class EmojiUpdateResponse(BaseModel): + """表情包更新响应""" + success: bool + message: str + data: Optional[EmojiResponse] = None + + +class EmojiDeleteResponse(BaseModel): + """表情包删除响应""" + success: bool + message: str + + +def verify_auth_token(authorization: Optional[str]) -> bool: + """验证认证 Token""" + if not authorization or not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="未提供有效的认证信息") + + token = authorization.replace("Bearer ", "") + token_manager = get_token_manager() + + if not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="Token 无效或已过期") + + return True + + +def parse_emotion(emotion_str: Optional[str]) -> Optional[List[str]]: + """解析情感标签 JSON 字符串""" + if not emotion_str: + return None + try: + return json.loads(emotion_str) + except (json.JSONDecodeError, TypeError): + return None + + +def emoji_to_response(emoji: Emoji) -> EmojiResponse: + """将 Emoji 模型转换为响应对象""" + return EmojiResponse( + id=emoji.id, + full_path=emoji.full_path, + format=emoji.format, + emoji_hash=emoji.emoji_hash, + description=emoji.description, + query_count=emoji.query_count, + is_registered=emoji.is_registered, + is_banned=emoji.is_banned, + emotion=parse_emotion(emoji.emotion), + record_time=emoji.record_time, + register_time=emoji.register_time, + usage_count=emoji.usage_count, + last_used_time=emoji.last_used_time, + ) + + +@router.get("/list", response_model=EmojiListResponse) +async def get_emoji_list( + page: int = Query(1, ge=1, description="页码"), + page_size: int = Query(20, ge=1, le=100, description="每页数量"), + search: Optional[str] = Query(None, description="搜索关键词"), + is_registered: Optional[bool] = Query(None, description="是否已注册筛选"), + is_banned: Optional[bool] = Query(None, description="是否被禁用筛选"), + format: Optional[str] = Query(None, description="格式筛选"), + authorization: Optional[str] = Header(None) +): + """ + 获取表情包列表 + + Args: + page: 页码 (从 1 开始) + page_size: 每页数量 (1-100) + search: 搜索关键词 (匹配 description, emoji_hash) + is_registered: 是否已注册筛选 + is_banned: 是否被禁用筛选 + format: 格式筛选 + authorization: Authorization header + + Returns: + 表情包列表 + """ + try: + verify_auth_token(authorization) + + # 构建查询 + query = Emoji.select() + + # 搜索过滤 + if search: + query = query.where( + (Emoji.description.contains(search)) | + (Emoji.emoji_hash.contains(search)) + ) + + # 注册状态过滤 + if is_registered is not None: + query = query.where(Emoji.is_registered == is_registered) + + # 禁用状态过滤 + if is_banned is not None: + query = query.where(Emoji.is_banned == is_banned) + + # 格式过滤 + if format: + query = query.where(Emoji.format == format) + + # 排序:使用次数倒序,然后按记录时间倒序 + from peewee import Case + query = query.order_by( + Emoji.usage_count.desc(), + Case(None, [(Emoji.record_time.is_null(), 1)], 0), + Emoji.record_time.desc() + ) + + # 获取总数 + total = query.count() + + # 分页 + offset = (page - 1) * page_size + emojis = query.offset(offset).limit(page_size) + + # 转换为响应对象 + data = [emoji_to_response(emoji) for emoji in emojis] + + return EmojiListResponse( + success=True, + total=total, + page=page, + page_size=page_size, + data=data + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"获取表情包列表失败: {e}") + raise HTTPException(status_code=500, detail=f"获取表情包列表失败: {str(e)}") from e + + +@router.get("/{emoji_id}", response_model=EmojiDetailResponse) +async def get_emoji_detail( + emoji_id: int, + authorization: Optional[str] = Header(None) +): + """ + 获取表情包详细信息 + + Args: + emoji_id: 表情包ID + authorization: Authorization header + + Returns: + 表情包详细信息 + """ + try: + verify_auth_token(authorization) + + emoji = Emoji.get_or_none(Emoji.id == emoji_id) + + if not emoji: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") + + return EmojiDetailResponse( + success=True, + data=emoji_to_response(emoji) + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"获取表情包详情失败: {e}") + raise HTTPException(status_code=500, detail=f"获取表情包详情失败: {str(e)}") from e + + +@router.patch("/{emoji_id}", response_model=EmojiUpdateResponse) +async def update_emoji( + emoji_id: int, + request: EmojiUpdateRequest, + authorization: Optional[str] = Header(None) +): + """ + 增量更新表情包(只更新提供的字段) + + Args: + emoji_id: 表情包ID + request: 更新请求(只包含需要更新的字段) + authorization: Authorization header + + Returns: + 更新结果 + """ + try: + verify_auth_token(authorization) + + emoji = Emoji.get_or_none(Emoji.id == emoji_id) + + if not emoji: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") + + # 只更新提供的字段 + update_data = request.model_dump(exclude_unset=True) + + if not update_data: + raise HTTPException(status_code=400, detail="未提供任何需要更新的字段") + + # 处理情感标签(转换为 JSON) + if 'emotion' in update_data: + if update_data['emotion'] is None: + update_data['emotion'] = None + else: + update_data['emotion'] = json.dumps(update_data['emotion'], ensure_ascii=False) + + # 如果注册状态从 False 变为 True,记录注册时间 + if 'is_registered' in update_data and update_data['is_registered'] and not emoji.is_registered: + update_data['register_time'] = time.time() + + # 执行更新 + for field, value in update_data.items(): + setattr(emoji, field, value) + + emoji.save() + + logger.info(f"表情包已更新: ID={emoji_id}, 字段: {list(update_data.keys())}") + + return EmojiUpdateResponse( + success=True, + message=f"成功更新 {len(update_data)} 个字段", + data=emoji_to_response(emoji) + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"更新表情包失败: {e}") + raise HTTPException(status_code=500, detail=f"更新表情包失败: {str(e)}") from e + + +@router.delete("/{emoji_id}", response_model=EmojiDeleteResponse) +async def delete_emoji( + emoji_id: int, + authorization: Optional[str] = Header(None) +): + """ + 删除表情包 + + Args: + emoji_id: 表情包ID + authorization: Authorization header + + Returns: + 删除结果 + """ + try: + verify_auth_token(authorization) + + emoji = Emoji.get_or_none(Emoji.id == emoji_id) + + if not emoji: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") + + # 记录删除信息 + emoji_hash = emoji.emoji_hash + + # 执行删除 + emoji.delete_instance() + + logger.info(f"表情包已删除: ID={emoji_id}, hash={emoji_hash}") + + return EmojiDeleteResponse( + success=True, + message=f"成功删除表情包: {emoji_hash}" + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"删除表情包失败: {e}") + raise HTTPException(status_code=500, detail=f"删除表情包失败: {str(e)}") from e + + +@router.get("/stats/summary") +async def get_emoji_stats( + authorization: Optional[str] = Header(None) +): + """ + 获取表情包统计数据 + + Args: + authorization: Authorization header + + Returns: + 统计数据 + """ + try: + verify_auth_token(authorization) + + total = Emoji.select().count() + registered = Emoji.select().where(Emoji.is_registered).count() + banned = Emoji.select().where(Emoji.is_banned).count() + + # 按格式统计 + formats = {} + for emoji in Emoji.select(Emoji.format): + fmt = emoji.format + formats[fmt] = formats.get(fmt, 0) + 1 + + # 获取最常用的表情包(前10) + top_used = Emoji.select().order_by(Emoji.usage_count.desc()).limit(10) + top_used_list = [ + { + "id": emoji.id, + "emoji_hash": emoji.emoji_hash, + "description": emoji.description, + "usage_count": emoji.usage_count + } + for emoji in top_used + ] + + return { + "success": True, + "data": { + "total": total, + "registered": registered, + "banned": banned, + "unregistered": total - registered, + "formats": formats, + "top_used": top_used_list + } + } + + except HTTPException: + raise + except Exception as e: + logger.exception(f"获取统计数据失败: {e}") + raise HTTPException(status_code=500, detail=f"获取统计数据失败: {str(e)}") from e + + +@router.post("/{emoji_id}/register", response_model=EmojiUpdateResponse) +async def register_emoji( + emoji_id: int, + authorization: Optional[str] = Header(None) +): + """ + 注册表情包(快捷操作) + + Args: + emoji_id: 表情包ID + authorization: Authorization header + + Returns: + 更新结果 + """ + try: + verify_auth_token(authorization) + + emoji = Emoji.get_or_none(Emoji.id == emoji_id) + + if not emoji: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") + + if emoji.is_registered: + raise HTTPException(status_code=400, detail="该表情包已经注册") + + if emoji.is_banned: + raise HTTPException(status_code=400, detail="该表情包已被禁用,无法注册") + + # 注册表情包 + emoji.is_registered = True + emoji.register_time = time.time() + emoji.save() + + logger.info(f"表情包已注册: ID={emoji_id}") + + return EmojiUpdateResponse( + success=True, + message="表情包注册成功", + data=emoji_to_response(emoji) + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"注册表情包失败: {e}") + raise HTTPException(status_code=500, detail=f"注册表情包失败: {str(e)}") from e + + +@router.post("/{emoji_id}/ban", response_model=EmojiUpdateResponse) +async def ban_emoji( + emoji_id: int, + authorization: Optional[str] = Header(None) +): + """ + 禁用表情包(快捷操作) + + Args: + emoji_id: 表情包ID + authorization: Authorization header + + Returns: + 更新结果 + """ + try: + verify_auth_token(authorization) + + emoji = Emoji.get_or_none(Emoji.id == emoji_id) + + if not emoji: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") + + # 禁用表情包(同时取消注册) + emoji.is_banned = True + emoji.is_registered = False + emoji.save() + + logger.info(f"表情包已禁用: ID={emoji_id}") + + return EmojiUpdateResponse( + success=True, + message="表情包禁用成功", + data=emoji_to_response(emoji) + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"禁用表情包失败: {e}") + raise HTTPException(status_code=500, detail=f"禁用表情包失败: {str(e)}") from e diff --git a/src/webui/expression_routes.py b/src/webui/expression_routes.py new file mode 100644 index 00000000..de2594ee --- /dev/null +++ b/src/webui/expression_routes.py @@ -0,0 +1,404 @@ +"""表达方式管理 API 路由""" +from fastapi import APIRouter, HTTPException, Header, Query +from pydantic import BaseModel +from typing import Optional, List +from src.common.logger import get_logger +from src.common.database.database_model import Expression +from .token_manager import get_token_manager +import time + +logger = get_logger("webui.expression") + +# 创建路由器 +router = APIRouter(prefix="/expression", tags=["Expression"]) + + +class ExpressionResponse(BaseModel): + """表达方式响应""" + id: int + situation: str + style: str + context: Optional[str] + up_content: Optional[str] + last_active_time: float + chat_id: str + create_date: Optional[float] + + +class ExpressionListResponse(BaseModel): + """表达方式列表响应""" + success: bool + total: int + page: int + page_size: int + data: List[ExpressionResponse] + + +class ExpressionDetailResponse(BaseModel): + """表达方式详情响应""" + success: bool + data: ExpressionResponse + + +class ExpressionCreateRequest(BaseModel): + """表达方式创建请求""" + situation: str + style: str + context: Optional[str] = None + up_content: Optional[str] = None + chat_id: str + + +class ExpressionUpdateRequest(BaseModel): + """表达方式更新请求""" + situation: Optional[str] = None + style: Optional[str] = None + context: Optional[str] = None + up_content: Optional[str] = None + chat_id: Optional[str] = None + + +class ExpressionUpdateResponse(BaseModel): + """表达方式更新响应""" + success: bool + message: str + data: Optional[ExpressionResponse] = None + + +class ExpressionDeleteResponse(BaseModel): + """表达方式删除响应""" + success: bool + message: str + + +class ExpressionCreateResponse(BaseModel): + """表达方式创建响应""" + success: bool + message: str + data: ExpressionResponse + + +def verify_auth_token(authorization: Optional[str]) -> bool: + """验证认证 Token""" + if not authorization or not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="未提供有效的认证信息") + + token = authorization.replace("Bearer ", "") + token_manager = get_token_manager() + + if not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="Token 无效或已过期") + + return True + + +def expression_to_response(expression: Expression) -> ExpressionResponse: + """将 Expression 模型转换为响应对象""" + return ExpressionResponse( + id=expression.id, + situation=expression.situation, + style=expression.style, + context=expression.context, + up_content=expression.up_content, + last_active_time=expression.last_active_time, + chat_id=expression.chat_id, + create_date=expression.create_date, + ) + + +@router.get("/list", response_model=ExpressionListResponse) +async def get_expression_list( + page: int = Query(1, ge=1, description="页码"), + page_size: int = Query(20, ge=1, le=100, description="每页数量"), + search: Optional[str] = Query(None, description="搜索关键词"), + chat_id: Optional[str] = Query(None, description="聊天ID筛选"), + authorization: Optional[str] = Header(None) +): + """ + 获取表达方式列表 + + Args: + page: 页码 (从 1 开始) + page_size: 每页数量 (1-100) + search: 搜索关键词 (匹配 situation, style, context) + chat_id: 聊天ID筛选 + authorization: Authorization header + + Returns: + 表达方式列表 + """ + try: + verify_auth_token(authorization) + + # 构建查询 + query = Expression.select() + + # 搜索过滤 + if search: + query = query.where( + (Expression.situation.contains(search)) | + (Expression.style.contains(search)) | + (Expression.context.contains(search)) + ) + + # 聊天ID过滤 + if chat_id: + query = query.where(Expression.chat_id == chat_id) + + # 排序:最后活跃时间倒序(NULL 值放在最后) + from peewee import Case + query = query.order_by( + Case(None, [(Expression.last_active_time.is_null(), 1)], 0), + Expression.last_active_time.desc() + ) + + # 获取总数 + total = query.count() + + # 分页 + offset = (page - 1) * page_size + expressions = query.offset(offset).limit(page_size) + + # 转换为响应对象 + data = [expression_to_response(expr) for expr in expressions] + + return ExpressionListResponse( + success=True, + total=total, + page=page, + page_size=page_size, + data=data + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"获取表达方式列表失败: {e}") + raise HTTPException(status_code=500, detail=f"获取表达方式列表失败: {str(e)}") from e + + +@router.get("/{expression_id}", response_model=ExpressionDetailResponse) +async def get_expression_detail( + expression_id: int, + authorization: Optional[str] = Header(None) +): + """ + 获取表达方式详细信息 + + Args: + expression_id: 表达方式ID + authorization: Authorization header + + Returns: + 表达方式详细信息 + """ + try: + verify_auth_token(authorization) + + expression = Expression.get_or_none(Expression.id == expression_id) + + if not expression: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {expression_id} 的表达方式") + + return ExpressionDetailResponse( + success=True, + data=expression_to_response(expression) + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"获取表达方式详情失败: {e}") + raise HTTPException(status_code=500, detail=f"获取表达方式详情失败: {str(e)}") from e + + +@router.post("/", response_model=ExpressionCreateResponse) +async def create_expression( + request: ExpressionCreateRequest, + authorization: Optional[str] = Header(None) +): + """ + 创建新的表达方式 + + Args: + request: 创建请求 + authorization: Authorization header + + Returns: + 创建结果 + """ + try: + verify_auth_token(authorization) + + current_time = time.time() + + # 创建表达方式 + expression = Expression.create( + situation=request.situation, + style=request.style, + context=request.context, + up_content=request.up_content, + chat_id=request.chat_id, + last_active_time=current_time, + create_date=current_time, + ) + + logger.info(f"表达方式已创建: ID={expression.id}, situation={request.situation}") + + return ExpressionCreateResponse( + success=True, + message="表达方式创建成功", + data=expression_to_response(expression) + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"创建表达方式失败: {e}") + raise HTTPException(status_code=500, detail=f"创建表达方式失败: {str(e)}") from e + + +@router.patch("/{expression_id}", response_model=ExpressionUpdateResponse) +async def update_expression( + expression_id: int, + request: ExpressionUpdateRequest, + authorization: Optional[str] = Header(None) +): + """ + 增量更新表达方式(只更新提供的字段) + + Args: + expression_id: 表达方式ID + request: 更新请求(只包含需要更新的字段) + authorization: Authorization header + + Returns: + 更新结果 + """ + try: + verify_auth_token(authorization) + + expression = Expression.get_or_none(Expression.id == expression_id) + + if not expression: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {expression_id} 的表达方式") + + # 只更新提供的字段 + update_data = request.model_dump(exclude_unset=True) + + if not update_data: + raise HTTPException(status_code=400, detail="未提供任何需要更新的字段") + + # 更新最后活跃时间 + update_data['last_active_time'] = time.time() + + # 执行更新 + for field, value in update_data.items(): + setattr(expression, field, value) + + expression.save() + + logger.info(f"表达方式已更新: ID={expression_id}, 字段: {list(update_data.keys())}") + + return ExpressionUpdateResponse( + success=True, + message=f"成功更新 {len(update_data)} 个字段", + data=expression_to_response(expression) + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"更新表达方式失败: {e}") + raise HTTPException(status_code=500, detail=f"更新表达方式失败: {str(e)}") from e + + +@router.delete("/{expression_id}", response_model=ExpressionDeleteResponse) +async def delete_expression( + expression_id: int, + authorization: Optional[str] = Header(None) +): + """ + 删除表达方式 + + Args: + expression_id: 表达方式ID + authorization: Authorization header + + Returns: + 删除结果 + """ + try: + verify_auth_token(authorization) + + expression = Expression.get_or_none(Expression.id == expression_id) + + if not expression: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {expression_id} 的表达方式") + + # 记录删除信息 + situation = expression.situation + + # 执行删除 + expression.delete_instance() + + logger.info(f"表达方式已删除: ID={expression_id}, situation={situation}") + + return ExpressionDeleteResponse( + success=True, + message=f"成功删除表达方式: {situation}" + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"删除表达方式失败: {e}") + raise HTTPException(status_code=500, detail=f"删除表达方式失败: {str(e)}") from e + + +@router.get("/stats/summary") +async def get_expression_stats( + authorization: Optional[str] = Header(None) +): + """ + 获取表达方式统计数据 + + Args: + authorization: Authorization header + + Returns: + 统计数据 + """ + try: + verify_auth_token(authorization) + + total = Expression.select().count() + + # 按 chat_id 统计 + chat_stats = {} + for expr in Expression.select(Expression.chat_id): + chat_id = expr.chat_id + chat_stats[chat_id] = chat_stats.get(chat_id, 0) + 1 + + # 获取最近创建的记录数(7天内) + seven_days_ago = time.time() - (7 * 24 * 60 * 60) + recent = Expression.select().where( + (Expression.create_date.is_null(False)) & + (Expression.create_date >= seven_days_ago) + ).count() + + return { + "success": True, + "data": { + "total": total, + "recent_7days": recent, + "chat_count": len(chat_stats), + "top_chats": dict(sorted(chat_stats.items(), key=lambda x: x[1], reverse=True)[:10]) + } + } + + except HTTPException: + raise + except Exception as e: + logger.exception(f"获取统计数据失败: {e}") + raise HTTPException(status_code=500, detail=f"获取统计数据失败: {str(e)}") from e diff --git a/src/webui/person_routes.py b/src/webui/person_routes.py new file mode 100644 index 00000000..a5488d49 --- /dev/null +++ b/src/webui/person_routes.py @@ -0,0 +1,365 @@ +"""人物信息管理 API 路由""" +from fastapi import APIRouter, HTTPException, Header, Query +from pydantic import BaseModel +from typing import Optional, List, Dict +from src.common.logger import get_logger +from src.common.database.database_model import PersonInfo +from .token_manager import get_token_manager +import json +import time + +logger = get_logger("webui.person") + +# 创建路由器 +router = APIRouter(prefix="/person", tags=["Person"]) + + +class PersonInfoResponse(BaseModel): + """人物信息响应""" + id: int + is_known: bool + person_id: str + person_name: Optional[str] + name_reason: Optional[str] + platform: str + user_id: str + nickname: Optional[str] + group_nick_name: Optional[List[Dict[str, str]]] # 解析后的 JSON + memory_points: Optional[str] + know_times: Optional[float] + know_since: Optional[float] + last_know: Optional[float] + + +class PersonListResponse(BaseModel): + """人物列表响应""" + success: bool + total: int + page: int + page_size: int + data: List[PersonInfoResponse] + + +class PersonDetailResponse(BaseModel): + """人物详情响应""" + success: bool + data: PersonInfoResponse + + +class PersonUpdateRequest(BaseModel): + """人物信息更新请求""" + person_name: Optional[str] = None + name_reason: Optional[str] = None + nickname: Optional[str] = None + memory_points: Optional[str] = None + is_known: Optional[bool] = None + + +class PersonUpdateResponse(BaseModel): + """人物信息更新响应""" + success: bool + message: str + data: Optional[PersonInfoResponse] = None + + +class PersonDeleteResponse(BaseModel): + """人物删除响应""" + success: bool + message: str + + +def verify_auth_token(authorization: Optional[str]) -> bool: + """验证认证 Token""" + if not authorization or not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="未提供有效的认证信息") + + token = authorization.replace("Bearer ", "") + token_manager = get_token_manager() + + if not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="Token 无效或已过期") + + return True + + +def parse_group_nick_name(group_nick_name_str: Optional[str]) -> Optional[List[Dict[str, str]]]: + """解析群昵称 JSON 字符串""" + if not group_nick_name_str: + return None + try: + return json.loads(group_nick_name_str) + except (json.JSONDecodeError, TypeError): + return None + + +def person_to_response(person: PersonInfo) -> PersonInfoResponse: + """将 PersonInfo 模型转换为响应对象""" + return PersonInfoResponse( + id=person.id, + is_known=person.is_known, + person_id=person.person_id, + person_name=person.person_name, + name_reason=person.name_reason, + platform=person.platform, + user_id=person.user_id, + nickname=person.nickname, + group_nick_name=parse_group_nick_name(person.group_nick_name), + memory_points=person.memory_points, + know_times=person.know_times, + know_since=person.know_since, + last_know=person.last_know, + ) + + +@router.get("/list", response_model=PersonListResponse) +async def get_person_list( + page: int = Query(1, ge=1, description="页码"), + page_size: int = Query(20, ge=1, le=100, description="每页数量"), + search: Optional[str] = Query(None, description="搜索关键词"), + is_known: Optional[bool] = Query(None, description="是否已认识筛选"), + platform: Optional[str] = Query(None, description="平台筛选"), + authorization: Optional[str] = Header(None) +): + """ + 获取人物信息列表 + + Args: + page: 页码 (从 1 开始) + page_size: 每页数量 (1-100) + search: 搜索关键词 (匹配 person_name, nickname, user_id) + is_known: 是否已认识筛选 + platform: 平台筛选 + authorization: Authorization header + + Returns: + 人物信息列表 + """ + try: + verify_auth_token(authorization) + + # 构建查询 + query = PersonInfo.select() + + # 搜索过滤 + if search: + query = query.where( + (PersonInfo.person_name.contains(search)) | + (PersonInfo.nickname.contains(search)) | + (PersonInfo.user_id.contains(search)) + ) + + # 已认识状态过滤 + if is_known is not None: + query = query.where(PersonInfo.is_known == is_known) + + # 平台过滤 + if platform: + query = query.where(PersonInfo.platform == platform) + + # 排序:最后更新时间倒序(NULL 值放在最后) + # Peewee 不支持 nulls_last,使用 CASE WHEN 来实现 + from peewee import Case + query = query.order_by( + Case(None, [(PersonInfo.last_know.is_null(), 1)], 0), + PersonInfo.last_know.desc() + ) + + # 获取总数 + total = query.count() + + # 分页 + offset = (page - 1) * page_size + persons = query.offset(offset).limit(page_size) + + # 转换为响应对象 + data = [person_to_response(person) for person in persons] + + return PersonListResponse( + success=True, + total=total, + page=page, + page_size=page_size, + data=data + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"获取人物列表失败: {e}") + raise HTTPException(status_code=500, detail=f"获取人物列表失败: {str(e)}") from e + + +@router.get("/{person_id}", response_model=PersonDetailResponse) +async def get_person_detail( + person_id: str, + authorization: Optional[str] = Header(None) +): + """ + 获取人物详细信息 + + Args: + person_id: 人物唯一 ID + authorization: Authorization header + + Returns: + 人物详细信息 + """ + try: + verify_auth_token(authorization) + + person = PersonInfo.get_or_none(PersonInfo.person_id == person_id) + + if not person: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息") + + return PersonDetailResponse( + success=True, + data=person_to_response(person) + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"获取人物详情失败: {e}") + raise HTTPException(status_code=500, detail=f"获取人物详情失败: {str(e)}") from e + + +@router.patch("/{person_id}", response_model=PersonUpdateResponse) +async def update_person( + person_id: str, + request: PersonUpdateRequest, + authorization: Optional[str] = Header(None) +): + """ + 增量更新人物信息(只更新提供的字段) + + Args: + person_id: 人物唯一 ID + request: 更新请求(只包含需要更新的字段) + authorization: Authorization header + + Returns: + 更新结果 + """ + try: + verify_auth_token(authorization) + + person = PersonInfo.get_or_none(PersonInfo.person_id == person_id) + + if not person: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息") + + # 只更新提供的字段 + update_data = request.model_dump(exclude_unset=True) + + if not update_data: + raise HTTPException(status_code=400, detail="未提供任何需要更新的字段") + + # 更新最后修改时间 + update_data['last_know'] = time.time() + + # 执行更新 + for field, value in update_data.items(): + setattr(person, field, value) + + person.save() + + logger.info(f"人物信息已更新: {person_id}, 字段: {list(update_data.keys())}") + + return PersonUpdateResponse( + success=True, + message=f"成功更新 {len(update_data)} 个字段", + data=person_to_response(person) + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"更新人物信息失败: {e}") + raise HTTPException(status_code=500, detail=f"更新人物信息失败: {str(e)}") from e + + +@router.delete("/{person_id}", response_model=PersonDeleteResponse) +async def delete_person( + person_id: str, + authorization: Optional[str] = Header(None) +): + """ + 删除人物信息 + + Args: + person_id: 人物唯一 ID + authorization: Authorization header + + Returns: + 删除结果 + """ + try: + verify_auth_token(authorization) + + person = PersonInfo.get_or_none(PersonInfo.person_id == person_id) + + if not person: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息") + + # 记录删除信息 + person_name = person.person_name or person.nickname or person.user_id + + # 执行删除 + person.delete_instance() + + logger.info(f"人物信息已删除: {person_id} ({person_name})") + + return PersonDeleteResponse( + success=True, + message=f"成功删除人物信息: {person_name}" + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"删除人物信息失败: {e}") + raise HTTPException(status_code=500, detail=f"删除人物信息失败: {str(e)}") from e + + +@router.get("/stats/summary") +async def get_person_stats( + authorization: Optional[str] = Header(None) +): + """ + 获取人物信息统计数据 + + Args: + authorization: Authorization header + + Returns: + 统计数据 + """ + try: + verify_auth_token(authorization) + + total = PersonInfo.select().count() + known = PersonInfo.select().where(PersonInfo.is_known).count() + unknown = total - known + + # 按平台统计 + platforms = {} + for person in PersonInfo.select(PersonInfo.platform): + platform = person.platform + platforms[platform] = platforms.get(platform, 0) + 1 + + return { + "success": True, + "data": { + "total": total, + "known": known, + "unknown": unknown, + "platforms": platforms + } + } + + except HTTPException: + raise + except Exception as e: + logger.exception(f"获取统计数据失败: {e}") + raise HTTPException(status_code=500, detail=f"获取统计数据失败: {str(e)}") from e diff --git a/src/webui/routes.py b/src/webui/routes.py index 517b77ed..3b49262d 100644 --- a/src/webui/routes.py +++ b/src/webui/routes.py @@ -6,6 +6,9 @@ from src.common.logger import get_logger from .token_manager import get_token_manager from .config_routes import router as config_router from .statistics_routes import router as statistics_router +from .person_routes import router as person_router +from .expression_routes import router as expression_router +from .emoji_routes import router as emoji_router logger = get_logger("webui.api") @@ -16,6 +19,12 @@ router = APIRouter(prefix="/api/webui", tags=["WebUI"]) router.include_router(config_router) # 注册统计数据路由 router.include_router(statistics_router) +# 注册人物信息管理路由 +router.include_router(person_router) +# 注册表达方式管理路由 +router.include_router(expression_router) +# 注册表情包管理路由 +router.include_router(emoji_router) class TokenVerifyRequest(BaseModel):