diff --git a/src/webui/routers/emoji.py b/src/webui/routers/emoji.py index ea09f68e..1a7629b0 100644 --- a/src/webui/routers/emoji.py +++ b/src/webui/routers/emoji.py @@ -1,21 +1,27 @@ """表情包管理 API 路由""" -from fastapi import APIRouter, HTTPException, Header, Query, UploadFile, File, Form, Cookie +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +from pathlib import Path +from typing import Annotated, List, Optional + +import asyncio +import hashlib +import io +import os +import threading + +from fastapi import APIRouter, Cookie, File, Form, Header, HTTPException, Query, UploadFile from fastapi.responses import FileResponse, JSONResponse from pydantic import BaseModel -from typing import Optional, List, Annotated -from src.common.logger import get_logger -from src.common.database.database_model import Emoji -from src.webui.core import get_token_manager, verify_auth_token_from_cookie_or_header -import time -import os -import hashlib from PIL import Image -import io -from pathlib import Path -import threading -import asyncio -from concurrent.futures import ThreadPoolExecutor +from sqlalchemy import func +from sqlmodel import col, delete, select + +from src.common.database.database import get_db_session +from src.common.database.database_model import Images, ImageType +from src.common.logger import get_logger +from src.webui.core import get_token_manager, verify_auth_token_from_cookie_or_header logger = get_logger("webui.emoji") @@ -99,7 +105,7 @@ def _generate_thumbnail(source_path: str, file_hash: str) -> Path: try: with Image.open(source_path) as img: # GIF 处理:提取第一帧 - if hasattr(img, "n_frames") and img.n_frames > 1: + if getattr(img, "n_frames", 1) > 1: img.seek(0) # 确保在第一帧 # 转换为 RGB/RGBA(WebP 支持透明度) @@ -138,9 +144,9 @@ def cleanup_orphaned_thumbnails() -> tuple[int, int]: return 0, 0 # 获取所有表情包的哈希值 - valid_hashes = set() - for emoji in Emoji.select(Emoji.emoji_hash): - valid_hashes.add(emoji.emoji_hash) + with get_db_session() as session: + statement = select(Images.image_hash).where(col(Images.image_type) == ImageType.EMOJI) + valid_hashes = set(session.exec(statement).all()) cleaned = 0 kept = 0 @@ -179,7 +185,6 @@ class EmojiResponse(BaseModel): id: int full_path: str - format: str emoji_hash: str description: str query_count: int @@ -188,7 +193,6 @@ class EmojiResponse(BaseModel): emotion: Optional[str] # 直接返回字符串 record_time: float register_time: Optional[float] - usage_count: int last_used_time: Optional[float] @@ -257,22 +261,19 @@ def verify_auth_token( return verify_auth_token_from_cookie_or_header(maibot_session, authorization) -def emoji_to_response(emoji: Emoji) -> EmojiResponse: - """将 Emoji 模型转换为响应对象""" +def emoji_to_response(image: Images) -> EmojiResponse: return EmojiResponse( - id=emoji.id, - full_path=emoji.full_path, - format=emoji.format, - emoji_hash=emoji.emoji_hash, - description=emoji.description, - query_count=emoji.query_count, - is_registered=emoji.is_registered, - is_banned=emoji.is_banned, - emotion=str(emoji.emotion) if emoji.emotion is not None else None, - record_time=emoji.record_time, - register_time=emoji.register_time, - usage_count=emoji.usage_count, - last_used_time=emoji.last_used_time, + id=image.id if image.id is not None else 0, + full_path=image.full_path, + emoji_hash=image.image_hash, + description=image.description, + query_count=image.query_count, + is_registered=image.is_registered, + is_banned=image.is_banned, + emotion=image.emotion, + record_time=image.record_time.timestamp() if image.record_time else 0.0, + register_time=image.register_time.timestamp() if image.register_time else None, + last_used_time=image.last_used_time.timestamp() if image.last_used_time else None, ) @@ -283,8 +284,7 @@ async def get_emoji_list( search: Optional[str] = Query(None, description="搜索关键词"), is_registered: Optional[bool] = Query(None, description="是否已注册筛选"), is_banned: Optional[bool] = Query(None, description="是否被禁用筛选"), - format: Optional[str] = Query(None, description="格式筛选"), - sort_by: Optional[str] = Query("usage_count", description="排序字段"), + sort_by: Optional[str] = Query("query_count", description="排序字段"), sort_order: Optional[str] = Query("desc", description="排序方向"), maibot_session: Optional[str] = Cookie(None), authorization: Optional[str] = Header(None), @@ -298,8 +298,7 @@ async def get_emoji_list( search: 搜索关键词 (匹配 description, emoji_hash) is_registered: 是否已注册筛选 is_banned: 是否被禁用筛选 - format: 格式筛选 - sort_by: 排序字段 (usage_count, register_time, record_time, last_used_time) + sort_by: 排序字段 (query_count, register_time, record_time, last_used_time) sort_order: 排序方向 (asc, desc) authorization: Authorization header @@ -310,47 +309,58 @@ async def get_emoji_list( verify_auth_token(maibot_session, authorization) # 构建查询 - query = Emoji.select() + statement = select(Images).where(col(Images.image_type) == ImageType.EMOJI) # 搜索过滤 if search: - query = query.where((Emoji.description.contains(search)) | (Emoji.emoji_hash.contains(search))) + statement = statement.where( + (col(Images.description).contains(search)) | (col(Images.image_hash).contains(search)) + ) # 注册状态过滤 if is_registered is not None: - query = query.where(Emoji.is_registered == is_registered) + statement = statement.where(col(Images.is_registered) == is_registered) # 禁用状态过滤 if is_banned is not None: - query = query.where(Emoji.is_banned == is_banned) - - # 格式过滤 - if format: - query = query.where(Emoji.format == format) + statement = statement.where(col(Images.is_banned) == is_banned) # 排序字段映射 sort_field_map = { - "usage_count": Emoji.usage_count, - "register_time": Emoji.register_time, - "record_time": Emoji.record_time, - "last_used_time": Emoji.last_used_time, + "usage_count": col(Images.query_count), + "query_count": col(Images.query_count), + "register_time": col(Images.register_time), + "record_time": col(Images.record_time), + "last_used_time": col(Images.last_used_time), } # 获取排序字段,默认使用 usage_count - sort_field = sort_field_map.get(sort_by, Emoji.usage_count) + sort_key = sort_by or "query_count" + sort_field = sort_field_map.get(sort_key, col(Images.query_count)) # 应用排序 if sort_order == "asc": - query = query.order_by(sort_field.asc()) + statement = statement.order_by(sort_field.asc()) else: - query = query.order_by(sort_field.desc()) - - # 获取总数 - total = query.count() + statement = statement.order_by(sort_field.desc()) # 分页 offset = (page - 1) * page_size - emojis = query.offset(offset).limit(page_size) + statement = statement.offset(offset).limit(page_size) + + with get_db_session() as session: + emojis = session.exec(statement).all() + + count_statement = select(func.count()).select_from(Images).where(col(Images.image_type) == ImageType.EMOJI) + if search: + count_statement = count_statement.where( + (col(Images.description).contains(search)) | (col(Images.image_hash).contains(search)) + ) + if is_registered is not None: + count_statement = count_statement.where(col(Images.is_registered) == is_registered) + if is_banned is not None: + count_statement = count_statement.where(col(Images.is_banned) == is_banned) + total = session.exec(count_statement).one() # 转换为响应对象 data = [emoji_to_response(emoji) for emoji in emojis] @@ -381,12 +391,17 @@ async def get_emoji_detail( try: verify_auth_token(maibot_session, authorization) - emoji = Emoji.get_or_none(Emoji.id == emoji_id) + with get_db_session() as session: + statement = select(Images).where( + col(Images.id) == emoji_id, + col(Images.image_type) == ImageType.EMOJI, + ) + emoji = session.exec(statement).first() - if not emoji: - raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") + if not emoji: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") - return EmojiDetailResponse(success=True, data=emoji_to_response(emoji)) + return EmojiDetailResponse(success=True, data=emoji_to_response(emoji)) except HTTPException: raise @@ -416,34 +431,37 @@ async def update_emoji( try: verify_auth_token(maibot_session, authorization) - emoji = Emoji.get_or_none(Emoji.id == emoji_id) + with get_db_session() as session: + statement = select(Images).where( + col(Images.id) == emoji_id, + col(Images.image_type) == ImageType.EMOJI, + ) + emoji = session.exec(statement).first() - if not emoji: - raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") + if not emoji: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") - # 只更新提供的字段 - update_data = request.model_dump(exclude_unset=True) + # 只更新提供的字段 + update_data = request.model_dump(exclude_unset=True) - if not update_data: - raise HTTPException(status_code=400, detail="未提供任何需要更新的字段") + if not update_data: + raise HTTPException(status_code=400, detail="未提供任何需要更新的字段") - # emotion 字段直接使用字符串,无需转换 + # 如果注册状态从 False 变为 True,记录注册时间 + if "is_registered" in update_data and update_data["is_registered"] and not emoji.is_registered: + update_data["register_time"] = datetime.now() - # 如果注册状态从 False 变为 True,记录注册时间 - if "is_registered" in update_data and update_data["is_registered"] and not emoji.is_registered: - update_data["register_time"] = time.time() + # 执行更新 + for field, value in update_data.items(): + setattr(emoji, field, value) - # 执行更新 - for field, value in update_data.items(): - setattr(emoji, field, value) + session.add(emoji) - emoji.save() + logger.info(f"表情包已更新: ID={emoji_id}, 字段: {list(update_data.keys())}") - logger.info(f"表情包已更新: ID={emoji_id}, 字段: {list(update_data.keys())}") - - return EmojiUpdateResponse( - success=True, message=f"成功更新 {len(update_data)} 个字段", data=emoji_to_response(emoji) - ) + return EmojiUpdateResponse( + success=True, message=f"成功更新 {len(update_data)} 个字段", data=emoji_to_response(emoji) + ) except HTTPException: raise @@ -469,20 +487,22 @@ async def delete_emoji( try: verify_auth_token(maibot_session, authorization) - emoji = Emoji.get_or_none(Emoji.id == emoji_id) + with get_db_session() as session: + statement = select(Images).where( + col(Images.id) == emoji_id, + col(Images.image_type) == ImageType.EMOJI, + ) + emoji = session.exec(statement).first() - if not emoji: - raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") + if not emoji: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") - # 记录删除信息 - emoji_hash = emoji.emoji_hash + emoji_hash = emoji.image_hash + session.delete(emoji) - # 执行删除 - emoji.delete_instance() + logger.info(f"表情包已删除: ID={emoji_id}, hash={emoji_hash}") - logger.info(f"表情包已删除: ID={emoji_id}, hash={emoji_hash}") - - return EmojiDeleteResponse(success=True, message=f"成功删除表情包: {emoji_hash}") + return EmojiDeleteResponse(success=True, message=f"成功删除表情包: {emoji_hash}") except HTTPException: raise @@ -505,27 +525,51 @@ async def get_emoji_stats(maibot_session: Optional[str] = Cookie(None), authoriz try: verify_auth_token(maibot_session, authorization) - total = Emoji.select().count() - registered = Emoji.select().where(Emoji.is_registered).count() - banned = Emoji.select().where(Emoji.is_banned).count() + with get_db_session() as session: + total_statement = select(func.count()).select_from(Images).where(col(Images.image_type) == ImageType.EMOJI) + registered_statement = ( + select(func.count()) + .select_from(Images) + .where( + col(Images.image_type) == ImageType.EMOJI, + col(Images.is_registered) == True, + ) + ) + banned_statement = ( + select(func.count()) + .select_from(Images) + .where( + col(Images.image_type) == ImageType.EMOJI, + col(Images.is_banned) == True, + ) + ) - # 按格式统计 - formats = {} - for emoji in Emoji.select(Emoji.format): - fmt = emoji.format - formats[fmt] = formats.get(fmt, 0) + 1 + total = session.exec(total_statement).one() + registered = session.exec(registered_statement).one() + banned = session.exec(banned_statement).one() - # 获取最常用的表情包(前10) - top_used = Emoji.select().order_by(Emoji.usage_count.desc()).limit(10) - top_used_list = [ - { - "id": emoji.id, - "emoji_hash": emoji.emoji_hash, - "description": emoji.description, - "usage_count": emoji.usage_count, - } - for emoji in top_used - ] + formats: dict[str, int] = {} + format_statement = select(Images.full_path).where(col(Images.image_type) == ImageType.EMOJI) + for full_path in session.exec(format_statement).all(): + suffix = Path(full_path).suffix.lower().lstrip(".") + fmt = suffix or "unknown" + formats[fmt] = formats.get(fmt, 0) + 1 + + top_used_statement = ( + select(Images) + .where(col(Images.image_type) == ImageType.EMOJI) + .order_by(col(Images.query_count).desc()) + .limit(10) + ) + top_used_list = [ + { + "id": emoji.id, + "emoji_hash": emoji.image_hash, + "description": emoji.description, + "usage_count": emoji.query_count, + } + for emoji in session.exec(top_used_statement).all() + ] return { "success": True, @@ -563,23 +607,27 @@ async def register_emoji( try: verify_auth_token(maibot_session, authorization) - emoji = Emoji.get_or_none(Emoji.id == emoji_id) + with get_db_session() as session: + statement = select(Images).where( + col(Images.id) == emoji_id, + col(Images.image_type) == ImageType.EMOJI, + ) + emoji = session.exec(statement).first() - if not emoji: - raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") + if not emoji: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") - if emoji.is_registered: - raise HTTPException(status_code=400, detail="该表情包已经注册") + if emoji.is_registered: + raise HTTPException(status_code=400, detail="该表情包已经注册") - # 注册表情包(如果已封禁,自动解除封禁) - emoji.is_registered = True - emoji.is_banned = False # 注册时自动解除封禁 - emoji.register_time = time.time() - emoji.save() + emoji.is_registered = True + emoji.is_banned = False + emoji.register_time = datetime.now() + session.add(emoji) - logger.info(f"表情包已注册: ID={emoji_id}") + logger.info(f"表情包已注册: ID={emoji_id}") - return EmojiUpdateResponse(success=True, message="表情包注册成功", data=emoji_to_response(emoji)) + return EmojiUpdateResponse(success=True, message="表情包注册成功", data=emoji_to_response(emoji)) except HTTPException: raise @@ -605,19 +653,23 @@ async def ban_emoji( try: verify_auth_token(maibot_session, authorization) - emoji = Emoji.get_or_none(Emoji.id == emoji_id) + with get_db_session() as session: + statement = select(Images).where( + col(Images.id) == emoji_id, + col(Images.image_type) == ImageType.EMOJI, + ) + emoji = session.exec(statement).first() - if not emoji: - raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") + if not emoji: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") - # 禁用表情包(同时取消注册) - emoji.is_banned = True - emoji.is_registered = False - emoji.save() + emoji.is_banned = True + emoji.is_registered = False + session.add(emoji) - logger.info(f"表情包已禁用: ID={emoji_id}") + logger.info(f"表情包已禁用: ID={emoji_id}") - return EmojiUpdateResponse(success=True, message="表情包禁用成功", data=emoji_to_response(emoji)) + return EmojiUpdateResponse(success=True, message="表情包禁用成功", data=emoji_to_response(emoji)) except HTTPException: raise @@ -672,61 +724,58 @@ async def get_emoji_thumbnail( if not is_valid: raise HTTPException(status_code=401, detail="Token 无效或已过期") - emoji = Emoji.get_or_none(Emoji.id == emoji_id) - - if not emoji: - raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") - - # 检查文件是否存在 - if not os.path.exists(emoji.full_path): - raise HTTPException(status_code=404, detail="表情包文件不存在") - - # 如果请求原图,直接返回原文件 - if original: - mime_types = { - "png": "image/png", - "jpg": "image/jpeg", - "jpeg": "image/jpeg", - "gif": "image/gif", - "webp": "image/webp", - "bmp": "image/bmp", - } - media_type = mime_types.get(emoji.format.lower(), "application/octet-stream") - return FileResponse( - path=emoji.full_path, media_type=media_type, filename=f"{emoji.emoji_hash}.{emoji.format}" + with get_db_session() as session: + statement = select(Images).where( + col(Images.id) == emoji_id, + col(Images.image_type) == ImageType.EMOJI, ) + emoji = session.exec(statement).first() - # 尝试获取或生成缩略图 - cache_path = _get_thumbnail_cache_path(emoji.emoji_hash) + if not emoji: + raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包") - # 检查缓存是否存在 - if cache_path.exists(): - # 缓存命中,直接返回 - return FileResponse( - path=str(cache_path), media_type="image/webp", filename=f"{emoji.emoji_hash}_thumb.webp" + if not os.path.exists(emoji.full_path): + raise HTTPException(status_code=404, detail="表情包文件不存在") + + if original: + mime_types = { + "png": "image/png", + "jpg": "image/jpeg", + "jpeg": "image/jpeg", + "gif": "image/gif", + "webp": "image/webp", + "bmp": "image/bmp", + } + suffix = Path(emoji.full_path).suffix.lower().lstrip(".") + media_type = mime_types.get(suffix, "application/octet-stream") + return FileResponse( + path=emoji.full_path, media_type=media_type, filename=f"{emoji.image_hash}.{suffix}" + ) + + cache_path = _get_thumbnail_cache_path(emoji.image_hash) + + if cache_path.exists(): + return FileResponse( + path=str(cache_path), media_type="image/webp", filename=f"{emoji.image_hash}_thumb.webp" + ) + + with _generating_lock: + if emoji.image_hash not in _generating_thumbnails: + _generating_thumbnails.add(emoji.image_hash) + _thumbnail_executor.submit(_background_generate_thumbnail, emoji.full_path, emoji.image_hash) + + return JSONResponse( + status_code=202, + content={ + "status": "generating", + "message": "缩略图正在生成中,请稍后重试", + "emoji_id": emoji_id, + }, + headers={ + "Retry-After": "1", + }, ) - # 缓存未命中,触发后台生成并返回 202 - with _generating_lock: - if emoji.emoji_hash not in _generating_thumbnails: - # 标记为正在生成 - _generating_thumbnails.add(emoji.emoji_hash) - # 提交到线程池后台生成 - _thumbnail_executor.submit(_background_generate_thumbnail, emoji.full_path, emoji.emoji_hash) - - # 返回 202 Accepted,告诉前端缩略图正在生成中 - return JSONResponse( - status_code=202, - content={ - "status": "generating", - "message": "缩略图正在生成中,请稍后重试", - "emoji_id": emoji_id, - }, - headers={ - "Retry-After": "1", # 建议 1 秒后重试 - }, - ) - except HTTPException: raise except Exception as e: @@ -762,14 +811,19 @@ async def batch_delete_emojis( for emoji_id in request.emoji_ids: try: - emoji = Emoji.get_or_none(Emoji.id == emoji_id) - if emoji: - emoji.delete_instance() - deleted_count += 1 - logger.info(f"批量删除表情包: {emoji_id}") - else: - failed_count += 1 - failed_ids.append(emoji_id) + with get_db_session() as session: + statement = select(Images).where( + col(Images.id) == emoji_id, + col(Images.image_type) == ImageType.EMOJI, + ) + emoji = session.exec(statement).first() + if emoji: + session.delete(emoji) + deleted_count += 1 + logger.info(f"批量删除表情包: {emoji_id}") + else: + failed_count += 1 + failed_ids.append(emoji_id) except Exception as e: logger.error(f"删除表情包 {emoji_id} 失败: {e}") failed_count += 1 @@ -864,19 +918,23 @@ async def upload_emoji( # 计算文件哈希 emoji_hash = hashlib.md5(file_content).hexdigest() - # 检查是否已存在相同哈希的表情包 - existing_emoji = Emoji.get_or_none(Emoji.emoji_hash == emoji_hash) - if existing_emoji: - raise HTTPException( - status_code=409, - detail=f"已存在相同的表情包 (ID: {existing_emoji.id})", + with get_db_session() as session: + existing_statement = select(Images).where( + col(Images.image_hash) == emoji_hash, + col(Images.image_type) == ImageType.EMOJI, ) + existing_emoji = session.exec(existing_statement).first() + if existing_emoji: + raise HTTPException( + status_code=409, + detail=f"已存在相同的表情包 (ID: {existing_emoji.id})", + ) # 确保目录存在 os.makedirs(EMOJI_REGISTERED_DIR, exist_ok=True) # 生成文件名 - timestamp = int(time.time()) + timestamp = int(datetime.now().timestamp()) filename = f"emoji_{timestamp}_{emoji_hash[:8]}.{img_format}" full_path = os.path.join(EMOJI_REGISTERED_DIR, filename) @@ -896,30 +954,31 @@ async def upload_emoji( # 处理情感标签 emotion_str = ",".join(e.strip() for e in emotion.split(",") if e.strip()) if emotion else "" - # 创建数据库记录 - current_time = time.time() - emoji = Emoji.create( - full_path=full_path, - format=img_format, - emoji_hash=emoji_hash, - description=description, - emotion=emotion_str, - query_count=0, - is_registered=is_registered, - is_banned=False, - record_time=current_time, - register_time=current_time if is_registered else None, - usage_count=0, - last_used_time=None, - ) + current_time = datetime.now() + with get_db_session() as session: + emoji = Images( + image_type=ImageType.EMOJI, + full_path=full_path, + image_hash=emoji_hash, + description=description, + emotion=emotion_str or None, + query_count=0, + is_registered=is_registered, + is_banned=False, + record_time=current_time, + register_time=current_time if is_registered else None, + last_used_time=None, + ) + session.add(emoji) + session.flush() - logger.info(f"表情包已上传并注册: ID={emoji.id}, hash={emoji_hash}") + logger.info(f"表情包已上传并注册: ID={emoji.id}, hash={emoji_hash}") - return EmojiUploadResponse( - success=True, - message="表情包上传成功" + ("并已注册" if is_registered else ""), - data=emoji_to_response(emoji), - ) + return EmojiUploadResponse( + success=True, + message="表情包上传成功" + ("并已注册" if is_registered else ""), + data=emoji_to_response(emoji), + ) except HTTPException: raise @@ -1008,20 +1067,24 @@ async def batch_upload_emoji( # 计算哈希 emoji_hash = hashlib.md5(file_content).hexdigest() - # 检查重复 - if Emoji.get_or_none(Emoji.emoji_hash == emoji_hash): - results["failed"] += 1 - results["details"].append( - { - "filename": file.filename, - "success": False, - "error": "已存在相同的表情包", - } + with get_db_session() as session: + existing_statement = select(Images).where( + col(Images.image_hash) == emoji_hash, + col(Images.image_type) == ImageType.EMOJI, ) - continue + if session.exec(existing_statement).first(): + results["failed"] += 1 + results["details"].append( + { + "filename": file.filename, + "success": False, + "error": "已存在相同的表情包", + } + ) + continue # 生成文件名并保存 - timestamp = int(time.time()) + timestamp = int(datetime.now().timestamp()) filename = f"emoji_{timestamp}_{emoji_hash[:8]}.{img_format}" full_path = os.path.join(EMOJI_REGISTERED_DIR, filename) @@ -1037,31 +1100,32 @@ async def batch_upload_emoji( # 处理情感标签 emotion_str = ",".join(e.strip() for e in emotion.split(",") if e.strip()) if emotion else "" - # 创建数据库记录 - current_time = time.time() - emoji = Emoji.create( - full_path=full_path, - format=img_format, - emoji_hash=emoji_hash, - description="", # 批量上传暂不设置描述 - emotion=emotion_str, - query_count=0, - is_registered=is_registered, - is_banned=False, - record_time=current_time, - register_time=current_time if is_registered else None, - usage_count=0, - last_used_time=None, - ) + current_time = datetime.now() + with get_db_session() as session: + emoji = Images( + image_type=ImageType.EMOJI, + full_path=full_path, + image_hash=emoji_hash, + description="", + emotion=emotion_str or None, + query_count=0, + is_registered=is_registered, + is_banned=False, + record_time=current_time, + register_time=current_time if is_registered else None, + last_used_time=None, + ) + session.add(emoji) + session.flush() - results["uploaded"] += 1 - results["details"].append( - { - "filename": file.filename, - "success": True, - "id": emoji.id, - } - ) + results["uploaded"] += 1 + results["details"].append( + { + "filename": file.filename, + "success": True, + "id": emoji.id, + } + ) except Exception as e: results["failed"] += 1 @@ -1138,8 +1202,9 @@ async def get_thumbnail_cache_stats( total_size = sum(f.stat().st_size for f in cache_files) total_size_mb = round(total_size / (1024 * 1024), 2) - # 统计表情包总数 - emoji_count = Emoji.select().count() + with get_db_session() as session: + count_statement = select(func.count()).select_from(Images).where(col(Images.image_type) == ImageType.EMOJI) + emoji_count = session.exec(count_statement).one() # 计算覆盖率 coverage_percent = round((total_count / emoji_count * 100) if emoji_count > 0 else 0, 1) @@ -1213,12 +1278,17 @@ async def preheat_thumbnail_cache( _ensure_thumbnail_cache_dir() # 获取使用次数最高的表情包(未缓存的优先) - emojis = ( - Emoji.select() - .where(Emoji.is_banned == False) # noqa: E712 Peewee ORM requires == for boolean comparison - .order_by(Emoji.usage_count.desc()) - .limit(limit * 2) # 多查一些,因为有些可能已缓存 - ) + with get_db_session() as session: + statement = ( + select(Images) + .where( + col(Images.image_type) == ImageType.EMOJI, + col(Images.is_banned) == False, + ) + .order_by(col(Images.query_count).desc()) + .limit(limit * 2) + ) + emojis = session.exec(statement).all() generated = 0 skipped = 0 @@ -1228,25 +1298,22 @@ async def preheat_thumbnail_cache( if generated >= limit: break - cache_path = _get_thumbnail_cache_path(emoji.emoji_hash) + cache_path = _get_thumbnail_cache_path(emoji.image_hash) - # 已缓存,跳过 if cache_path.exists(): skipped += 1 continue - # 原文件不存在,跳过 if not os.path.exists(emoji.full_path): failed += 1 continue try: - # 使用线程池异步生成缩略图,避免阻塞事件循环 loop = asyncio.get_event_loop() - await loop.run_in_executor(_thumbnail_executor, _generate_thumbnail, emoji.full_path, emoji.emoji_hash) + await loop.run_in_executor(_thumbnail_executor, _generate_thumbnail, emoji.full_path, emoji.image_hash) generated += 1 except Exception as e: - logger.warning(f"预热缩略图失败 {emoji.emoji_hash}: {e}") + logger.warning(f"预热缩略图失败 {emoji.image_hash}: {e}") failed += 1 return ThumbnailPreheatResponse( diff --git a/src/webui/schemas/emoji.py b/src/webui/schemas/emoji.py index 571eccd6..70787975 100644 --- a/src/webui/schemas/emoji.py +++ b/src/webui/schemas/emoji.py @@ -7,7 +7,6 @@ class EmojiResponse(BaseModel): id: int full_path: str - format: str emoji_hash: str description: str query_count: int @@ -16,7 +15,6 @@ class EmojiResponse(BaseModel): emotion: Optional[str] record_time: float register_time: Optional[float] - usage_count: int last_used_time: Optional[float]