diff --git a/src/webui/emoji_routes.py b/src/webui/emoji_routes.py index 35eaed04..e2aa6875 100644 --- a/src/webui/emoji_routes.py +++ b/src/webui/emoji_routes.py @@ -1,6 +1,6 @@ """表情包管理 API 路由""" -from fastapi import APIRouter, HTTPException, Header, Query +from fastapi import APIRouter, HTTPException, Header, Query, UploadFile, File, Form from fastapi.responses import FileResponse from pydantic import BaseModel from typing import Optional, List @@ -10,6 +10,10 @@ from .token_manager import get_token_manager import json import time import os +import hashlib +import base64 +from PIL import Image +import io logger = get_logger("webui.emoji") @@ -572,3 +576,278 @@ async def batch_delete_emojis(request: BatchDeleteRequest, authorization: Option except Exception as e: logger.exception(f"批量删除表情包失败: {e}") raise HTTPException(status_code=500, detail=f"批量删除失败: {str(e)}") from e + + +# 表情包存储目录 +EMOJI_REGISTERED_DIR = os.path.join("data", "emoji_registed") + + +class EmojiUploadResponse(BaseModel): + """表情包上传响应""" + + success: bool + message: str + data: Optional[EmojiResponse] = None + + +@router.post("/upload", response_model=EmojiUploadResponse) +async def upload_emoji( + file: UploadFile = File(..., description="表情包图片文件"), + description: str = Form("", description="表情包描述"), + emotion: str = Form("", description="情感标签,多个用逗号分隔"), + is_registered: bool = Form(True, description="是否直接注册"), + authorization: Optional[str] = Header(None), +): + """ + 上传并注册表情包 + + Args: + file: 表情包图片文件 (支持 jpg, jpeg, png, gif, webp) + description: 表情包描述 + emotion: 情感标签,多个用逗号分隔 + is_registered: 是否直接注册,默认为 True + authorization: Authorization header + + Returns: + 上传结果和表情包信息 + """ + try: + verify_auth_token(authorization) + + # 验证文件类型 + if not file.content_type: + raise HTTPException(status_code=400, detail="无法识别文件类型") + + allowed_types = ["image/jpeg", "image/png", "image/gif", "image/webp"] + if file.content_type not in allowed_types: + raise HTTPException( + status_code=400, + detail=f"不支持的文件类型: {file.content_type},支持: {', '.join(allowed_types)}", + ) + + # 读取文件内容 + file_content = await file.read() + + if not file_content: + raise HTTPException(status_code=400, detail="文件内容为空") + + # 验证图片并获取格式 + try: + with Image.open(io.BytesIO(file_content)) as img: + img_format = img.format.lower() if img.format else "png" + # 验证图片可以正常打开 + img.verify() + except Exception as e: + raise HTTPException(status_code=400, detail=f"无效的图片文件: {str(e)}") from e + + # 重新打开图片(verify后需要重新打开) + with Image.open(io.BytesIO(file_content)) as img: + img_format = img.format.lower() if img.format else "png" + + # 计算文件哈希 + emoji_hash = hashlib.md5(file_content).hexdigest() + + # 检查是否已存在相同哈希的表情包 + existing_emoji = Emoji.get_or_none(Emoji.emoji_hash == emoji_hash) + if existing_emoji: + raise HTTPException( + status_code=409, + detail=f"已存在相同的表情包 (ID: {existing_emoji.id})", + ) + + # 确保目录存在 + os.makedirs(EMOJI_REGISTERED_DIR, exist_ok=True) + + # 生成文件名 + timestamp = int(time.time()) + filename = f"emoji_{timestamp}_{emoji_hash[:8]}.{img_format}" + full_path = os.path.join(EMOJI_REGISTERED_DIR, filename) + + # 如果文件已存在,添加随机后缀 + counter = 1 + while os.path.exists(full_path): + filename = f"emoji_{timestamp}_{emoji_hash[:8]}_{counter}.{img_format}" + full_path = os.path.join(EMOJI_REGISTERED_DIR, filename) + counter += 1 + + # 保存文件 + with open(full_path, "wb") as f: + f.write(file_content) + + logger.info(f"表情包文件已保存: {full_path}") + + # 处理情感标签 + emotion_str = ",".join(e.strip() for e in emotion.split(",") if e.strip()) if emotion else "" + + # 创建数据库记录 + current_time = time.time() + emoji = Emoji.create( + full_path=full_path, + format=img_format, + emoji_hash=emoji_hash, + description=description, + emotion=emotion_str, + query_count=0, + is_registered=is_registered, + is_banned=False, + record_time=current_time, + register_time=current_time if is_registered else None, + usage_count=0, + last_used_time=None, + ) + + logger.info(f"表情包已上传并注册: ID={emoji.id}, hash={emoji_hash}") + + return EmojiUploadResponse( + success=True, + message="表情包上传成功" + ("并已注册" if is_registered else ""), + data=emoji_to_response(emoji), + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"上传表情包失败: {e}") + raise HTTPException(status_code=500, detail=f"上传失败: {str(e)}") from e + + +@router.post("/batch/upload") +async def batch_upload_emoji( + files: List[UploadFile] = File(..., description="多个表情包图片文件"), + emotion: str = Form("", description="情感标签,多个用逗号分隔"), + is_registered: bool = Form(True, description="是否直接注册"), + authorization: Optional[str] = Header(None), +): + """ + 批量上传表情包 + + Args: + files: 多个表情包图片文件 + emotion: 共用的情感标签 + is_registered: 是否直接注册 + authorization: Authorization header + + Returns: + 批量上传结果 + """ + try: + verify_auth_token(authorization) + + results = { + "success": True, + "total": len(files), + "uploaded": 0, + "failed": 0, + "details": [], + } + + allowed_types = ["image/jpeg", "image/png", "image/gif", "image/webp"] + os.makedirs(EMOJI_REGISTERED_DIR, exist_ok=True) + + for file in files: + try: + # 验证文件类型 + if file.content_type not in allowed_types: + results["failed"] += 1 + results["details"].append({ + "filename": file.filename, + "success": False, + "error": f"不支持的文件类型: {file.content_type}", + }) + continue + + # 读取文件内容 + file_content = await file.read() + + if not file_content: + results["failed"] += 1 + results["details"].append({ + "filename": file.filename, + "success": False, + "error": "文件内容为空", + }) + continue + + # 验证图片 + try: + with Image.open(io.BytesIO(file_content)) as img: + img_format = img.format.lower() if img.format else "png" + except Exception as e: + results["failed"] += 1 + results["details"].append({ + "filename": file.filename, + "success": False, + "error": f"无效的图片: {str(e)}", + }) + continue + + # 计算哈希 + emoji_hash = hashlib.md5(file_content).hexdigest() + + # 检查重复 + if Emoji.get_or_none(Emoji.emoji_hash == emoji_hash): + results["failed"] += 1 + results["details"].append({ + "filename": file.filename, + "success": False, + "error": "已存在相同的表情包", + }) + continue + + # 生成文件名并保存 + timestamp = int(time.time()) + filename = f"emoji_{timestamp}_{emoji_hash[:8]}.{img_format}" + full_path = os.path.join(EMOJI_REGISTERED_DIR, filename) + + counter = 1 + while os.path.exists(full_path): + filename = f"emoji_{timestamp}_{emoji_hash[:8]}_{counter}.{img_format}" + full_path = os.path.join(EMOJI_REGISTERED_DIR, filename) + counter += 1 + + with open(full_path, "wb") as f: + f.write(file_content) + + # 处理情感标签 + emotion_str = ",".join(e.strip() for e in emotion.split(",") if e.strip()) if emotion else "" + + # 创建数据库记录 + current_time = time.time() + emoji = Emoji.create( + full_path=full_path, + format=img_format, + emoji_hash=emoji_hash, + description="", # 批量上传暂不设置描述 + emotion=emotion_str, + query_count=0, + is_registered=is_registered, + is_banned=False, + record_time=current_time, + register_time=current_time if is_registered else None, + usage_count=0, + last_used_time=None, + ) + + results["uploaded"] += 1 + results["details"].append({ + "filename": file.filename, + "success": True, + "id": emoji.id, + }) + + except Exception as e: + results["failed"] += 1 + results["details"].append({ + "filename": file.filename, + "success": False, + "error": str(e), + }) + + results["message"] = f"成功上传 {results['uploaded']} 个,失败 {results['failed']} 个" + return results + + except HTTPException: + raise + except Exception as e: + logger.exception(f"批量上传表情包失败: {e}") + raise HTTPException(status_code=500, detail=f"批量上传失败: {str(e)}") from e \ No newline at end of file