feat: 添加表情包上传和批量上传功能,支持文件类型验证和哈希检查

pull/1389/head
墨梓柒 2025-11-29 02:12:49 +08:00
parent 3357ab0d1d
commit 17279c4326
No known key found for this signature in database
GPG Key ID: 4A65B9DBA35F7635
1 changed files with 280 additions and 1 deletions

View File

@ -1,6 +1,6 @@
"""表情包管理 API 路由""" """表情包管理 API 路由"""
from fastapi import APIRouter, HTTPException, Header, Query from fastapi import APIRouter, HTTPException, Header, Query, UploadFile, File, Form
from fastapi.responses import FileResponse from fastapi.responses import FileResponse
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional, List from typing import Optional, List
@ -10,6 +10,10 @@ from .token_manager import get_token_manager
import json import json
import time import time
import os import os
import hashlib
import base64
from PIL import Image
import io
logger = get_logger("webui.emoji") logger = get_logger("webui.emoji")
@ -572,3 +576,278 @@ async def batch_delete_emojis(request: BatchDeleteRequest, authorization: Option
except Exception as e: except Exception as e:
logger.exception(f"批量删除表情包失败: {e}") logger.exception(f"批量删除表情包失败: {e}")
raise HTTPException(status_code=500, detail=f"批量删除失败: {str(e)}") from e raise HTTPException(status_code=500, detail=f"批量删除失败: {str(e)}") from e
# 表情包存储目录
EMOJI_REGISTERED_DIR = os.path.join("data", "emoji_registed")
class EmojiUploadResponse(BaseModel):
"""表情包上传响应"""
success: bool
message: str
data: Optional[EmojiResponse] = None
@router.post("/upload", response_model=EmojiUploadResponse)
async def upload_emoji(
file: UploadFile = File(..., description="表情包图片文件"),
description: str = Form("", description="表情包描述"),
emotion: str = Form("", description="情感标签,多个用逗号分隔"),
is_registered: bool = Form(True, description="是否直接注册"),
authorization: Optional[str] = Header(None),
):
"""
上传并注册表情包
Args:
file: 表情包图片文件 (支持 jpg, jpeg, png, gif, webp)
description: 表情包描述
emotion: 情感标签多个用逗号分隔
is_registered: 是否直接注册默认为 True
authorization: Authorization header
Returns:
上传结果和表情包信息
"""
try:
verify_auth_token(authorization)
# 验证文件类型
if not file.content_type:
raise HTTPException(status_code=400, detail="无法识别文件类型")
allowed_types = ["image/jpeg", "image/png", "image/gif", "image/webp"]
if file.content_type not in allowed_types:
raise HTTPException(
status_code=400,
detail=f"不支持的文件类型: {file.content_type},支持: {', '.join(allowed_types)}",
)
# 读取文件内容
file_content = await file.read()
if not file_content:
raise HTTPException(status_code=400, detail="文件内容为空")
# 验证图片并获取格式
try:
with Image.open(io.BytesIO(file_content)) as img:
img_format = img.format.lower() if img.format else "png"
# 验证图片可以正常打开
img.verify()
except Exception as e:
raise HTTPException(status_code=400, detail=f"无效的图片文件: {str(e)}") from e
# 重新打开图片verify后需要重新打开
with Image.open(io.BytesIO(file_content)) as img:
img_format = img.format.lower() if img.format else "png"
# 计算文件哈希
emoji_hash = hashlib.md5(file_content).hexdigest()
# 检查是否已存在相同哈希的表情包
existing_emoji = Emoji.get_or_none(Emoji.emoji_hash == emoji_hash)
if existing_emoji:
raise HTTPException(
status_code=409,
detail=f"已存在相同的表情包 (ID: {existing_emoji.id})",
)
# 确保目录存在
os.makedirs(EMOJI_REGISTERED_DIR, exist_ok=True)
# 生成文件名
timestamp = int(time.time())
filename = f"emoji_{timestamp}_{emoji_hash[:8]}.{img_format}"
full_path = os.path.join(EMOJI_REGISTERED_DIR, filename)
# 如果文件已存在,添加随机后缀
counter = 1
while os.path.exists(full_path):
filename = f"emoji_{timestamp}_{emoji_hash[:8]}_{counter}.{img_format}"
full_path = os.path.join(EMOJI_REGISTERED_DIR, filename)
counter += 1
# 保存文件
with open(full_path, "wb") as f:
f.write(file_content)
logger.info(f"表情包文件已保存: {full_path}")
# 处理情感标签
emotion_str = ",".join(e.strip() for e in emotion.split(",") if e.strip()) if emotion else ""
# 创建数据库记录
current_time = time.time()
emoji = Emoji.create(
full_path=full_path,
format=img_format,
emoji_hash=emoji_hash,
description=description,
emotion=emotion_str,
query_count=0,
is_registered=is_registered,
is_banned=False,
record_time=current_time,
register_time=current_time if is_registered else None,
usage_count=0,
last_used_time=None,
)
logger.info(f"表情包已上传并注册: ID={emoji.id}, hash={emoji_hash}")
return EmojiUploadResponse(
success=True,
message="表情包上传成功" + ("并已注册" if is_registered else ""),
data=emoji_to_response(emoji),
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"上传表情包失败: {e}")
raise HTTPException(status_code=500, detail=f"上传失败: {str(e)}") from e
@router.post("/batch/upload")
async def batch_upload_emoji(
files: List[UploadFile] = File(..., description="多个表情包图片文件"),
emotion: str = Form("", description="情感标签,多个用逗号分隔"),
is_registered: bool = Form(True, description="是否直接注册"),
authorization: Optional[str] = Header(None),
):
"""
批量上传表情包
Args:
files: 多个表情包图片文件
emotion: 共用的情感标签
is_registered: 是否直接注册
authorization: Authorization header
Returns:
批量上传结果
"""
try:
verify_auth_token(authorization)
results = {
"success": True,
"total": len(files),
"uploaded": 0,
"failed": 0,
"details": [],
}
allowed_types = ["image/jpeg", "image/png", "image/gif", "image/webp"]
os.makedirs(EMOJI_REGISTERED_DIR, exist_ok=True)
for file in files:
try:
# 验证文件类型
if file.content_type not in allowed_types:
results["failed"] += 1
results["details"].append({
"filename": file.filename,
"success": False,
"error": f"不支持的文件类型: {file.content_type}",
})
continue
# 读取文件内容
file_content = await file.read()
if not file_content:
results["failed"] += 1
results["details"].append({
"filename": file.filename,
"success": False,
"error": "文件内容为空",
})
continue
# 验证图片
try:
with Image.open(io.BytesIO(file_content)) as img:
img_format = img.format.lower() if img.format else "png"
except Exception as e:
results["failed"] += 1
results["details"].append({
"filename": file.filename,
"success": False,
"error": f"无效的图片: {str(e)}",
})
continue
# 计算哈希
emoji_hash = hashlib.md5(file_content).hexdigest()
# 检查重复
if Emoji.get_or_none(Emoji.emoji_hash == emoji_hash):
results["failed"] += 1
results["details"].append({
"filename": file.filename,
"success": False,
"error": "已存在相同的表情包",
})
continue
# 生成文件名并保存
timestamp = int(time.time())
filename = f"emoji_{timestamp}_{emoji_hash[:8]}.{img_format}"
full_path = os.path.join(EMOJI_REGISTERED_DIR, filename)
counter = 1
while os.path.exists(full_path):
filename = f"emoji_{timestamp}_{emoji_hash[:8]}_{counter}.{img_format}"
full_path = os.path.join(EMOJI_REGISTERED_DIR, filename)
counter += 1
with open(full_path, "wb") as f:
f.write(file_content)
# 处理情感标签
emotion_str = ",".join(e.strip() for e in emotion.split(",") if e.strip()) if emotion else ""
# 创建数据库记录
current_time = time.time()
emoji = Emoji.create(
full_path=full_path,
format=img_format,
emoji_hash=emoji_hash,
description="", # 批量上传暂不设置描述
emotion=emotion_str,
query_count=0,
is_registered=is_registered,
is_banned=False,
record_time=current_time,
register_time=current_time if is_registered else None,
usage_count=0,
last_used_time=None,
)
results["uploaded"] += 1
results["details"].append({
"filename": file.filename,
"success": True,
"id": emoji.id,
})
except Exception as e:
results["failed"] += 1
results["details"].append({
"filename": file.filename,
"success": False,
"error": str(e),
})
results["message"] = f"成功上传 {results['uploaded']} 个,失败 {results['failed']}"
return results
except HTTPException:
raise
except Exception as e:
logger.exception(f"批量上传表情包失败: {e}")
raise HTTPException(status_code=500, detail=f"批量上传失败: {str(e)}") from e