refactor(webui): migrate emoji routes from Peewee to SQLModel

- 完全迁移到 SQLModel,所有 DB 操作使用 get_db_session()
- 字段映射:image_hash → emoji_hash
- datetime 时间戳转换
- 移除 format/usage_count 字段
pull/1496/head
DrSmoothl 2026-02-17 19:58:14 +08:00
parent 0ea18a4edc
commit 7da0811b5c
No known key found for this signature in database
2 changed files with 334 additions and 269 deletions

View File

@ -1,21 +1,27 @@
"""表情包管理 API 路由"""
from fastapi import APIRouter, HTTPException, Header, Query, UploadFile, File, Form, Cookie
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from typing import Annotated, List, Optional
import asyncio
import hashlib
import io
import os
import threading
from fastapi import APIRouter, Cookie, File, Form, Header, HTTPException, Query, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from typing import Optional, List, Annotated
from src.common.logger import get_logger
from src.common.database.database_model import Emoji
from src.webui.core import get_token_manager, verify_auth_token_from_cookie_or_header
import time
import os
import hashlib
from PIL import Image
import io
from pathlib import Path
import threading
import asyncio
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import func
from sqlmodel import col, delete, select
from src.common.database.database import get_db_session
from src.common.database.database_model import Images, ImageType
from src.common.logger import get_logger
from src.webui.core import get_token_manager, verify_auth_token_from_cookie_or_header
logger = get_logger("webui.emoji")
@ -99,7 +105,7 @@ def _generate_thumbnail(source_path: str, file_hash: str) -> Path:
try:
with Image.open(source_path) as img:
# GIF 处理:提取第一帧
if hasattr(img, "n_frames") and img.n_frames > 1:
if getattr(img, "n_frames", 1) > 1:
img.seek(0) # 确保在第一帧
# 转换为 RGB/RGBAWebP 支持透明度)
@ -138,9 +144,9 @@ def cleanup_orphaned_thumbnails() -> tuple[int, int]:
return 0, 0
# 获取所有表情包的哈希值
valid_hashes = set()
for emoji in Emoji.select(Emoji.emoji_hash):
valid_hashes.add(emoji.emoji_hash)
with get_db_session() as session:
statement = select(Images.image_hash).where(col(Images.image_type) == ImageType.EMOJI)
valid_hashes = set(session.exec(statement).all())
cleaned = 0
kept = 0
@ -179,7 +185,6 @@ class EmojiResponse(BaseModel):
id: int
full_path: str
format: str
emoji_hash: str
description: str
query_count: int
@ -188,7 +193,6 @@ class EmojiResponse(BaseModel):
emotion: Optional[str] # 直接返回字符串
record_time: float
register_time: Optional[float]
usage_count: int
last_used_time: Optional[float]
@ -257,22 +261,19 @@ def verify_auth_token(
return verify_auth_token_from_cookie_or_header(maibot_session, authorization)
def emoji_to_response(emoji: Emoji) -> EmojiResponse:
"""将 Emoji 模型转换为响应对象"""
def emoji_to_response(image: Images) -> EmojiResponse:
return EmojiResponse(
id=emoji.id,
full_path=emoji.full_path,
format=emoji.format,
emoji_hash=emoji.emoji_hash,
description=emoji.description,
query_count=emoji.query_count,
is_registered=emoji.is_registered,
is_banned=emoji.is_banned,
emotion=str(emoji.emotion) if emoji.emotion is not None else None,
record_time=emoji.record_time,
register_time=emoji.register_time,
usage_count=emoji.usage_count,
last_used_time=emoji.last_used_time,
id=image.id if image.id is not None else 0,
full_path=image.full_path,
emoji_hash=image.image_hash,
description=image.description,
query_count=image.query_count,
is_registered=image.is_registered,
is_banned=image.is_banned,
emotion=image.emotion,
record_time=image.record_time.timestamp() if image.record_time else 0.0,
register_time=image.register_time.timestamp() if image.register_time else None,
last_used_time=image.last_used_time.timestamp() if image.last_used_time else None,
)
@ -283,8 +284,7 @@ async def get_emoji_list(
search: Optional[str] = Query(None, description="搜索关键词"),
is_registered: Optional[bool] = Query(None, description="是否已注册筛选"),
is_banned: Optional[bool] = Query(None, description="是否被禁用筛选"),
format: Optional[str] = Query(None, description="格式筛选"),
sort_by: Optional[str] = Query("usage_count", description="排序字段"),
sort_by: Optional[str] = Query("query_count", description="排序字段"),
sort_order: Optional[str] = Query("desc", description="排序方向"),
maibot_session: Optional[str] = Cookie(None),
authorization: Optional[str] = Header(None),
@ -298,8 +298,7 @@ async def get_emoji_list(
search: 搜索关键词 (匹配 description, emoji_hash)
is_registered: 是否已注册筛选
is_banned: 是否被禁用筛选
format: 格式筛选
sort_by: 排序字段 (usage_count, register_time, record_time, last_used_time)
sort_by: 排序字段 (query_count, register_time, record_time, last_used_time)
sort_order: 排序方向 (asc, desc)
authorization: Authorization header
@ -310,47 +309,58 @@ async def get_emoji_list(
verify_auth_token(maibot_session, authorization)
# 构建查询
query = Emoji.select()
statement = select(Images).where(col(Images.image_type) == ImageType.EMOJI)
# 搜索过滤
if search:
query = query.where((Emoji.description.contains(search)) | (Emoji.emoji_hash.contains(search)))
statement = statement.where(
(col(Images.description).contains(search)) | (col(Images.image_hash).contains(search))
)
# 注册状态过滤
if is_registered is not None:
query = query.where(Emoji.is_registered == is_registered)
statement = statement.where(col(Images.is_registered) == is_registered)
# 禁用状态过滤
if is_banned is not None:
query = query.where(Emoji.is_banned == is_banned)
# 格式过滤
if format:
query = query.where(Emoji.format == format)
statement = statement.where(col(Images.is_banned) == is_banned)
# 排序字段映射
sort_field_map = {
"usage_count": Emoji.usage_count,
"register_time": Emoji.register_time,
"record_time": Emoji.record_time,
"last_used_time": Emoji.last_used_time,
"usage_count": col(Images.query_count),
"query_count": col(Images.query_count),
"register_time": col(Images.register_time),
"record_time": col(Images.record_time),
"last_used_time": col(Images.last_used_time),
}
# 获取排序字段,默认使用 usage_count
sort_field = sort_field_map.get(sort_by, Emoji.usage_count)
sort_key = sort_by or "query_count"
sort_field = sort_field_map.get(sort_key, col(Images.query_count))
# 应用排序
if sort_order == "asc":
query = query.order_by(sort_field.asc())
statement = statement.order_by(sort_field.asc())
else:
query = query.order_by(sort_field.desc())
# 获取总数
total = query.count()
statement = statement.order_by(sort_field.desc())
# 分页
offset = (page - 1) * page_size
emojis = query.offset(offset).limit(page_size)
statement = statement.offset(offset).limit(page_size)
with get_db_session() as session:
emojis = session.exec(statement).all()
count_statement = select(func.count()).select_from(Images).where(col(Images.image_type) == ImageType.EMOJI)
if search:
count_statement = count_statement.where(
(col(Images.description).contains(search)) | (col(Images.image_hash).contains(search))
)
if is_registered is not None:
count_statement = count_statement.where(col(Images.is_registered) == is_registered)
if is_banned is not None:
count_statement = count_statement.where(col(Images.is_banned) == is_banned)
total = session.exec(count_statement).one()
# 转换为响应对象
data = [emoji_to_response(emoji) for emoji in emojis]
@ -381,12 +391,17 @@ async def get_emoji_detail(
try:
verify_auth_token(maibot_session, authorization)
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
with get_db_session() as session:
statement = select(Images).where(
col(Images.id) == emoji_id,
col(Images.image_type) == ImageType.EMOJI,
)
emoji = session.exec(statement).first()
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
return EmojiDetailResponse(success=True, data=emoji_to_response(emoji))
return EmojiDetailResponse(success=True, data=emoji_to_response(emoji))
except HTTPException:
raise
@ -416,34 +431,37 @@ async def update_emoji(
try:
verify_auth_token(maibot_session, authorization)
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
with get_db_session() as session:
statement = select(Images).where(
col(Images.id) == emoji_id,
col(Images.image_type) == ImageType.EMOJI,
)
emoji = session.exec(statement).first()
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
# 只更新提供的字段
update_data = request.model_dump(exclude_unset=True)
# 只更新提供的字段
update_data = request.model_dump(exclude_unset=True)
if not update_data:
raise HTTPException(status_code=400, detail="未提供任何需要更新的字段")
if not update_data:
raise HTTPException(status_code=400, detail="未提供任何需要更新的字段")
# emotion 字段直接使用字符串,无需转换
# 如果注册状态从 False 变为 True记录注册时间
if "is_registered" in update_data and update_data["is_registered"] and not emoji.is_registered:
update_data["register_time"] = datetime.now()
# 如果注册状态从 False 变为 True记录注册时间
if "is_registered" in update_data and update_data["is_registered"] and not emoji.is_registered:
update_data["register_time"] = time.time()
# 执行更新
for field, value in update_data.items():
setattr(emoji, field, value)
# 执行更新
for field, value in update_data.items():
setattr(emoji, field, value)
session.add(emoji)
emoji.save()
logger.info(f"表情包已更新: ID={emoji_id}, 字段: {list(update_data.keys())}")
logger.info(f"表情包已更新: ID={emoji_id}, 字段: {list(update_data.keys())}")
return EmojiUpdateResponse(
success=True, message=f"成功更新 {len(update_data)} 个字段", data=emoji_to_response(emoji)
)
return EmojiUpdateResponse(
success=True, message=f"成功更新 {len(update_data)} 个字段", data=emoji_to_response(emoji)
)
except HTTPException:
raise
@ -469,20 +487,22 @@ async def delete_emoji(
try:
verify_auth_token(maibot_session, authorization)
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
with get_db_session() as session:
statement = select(Images).where(
col(Images.id) == emoji_id,
col(Images.image_type) == ImageType.EMOJI,
)
emoji = session.exec(statement).first()
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
# 记录删除信息
emoji_hash = emoji.emoji_hash
emoji_hash = emoji.image_hash
session.delete(emoji)
# 执行删除
emoji.delete_instance()
logger.info(f"表情包已删除: ID={emoji_id}, hash={emoji_hash}")
logger.info(f"表情包已删除: ID={emoji_id}, hash={emoji_hash}")
return EmojiDeleteResponse(success=True, message=f"成功删除表情包: {emoji_hash}")
return EmojiDeleteResponse(success=True, message=f"成功删除表情包: {emoji_hash}")
except HTTPException:
raise
@ -505,27 +525,51 @@ async def get_emoji_stats(maibot_session: Optional[str] = Cookie(None), authoriz
try:
verify_auth_token(maibot_session, authorization)
total = Emoji.select().count()
registered = Emoji.select().where(Emoji.is_registered).count()
banned = Emoji.select().where(Emoji.is_banned).count()
with get_db_session() as session:
total_statement = select(func.count()).select_from(Images).where(col(Images.image_type) == ImageType.EMOJI)
registered_statement = (
select(func.count())
.select_from(Images)
.where(
col(Images.image_type) == ImageType.EMOJI,
col(Images.is_registered) == True,
)
)
banned_statement = (
select(func.count())
.select_from(Images)
.where(
col(Images.image_type) == ImageType.EMOJI,
col(Images.is_banned) == True,
)
)
# 按格式统计
formats = {}
for emoji in Emoji.select(Emoji.format):
fmt = emoji.format
formats[fmt] = formats.get(fmt, 0) + 1
total = session.exec(total_statement).one()
registered = session.exec(registered_statement).one()
banned = session.exec(banned_statement).one()
# 获取最常用的表情包前10
top_used = Emoji.select().order_by(Emoji.usage_count.desc()).limit(10)
top_used_list = [
{
"id": emoji.id,
"emoji_hash": emoji.emoji_hash,
"description": emoji.description,
"usage_count": emoji.usage_count,
}
for emoji in top_used
]
formats: dict[str, int] = {}
format_statement = select(Images.full_path).where(col(Images.image_type) == ImageType.EMOJI)
for full_path in session.exec(format_statement).all():
suffix = Path(full_path).suffix.lower().lstrip(".")
fmt = suffix or "unknown"
formats[fmt] = formats.get(fmt, 0) + 1
top_used_statement = (
select(Images)
.where(col(Images.image_type) == ImageType.EMOJI)
.order_by(col(Images.query_count).desc())
.limit(10)
)
top_used_list = [
{
"id": emoji.id,
"emoji_hash": emoji.image_hash,
"description": emoji.description,
"usage_count": emoji.query_count,
}
for emoji in session.exec(top_used_statement).all()
]
return {
"success": True,
@ -563,23 +607,27 @@ async def register_emoji(
try:
verify_auth_token(maibot_session, authorization)
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
with get_db_session() as session:
statement = select(Images).where(
col(Images.id) == emoji_id,
col(Images.image_type) == ImageType.EMOJI,
)
emoji = session.exec(statement).first()
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
if emoji.is_registered:
raise HTTPException(status_code=400, detail="该表情包已经注册")
if emoji.is_registered:
raise HTTPException(status_code=400, detail="该表情包已经注册")
# 注册表情包(如果已封禁,自动解除封禁)
emoji.is_registered = True
emoji.is_banned = False # 注册时自动解除封禁
emoji.register_time = time.time()
emoji.save()
emoji.is_registered = True
emoji.is_banned = False
emoji.register_time = datetime.now()
session.add(emoji)
logger.info(f"表情包已注册: ID={emoji_id}")
logger.info(f"表情包已注册: ID={emoji_id}")
return EmojiUpdateResponse(success=True, message="表情包注册成功", data=emoji_to_response(emoji))
return EmojiUpdateResponse(success=True, message="表情包注册成功", data=emoji_to_response(emoji))
except HTTPException:
raise
@ -605,19 +653,23 @@ async def ban_emoji(
try:
verify_auth_token(maibot_session, authorization)
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
with get_db_session() as session:
statement = select(Images).where(
col(Images.id) == emoji_id,
col(Images.image_type) == ImageType.EMOJI,
)
emoji = session.exec(statement).first()
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
# 禁用表情包(同时取消注册)
emoji.is_banned = True
emoji.is_registered = False
emoji.save()
emoji.is_banned = True
emoji.is_registered = False
session.add(emoji)
logger.info(f"表情包已禁用: ID={emoji_id}")
logger.info(f"表情包已禁用: ID={emoji_id}")
return EmojiUpdateResponse(success=True, message="表情包禁用成功", data=emoji_to_response(emoji))
return EmojiUpdateResponse(success=True, message="表情包禁用成功", data=emoji_to_response(emoji))
except HTTPException:
raise
@ -672,61 +724,58 @@ async def get_emoji_thumbnail(
if not is_valid:
raise HTTPException(status_code=401, detail="Token 无效或已过期")
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
# 检查文件是否存在
if not os.path.exists(emoji.full_path):
raise HTTPException(status_code=404, detail="表情包文件不存在")
# 如果请求原图,直接返回原文件
if original:
mime_types = {
"png": "image/png",
"jpg": "image/jpeg",
"jpeg": "image/jpeg",
"gif": "image/gif",
"webp": "image/webp",
"bmp": "image/bmp",
}
media_type = mime_types.get(emoji.format.lower(), "application/octet-stream")
return FileResponse(
path=emoji.full_path, media_type=media_type, filename=f"{emoji.emoji_hash}.{emoji.format}"
with get_db_session() as session:
statement = select(Images).where(
col(Images.id) == emoji_id,
col(Images.image_type) == ImageType.EMOJI,
)
emoji = session.exec(statement).first()
# 尝试获取或生成缩略图
cache_path = _get_thumbnail_cache_path(emoji.emoji_hash)
if not emoji:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {emoji_id} 的表情包")
# 检查缓存是否存在
if cache_path.exists():
# 缓存命中,直接返回
return FileResponse(
path=str(cache_path), media_type="image/webp", filename=f"{emoji.emoji_hash}_thumb.webp"
if not os.path.exists(emoji.full_path):
raise HTTPException(status_code=404, detail="表情包文件不存在")
if original:
mime_types = {
"png": "image/png",
"jpg": "image/jpeg",
"jpeg": "image/jpeg",
"gif": "image/gif",
"webp": "image/webp",
"bmp": "image/bmp",
}
suffix = Path(emoji.full_path).suffix.lower().lstrip(".")
media_type = mime_types.get(suffix, "application/octet-stream")
return FileResponse(
path=emoji.full_path, media_type=media_type, filename=f"{emoji.image_hash}.{suffix}"
)
cache_path = _get_thumbnail_cache_path(emoji.image_hash)
if cache_path.exists():
return FileResponse(
path=str(cache_path), media_type="image/webp", filename=f"{emoji.image_hash}_thumb.webp"
)
with _generating_lock:
if emoji.image_hash not in _generating_thumbnails:
_generating_thumbnails.add(emoji.image_hash)
_thumbnail_executor.submit(_background_generate_thumbnail, emoji.full_path, emoji.image_hash)
return JSONResponse(
status_code=202,
content={
"status": "generating",
"message": "缩略图正在生成中,请稍后重试",
"emoji_id": emoji_id,
},
headers={
"Retry-After": "1",
},
)
# 缓存未命中,触发后台生成并返回 202
with _generating_lock:
if emoji.emoji_hash not in _generating_thumbnails:
# 标记为正在生成
_generating_thumbnails.add(emoji.emoji_hash)
# 提交到线程池后台生成
_thumbnail_executor.submit(_background_generate_thumbnail, emoji.full_path, emoji.emoji_hash)
# 返回 202 Accepted告诉前端缩略图正在生成中
return JSONResponse(
status_code=202,
content={
"status": "generating",
"message": "缩略图正在生成中,请稍后重试",
"emoji_id": emoji_id,
},
headers={
"Retry-After": "1", # 建议 1 秒后重试
},
)
except HTTPException:
raise
except Exception as e:
@ -762,14 +811,19 @@ async def batch_delete_emojis(
for emoji_id in request.emoji_ids:
try:
emoji = Emoji.get_or_none(Emoji.id == emoji_id)
if emoji:
emoji.delete_instance()
deleted_count += 1
logger.info(f"批量删除表情包: {emoji_id}")
else:
failed_count += 1
failed_ids.append(emoji_id)
with get_db_session() as session:
statement = select(Images).where(
col(Images.id) == emoji_id,
col(Images.image_type) == ImageType.EMOJI,
)
emoji = session.exec(statement).first()
if emoji:
session.delete(emoji)
deleted_count += 1
logger.info(f"批量删除表情包: {emoji_id}")
else:
failed_count += 1
failed_ids.append(emoji_id)
except Exception as e:
logger.error(f"删除表情包 {emoji_id} 失败: {e}")
failed_count += 1
@ -864,19 +918,23 @@ async def upload_emoji(
# 计算文件哈希
emoji_hash = hashlib.md5(file_content).hexdigest()
# 检查是否已存在相同哈希的表情包
existing_emoji = Emoji.get_or_none(Emoji.emoji_hash == emoji_hash)
if existing_emoji:
raise HTTPException(
status_code=409,
detail=f"已存在相同的表情包 (ID: {existing_emoji.id})",
with get_db_session() as session:
existing_statement = select(Images).where(
col(Images.image_hash) == emoji_hash,
col(Images.image_type) == ImageType.EMOJI,
)
existing_emoji = session.exec(existing_statement).first()
if existing_emoji:
raise HTTPException(
status_code=409,
detail=f"已存在相同的表情包 (ID: {existing_emoji.id})",
)
# 确保目录存在
os.makedirs(EMOJI_REGISTERED_DIR, exist_ok=True)
# 生成文件名
timestamp = int(time.time())
timestamp = int(datetime.now().timestamp())
filename = f"emoji_{timestamp}_{emoji_hash[:8]}.{img_format}"
full_path = os.path.join(EMOJI_REGISTERED_DIR, filename)
@ -896,30 +954,31 @@ async def upload_emoji(
# 处理情感标签
emotion_str = ",".join(e.strip() for e in emotion.split(",") if e.strip()) if emotion else ""
# 创建数据库记录
current_time = time.time()
emoji = Emoji.create(
full_path=full_path,
format=img_format,
emoji_hash=emoji_hash,
description=description,
emotion=emotion_str,
query_count=0,
is_registered=is_registered,
is_banned=False,
record_time=current_time,
register_time=current_time if is_registered else None,
usage_count=0,
last_used_time=None,
)
current_time = datetime.now()
with get_db_session() as session:
emoji = Images(
image_type=ImageType.EMOJI,
full_path=full_path,
image_hash=emoji_hash,
description=description,
emotion=emotion_str or None,
query_count=0,
is_registered=is_registered,
is_banned=False,
record_time=current_time,
register_time=current_time if is_registered else None,
last_used_time=None,
)
session.add(emoji)
session.flush()
logger.info(f"表情包已上传并注册: ID={emoji.id}, hash={emoji_hash}")
logger.info(f"表情包已上传并注册: ID={emoji.id}, hash={emoji_hash}")
return EmojiUploadResponse(
success=True,
message="表情包上传成功" + ("并已注册" if is_registered else ""),
data=emoji_to_response(emoji),
)
return EmojiUploadResponse(
success=True,
message="表情包上传成功" + ("并已注册" if is_registered else ""),
data=emoji_to_response(emoji),
)
except HTTPException:
raise
@ -1008,20 +1067,24 @@ async def batch_upload_emoji(
# 计算哈希
emoji_hash = hashlib.md5(file_content).hexdigest()
# 检查重复
if Emoji.get_or_none(Emoji.emoji_hash == emoji_hash):
results["failed"] += 1
results["details"].append(
{
"filename": file.filename,
"success": False,
"error": "已存在相同的表情包",
}
with get_db_session() as session:
existing_statement = select(Images).where(
col(Images.image_hash) == emoji_hash,
col(Images.image_type) == ImageType.EMOJI,
)
continue
if session.exec(existing_statement).first():
results["failed"] += 1
results["details"].append(
{
"filename": file.filename,
"success": False,
"error": "已存在相同的表情包",
}
)
continue
# 生成文件名并保存
timestamp = int(time.time())
timestamp = int(datetime.now().timestamp())
filename = f"emoji_{timestamp}_{emoji_hash[:8]}.{img_format}"
full_path = os.path.join(EMOJI_REGISTERED_DIR, filename)
@ -1037,31 +1100,32 @@ async def batch_upload_emoji(
# 处理情感标签
emotion_str = ",".join(e.strip() for e in emotion.split(",") if e.strip()) if emotion else ""
# 创建数据库记录
current_time = time.time()
emoji = Emoji.create(
full_path=full_path,
format=img_format,
emoji_hash=emoji_hash,
description="", # 批量上传暂不设置描述
emotion=emotion_str,
query_count=0,
is_registered=is_registered,
is_banned=False,
record_time=current_time,
register_time=current_time if is_registered else None,
usage_count=0,
last_used_time=None,
)
current_time = datetime.now()
with get_db_session() as session:
emoji = Images(
image_type=ImageType.EMOJI,
full_path=full_path,
image_hash=emoji_hash,
description="",
emotion=emotion_str or None,
query_count=0,
is_registered=is_registered,
is_banned=False,
record_time=current_time,
register_time=current_time if is_registered else None,
last_used_time=None,
)
session.add(emoji)
session.flush()
results["uploaded"] += 1
results["details"].append(
{
"filename": file.filename,
"success": True,
"id": emoji.id,
}
)
results["uploaded"] += 1
results["details"].append(
{
"filename": file.filename,
"success": True,
"id": emoji.id,
}
)
except Exception as e:
results["failed"] += 1
@ -1138,8 +1202,9 @@ async def get_thumbnail_cache_stats(
total_size = sum(f.stat().st_size for f in cache_files)
total_size_mb = round(total_size / (1024 * 1024), 2)
# 统计表情包总数
emoji_count = Emoji.select().count()
with get_db_session() as session:
count_statement = select(func.count()).select_from(Images).where(col(Images.image_type) == ImageType.EMOJI)
emoji_count = session.exec(count_statement).one()
# 计算覆盖率
coverage_percent = round((total_count / emoji_count * 100) if emoji_count > 0 else 0, 1)
@ -1213,12 +1278,17 @@ async def preheat_thumbnail_cache(
_ensure_thumbnail_cache_dir()
# 获取使用次数最高的表情包(未缓存的优先)
emojis = (
Emoji.select()
.where(Emoji.is_banned == False) # noqa: E712 Peewee ORM requires == for boolean comparison
.order_by(Emoji.usage_count.desc())
.limit(limit * 2) # 多查一些,因为有些可能已缓存
)
with get_db_session() as session:
statement = (
select(Images)
.where(
col(Images.image_type) == ImageType.EMOJI,
col(Images.is_banned) == False,
)
.order_by(col(Images.query_count).desc())
.limit(limit * 2)
)
emojis = session.exec(statement).all()
generated = 0
skipped = 0
@ -1228,25 +1298,22 @@ async def preheat_thumbnail_cache(
if generated >= limit:
break
cache_path = _get_thumbnail_cache_path(emoji.emoji_hash)
cache_path = _get_thumbnail_cache_path(emoji.image_hash)
# 已缓存,跳过
if cache_path.exists():
skipped += 1
continue
# 原文件不存在,跳过
if not os.path.exists(emoji.full_path):
failed += 1
continue
try:
# 使用线程池异步生成缩略图,避免阻塞事件循环
loop = asyncio.get_event_loop()
await loop.run_in_executor(_thumbnail_executor, _generate_thumbnail, emoji.full_path, emoji.emoji_hash)
await loop.run_in_executor(_thumbnail_executor, _generate_thumbnail, emoji.full_path, emoji.image_hash)
generated += 1
except Exception as e:
logger.warning(f"预热缩略图失败 {emoji.emoji_hash}: {e}")
logger.warning(f"预热缩略图失败 {emoji.image_hash}: {e}")
failed += 1
return ThumbnailPreheatResponse(

View File

@ -7,7 +7,6 @@ class EmojiResponse(BaseModel):
id: int
full_path: str
format: str
emoji_hash: str
description: str
query_count: int
@ -16,7 +15,6 @@ class EmojiResponse(BaseModel):
emotion: Optional[str]
record_time: float
register_time: Optional[float]
usage_count: int
last_used_time: Optional[float]