"""表情包管理 API 路由""" from fastapi import APIRouter, HTTPException, Header, Query from pydantic import BaseModel from typing import Optional, List from src.common.logger import get_logger from src.common.database.database_model import Emoji from .token_manager import get_token_manager import json import time logger = get_logger("webui.emoji") # 创建路由器 router = APIRouter(prefix="/emoji", tags=["Emoji"]) class EmojiResponse(BaseModel): """表情包响应""" id: int full_path: str format: str emoji_hash: str description: str query_count: int is_registered: bool is_banned: bool emotion: Optional[List[str]] # 解析后的 JSON record_time: float register_time: Optional[float] usage_count: int last_used_time: Optional[float] class EmojiListResponse(BaseModel): """表情包列表响应""" success: bool total: int page: int page_size: int data: List[EmojiResponse] class EmojiDetailResponse(BaseModel): """表情包详情响应""" success: bool data: EmojiResponse class EmojiUpdateRequest(BaseModel): """表情包更新请求""" description: Optional[str] = None is_registered: Optional[bool] = None is_banned: Optional[bool] = None emotion: Optional[List[str]] = None class EmojiUpdateResponse(BaseModel): """表情包更新响应""" success: bool message: str data: Optional[EmojiResponse] = None class EmojiDeleteResponse(BaseModel): """表情包删除响应""" success: bool message: str def verify_auth_token(authorization: Optional[str]) -> bool: """验证认证 Token""" if not authorization or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="未提供有效的认证信息") token = authorization.replace("Bearer ", "") token_manager = get_token_manager() if not token_manager.verify_token(token): raise HTTPException(status_code=401, detail="Token 无效或已过期") return True def parse_emotion(emotion_str: Optional[str]) -> Optional[List[str]]: """解析情感标签 JSON 字符串""" if not emotion_str: return None try: return json.loads(emotion_str) except (json.JSONDecodeError, TypeError): return None def emoji_to_response(emoji: Emoji) -> EmojiResponse: """将 Emoji 模型转换为响应对象""" return EmojiResponse( id=emoji.id, full_path=emoji.full_path, format=emoji.format, emoji_hash=emoji.emoji_hash, description=emoji.description, query_count=emoji.query_count, is_registered=emoji.is_registered, is_banned=emoji.is_banned, emotion=parse_emotion(emoji.emotion), record_time=emoji.record_time, register_time=emoji.register_time, usage_count=emoji.usage_count, last_used_time=emoji.last_used_time, ) @router.get("/list", response_model=EmojiListResponse) async def get_emoji_list( page: int = Query(1, ge=1, description="页码"), page_size: int = Query(20, ge=1, le=100, description="每页数量"), search: Optional[str] = Query(None, description="搜索关键词"), is_registered: Optional[bool] = Query(None, description="是否已注册筛选"), is_banned: Optional[bool] = Query(None, description="是否被禁用筛选"), format: Optional[str] = Query(None, description="格式筛选"), authorization: Optional[str] = Header(None) ): """ 获取表情包列表 Args: page: 页码 (从 1 开始) page_size: 每页数量 (1-100) search: 搜索关键词 (匹配 description, emoji_hash) is_registered: 是否已注册筛选 is_banned: 是否被禁用筛选 format: 格式筛选 authorization: Authorization header Returns: 表情包列表 """ try: verify_auth_token(authorization) # 构建查询 query = Emoji.select() # 搜索过滤 if search: query = query.where( (Emoji.description.contains(search)) | (Emoji.emoji_hash.contains(search)) ) # 注册状态过滤 if is_registered is not None: query = query.where(Emoji.is_registered == is_registered) # 禁用状态过滤 if is_banned is not None: query = query.where(Emoji.is_banned == is_banned) # 格式过滤 if format: query = query.where(Emoji.format == format) # 排序:使用次数倒序,然后按记录时间倒序 from peewee import Case query = query.order_by( Emoji.usage_count.desc(), Case(None, [(Emoji.record_time.is_null(), 1)], 0), Emoji.record_time.desc() ) # 获取总数 total = query.count() # 分页 offset = (page - 1) * page_size emojis = query.offset(offset).limit(page_size) # 转换为响应对象 data = [emoji_to_response(emoji) for emoji in emojis] return EmojiListResponse( success=True, total=total, page=page, page_size=page_size, data=data ) except HTTPException: raise except Exception as e: logger.exception(f"获取表情包列表失败: {e}") raise HTTPException(status_code=500, detail=f"获取表情包列表失败: {str(e)}") from e @router.get("/{emoji_id}", response_model=EmojiDetailResponse) async def get_emoji_detail( emoji_id: int, authorization: Optional[str] = Header(None) ): """ 获取表情包详细信息 Args: emoji_id: 表情包ID authorization: Authorization header Returns: 表情包详细信息 """ try: verify_auth_token(authorization) emoji = Emoji.get_or_none(Emoji.id == emoji_id) if not emoji: raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") return EmojiDetailResponse( success=True, data=emoji_to_response(emoji) ) except HTTPException: raise except Exception as e: logger.exception(f"获取表情包详情失败: {e}") raise HTTPException(status_code=500, detail=f"获取表情包详情失败: {str(e)}") from e @router.patch("/{emoji_id}", response_model=EmojiUpdateResponse) async def update_emoji( emoji_id: int, request: EmojiUpdateRequest, authorization: Optional[str] = Header(None) ): """ 增量更新表情包(只更新提供的字段) Args: emoji_id: 表情包ID request: 更新请求(只包含需要更新的字段) authorization: Authorization header Returns: 更新结果 """ try: verify_auth_token(authorization) emoji = Emoji.get_or_none(Emoji.id == emoji_id) if not emoji: raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") # 只更新提供的字段 update_data = request.model_dump(exclude_unset=True) if not update_data: raise HTTPException(status_code=400, detail="未提供任何需要更新的字段") # 处理情感标签(转换为 JSON) if 'emotion' in update_data: if update_data['emotion'] is None: update_data['emotion'] = None else: update_data['emotion'] = json.dumps(update_data['emotion'], ensure_ascii=False) # 如果注册状态从 False 变为 True,记录注册时间 if 'is_registered' in update_data and update_data['is_registered'] and not emoji.is_registered: update_data['register_time'] = time.time() # 执行更新 for field, value in update_data.items(): setattr(emoji, field, value) emoji.save() logger.info(f"表情包已更新: ID={emoji_id}, 字段: {list(update_data.keys())}") return EmojiUpdateResponse( success=True, message=f"成功更新 {len(update_data)} 个字段", data=emoji_to_response(emoji) ) except HTTPException: raise except Exception as e: logger.exception(f"更新表情包失败: {e}") raise HTTPException(status_code=500, detail=f"更新表情包失败: {str(e)}") from e @router.delete("/{emoji_id}", response_model=EmojiDeleteResponse) async def delete_emoji( emoji_id: int, authorization: Optional[str] = Header(None) ): """ 删除表情包 Args: emoji_id: 表情包ID authorization: Authorization header Returns: 删除结果 """ try: verify_auth_token(authorization) emoji = Emoji.get_or_none(Emoji.id == emoji_id) if not emoji: raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") # 记录删除信息 emoji_hash = emoji.emoji_hash # 执行删除 emoji.delete_instance() logger.info(f"表情包已删除: ID={emoji_id}, hash={emoji_hash}") return EmojiDeleteResponse( success=True, message=f"成功删除表情包: {emoji_hash}" ) except HTTPException: raise except Exception as e: logger.exception(f"删除表情包失败: {e}") raise HTTPException(status_code=500, detail=f"删除表情包失败: {str(e)}") from e @router.get("/stats/summary") async def get_emoji_stats( authorization: Optional[str] = Header(None) ): """ 获取表情包统计数据 Args: authorization: Authorization header Returns: 统计数据 """ try: verify_auth_token(authorization) total = Emoji.select().count() registered = Emoji.select().where(Emoji.is_registered).count() banned = Emoji.select().where(Emoji.is_banned).count() # 按格式统计 formats = {} for emoji in Emoji.select(Emoji.format): fmt = emoji.format formats[fmt] = formats.get(fmt, 0) + 1 # 获取最常用的表情包(前10) top_used = Emoji.select().order_by(Emoji.usage_count.desc()).limit(10) top_used_list = [ { "id": emoji.id, "emoji_hash": emoji.emoji_hash, "description": emoji.description, "usage_count": emoji.usage_count } for emoji in top_used ] return { "success": True, "data": { "total": total, "registered": registered, "banned": banned, "unregistered": total - registered, "formats": formats, "top_used": top_used_list } } except HTTPException: raise except Exception as e: logger.exception(f"获取统计数据失败: {e}") raise HTTPException(status_code=500, detail=f"获取统计数据失败: {str(e)}") from e @router.post("/{emoji_id}/register", response_model=EmojiUpdateResponse) async def register_emoji( emoji_id: int, authorization: Optional[str] = Header(None) ): """ 注册表情包(快捷操作) Args: emoji_id: 表情包ID authorization: Authorization header Returns: 更新结果 """ try: verify_auth_token(authorization) emoji = Emoji.get_or_none(Emoji.id == emoji_id) if not emoji: raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") if emoji.is_registered: raise HTTPException(status_code=400, detail="该表情包已经注册") if emoji.is_banned: raise HTTPException(status_code=400, detail="该表情包已被禁用,无法注册") # 注册表情包 emoji.is_registered = True emoji.register_time = time.time() emoji.save() logger.info(f"表情包已注册: ID={emoji_id}") return EmojiUpdateResponse( success=True, message="表情包注册成功", data=emoji_to_response(emoji) ) except HTTPException: raise except Exception as e: logger.exception(f"注册表情包失败: {e}") raise HTTPException(status_code=500, detail=f"注册表情包失败: {str(e)}") from e @router.post("/{emoji_id}/ban", response_model=EmojiUpdateResponse) async def ban_emoji( emoji_id: int, authorization: Optional[str] = Header(None) ): """ 禁用表情包(快捷操作) Args: emoji_id: 表情包ID authorization: Authorization header Returns: 更新结果 """ try: verify_auth_token(authorization) emoji = Emoji.get_or_none(Emoji.id == emoji_id) if not emoji: raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") # 禁用表情包(同时取消注册) emoji.is_banned = True emoji.is_registered = False emoji.save() logger.info(f"表情包已禁用: ID={emoji_id}") return EmojiUpdateResponse( success=True, message="表情包禁用成功", data=emoji_to_response(emoji) ) except HTTPException: raise except Exception as e: logger.exception(f"禁用表情包失败: {e}") raise HTTPException(status_code=500, detail=f"禁用表情包失败: {str(e)}") from e