From 50acd8d847e19dbb58d22a60ba05b9661420d9a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A2=A8=E6=A2=93=E6=9F=92?= <1787882683@qq.com> Date: Sun, 30 Nov 2025 17:31:54 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E7=BC=A9=E7=95=A5?= =?UTF-8?q?=E5=9B=BE=E7=BC=93=E5=AD=98=E7=AE=A1=E7=90=86=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=EF=BC=8C=E5=8C=85=E6=8B=AC=E7=94=9F=E6=88=90=E3=80=81=E6=B8=85?= =?UTF-8?q?=E7=90=86=E5=92=8C=E7=BB=9F=E8=AE=A1=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/webui/emoji_routes.py | 422 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 408 insertions(+), 14 deletions(-) diff --git a/src/webui/emoji_routes.py b/src/webui/emoji_routes.py index c4d90ea2..1fe5be09 100644 --- a/src/webui/emoji_routes.py +++ b/src/webui/emoji_routes.py @@ -13,9 +13,134 @@ import os import hashlib from PIL import Image import io +from pathlib import Path +import threading logger = get_logger("webui.emoji") +# ==================== 缩略图缓存配置 ==================== +# 缩略图缓存目录 +THUMBNAIL_CACHE_DIR = Path("data/emoji_thumbnails") +# 缩略图尺寸 (宽, 高) +THUMBNAIL_SIZE = (200, 200) +# 缩略图质量 (WebP 格式, 1-100) +THUMBNAIL_QUALITY = 80 +# 缓存锁,防止并发生成同一缩略图 +_thumbnail_locks: dict[str, threading.Lock] = {} +_locks_lock = threading.Lock() + + +def _get_thumbnail_lock(file_hash: str) -> threading.Lock: + """获取指定文件哈希的锁,用于防止并发生成同一缩略图""" + with _locks_lock: + if file_hash not in _thumbnail_locks: + _thumbnail_locks[file_hash] = threading.Lock() + return _thumbnail_locks[file_hash] + + +def _ensure_thumbnail_cache_dir() -> Path: + """确保缩略图缓存目录存在""" + THUMBNAIL_CACHE_DIR.mkdir(parents=True, exist_ok=True) + return THUMBNAIL_CACHE_DIR + + +def _get_thumbnail_cache_path(file_hash: str) -> Path: + """获取缩略图缓存路径""" + return THUMBNAIL_CACHE_DIR / f"{file_hash}.webp" + + +def _generate_thumbnail(source_path: str, file_hash: str) -> Path: + """ + 生成缩略图并保存到缓存目录 + + Args: + source_path: 原图路径 + file_hash: 文件哈希值,用作缓存文件名 + + Returns: + 缩略图路径 + + Features: + - GIF: 提取第一帧作为缩略图 + - 所有格式统一转为 WebP + - 保持宽高比缩放 + """ + _ensure_thumbnail_cache_dir() + cache_path = _get_thumbnail_cache_path(file_hash) + + # 使用锁防止并发生成同一缩略图 + lock = _get_thumbnail_lock(file_hash) + with lock: + # 双重检查,可能在等待锁时已被其他线程生成 + if cache_path.exists(): + return cache_path + + try: + with Image.open(source_path) as img: + # GIF 处理:提取第一帧 + if hasattr(img, 'n_frames') and img.n_frames > 1: + img.seek(0) # 确保在第一帧 + + # 转换为 RGB/RGBA(WebP 支持透明度) + if img.mode in ('P', 'PA'): + # 调色板模式转换为 RGBA 以保留透明度 + img = img.convert('RGBA') + elif img.mode == 'LA': + img = img.convert('RGBA') + elif img.mode not in ('RGB', 'RGBA'): + img = img.convert('RGB') + + # 创建缩略图(保持宽高比) + img.thumbnail(THUMBNAIL_SIZE, Image.Resampling.LANCZOS) + + # 保存为 WebP 格式 + img.save(cache_path, 'WEBP', quality=THUMBNAIL_QUALITY, method=6) + + logger.debug(f"生成缩略图: {file_hash} -> {cache_path}") + + except Exception as e: + logger.warning(f"生成缩略图失败 {file_hash}: {e},将返回原图") + # 生成失败时不创建缓存文件,下次会重试 + raise + + return cache_path + + +def cleanup_orphaned_thumbnails() -> tuple[int, int]: + """ + 清理孤立的缩略图缓存(原图已不存在的缩略图) + + Returns: + (清理数量, 保留数量) + """ + if not THUMBNAIL_CACHE_DIR.exists(): + return 0, 0 + + # 获取所有表情包的哈希值 + valid_hashes = set() + for emoji in Emoji.select(Emoji.emoji_hash): + valid_hashes.add(emoji.emoji_hash) + + cleaned = 0 + kept = 0 + + for cache_file in THUMBNAIL_CACHE_DIR.glob("*.webp"): + file_hash = cache_file.stem + if file_hash not in valid_hashes: + try: + cache_file.unlink() + cleaned += 1 + logger.debug(f"清理孤立缩略图: {cache_file.name}") + except Exception as e: + logger.warning(f"清理缩略图失败 {cache_file.name}: {e}") + else: + kept += 1 + + if cleaned > 0: + logger.info(f"清理孤立缩略图: 删除 {cleaned} 个,保留 {kept} 个") + + return cleaned, kept + # 模块级别的类型别名(解决 B008 ruff 错误) EmojiFile = Annotated[UploadFile, File(description="表情包图片文件")] EmojiFiles = Annotated[List[UploadFile], File(description="多个表情包图片文件")] @@ -472,18 +597,26 @@ async def get_emoji_thumbnail( token: Optional[str] = Query(None, description="访问令牌"), maibot_session: Optional[str] = Cookie(None), authorization: Optional[str] = Header(None), + original: bool = Query(False, description="是否返回原图"), ): """ - 获取表情包缩略图 + 获取表情包缩略图(懒加载生成 + 缓存) Args: emoji_id: 表情包ID token: 访问令牌(通过 query parameter,用于向后兼容) maibot_session: Cookie 中的 token authorization: Authorization header + original: 是否返回原图(用于详情页查看原图) Returns: - 表情包图片文件 + 表情包缩略图(WebP 格式)或原图 + + Features: + - 懒加载:首次请求时生成缩略图 + - 缓存:后续请求直接返回缓存 + - GIF 支持:提取第一帧作为缩略图 + - 格式统一:所有缩略图统一为 WebP 格式 """ try: token_manager = get_token_manager() @@ -513,19 +646,55 @@ async def get_emoji_thumbnail( if not os.path.exists(emoji.full_path): raise HTTPException(status_code=404, detail="表情包文件不存在") - # 根据格式设置 MIME 类型 - mime_types = { - "png": "image/png", - "jpg": "image/jpeg", - "jpeg": "image/jpeg", - "gif": "image/gif", - "webp": "image/webp", - "bmp": "image/bmp", - } + # 如果请求原图,直接返回原文件 + if original: + mime_types = { + "png": "image/png", + "jpg": "image/jpeg", + "jpeg": "image/jpeg", + "gif": "image/gif", + "webp": "image/webp", + "bmp": "image/bmp", + } + media_type = mime_types.get(emoji.format.lower(), "application/octet-stream") + return FileResponse( + path=emoji.full_path, + media_type=media_type, + filename=f"{emoji.emoji_hash}.{emoji.format}" + ) - media_type = mime_types.get(emoji.format.lower(), "application/octet-stream") - - return FileResponse(path=emoji.full_path, media_type=media_type, filename=f"{emoji.emoji_hash}.{emoji.format}") + # 尝试获取或生成缩略图 + cache_path = _get_thumbnail_cache_path(emoji.emoji_hash) + + # 检查缓存是否存在 + if not cache_path.exists(): + try: + # 生成缩略图 + _generate_thumbnail(emoji.full_path, emoji.emoji_hash) + except Exception as e: + # 生成失败,回退到原图 + logger.warning(f"缩略图生成失败,返回原图: {e}") + mime_types = { + "png": "image/png", + "jpg": "image/jpeg", + "jpeg": "image/jpeg", + "gif": "image/gif", + "webp": "image/webp", + "bmp": "image/bmp", + } + media_type = mime_types.get(emoji.format.lower(), "application/octet-stream") + return FileResponse( + path=emoji.full_path, + media_type=media_type, + filename=f"{emoji.emoji_hash}.{emoji.format}" + ) + + # 返回缩略图 + return FileResponse( + path=str(cache_path), + media_type="image/webp", + filename=f"{emoji.emoji_hash}_thumb.webp" + ) except HTTPException: raise @@ -877,3 +1046,228 @@ async def batch_upload_emoji( except Exception as e: logger.exception(f"批量上传表情包失败: {e}") raise HTTPException(status_code=500, detail=f"批量上传失败: {str(e)}") from e + + +# ==================== 缩略图缓存管理 API ==================== + + +class ThumbnailCacheStatsResponse(BaseModel): + """缩略图缓存统计响应""" + + success: bool + cache_dir: str + total_count: int + total_size_mb: float + emoji_count: int + coverage_percent: float + + +class ThumbnailCleanupResponse(BaseModel): + """缩略图清理响应""" + + success: bool + message: str + cleaned_count: int + kept_count: int + + +class ThumbnailPreheatResponse(BaseModel): + """缩略图预热响应""" + + success: bool + message: str + generated_count: int + skipped_count: int + failed_count: int + + +@router.get("/thumbnail-cache/stats", response_model=ThumbnailCacheStatsResponse) +async def get_thumbnail_cache_stats( + maibot_session: Optional[str] = Cookie(None), + authorization: Optional[str] = Header(None), +): + """ + 获取缩略图缓存统计信息 + + Returns: + 缓存目录、缓存数量、总大小、覆盖率等统计信息 + """ + try: + verify_auth_token(maibot_session, authorization) + + _ensure_thumbnail_cache_dir() + + # 统计缓存文件 + cache_files = list(THUMBNAIL_CACHE_DIR.glob("*.webp")) + total_count = len(cache_files) + total_size = sum(f.stat().st_size for f in cache_files) + total_size_mb = round(total_size / (1024 * 1024), 2) + + # 统计表情包总数 + emoji_count = Emoji.select().count() + + # 计算覆盖率 + coverage_percent = round((total_count / emoji_count * 100) if emoji_count > 0 else 0, 1) + + return ThumbnailCacheStatsResponse( + success=True, + cache_dir=str(THUMBNAIL_CACHE_DIR.absolute()), + total_count=total_count, + total_size_mb=total_size_mb, + emoji_count=emoji_count, + coverage_percent=coverage_percent, + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"获取缩略图缓存统计失败: {e}") + raise HTTPException(status_code=500, detail=f"获取统计失败: {str(e)}") from e + + +@router.post("/thumbnail-cache/cleanup", response_model=ThumbnailCleanupResponse) +async def cleanup_thumbnail_cache( + maibot_session: Optional[str] = Cookie(None), + authorization: Optional[str] = Header(None), +): + """ + 清理孤立的缩略图缓存(原图已删除的表情包对应的缩略图) + + Returns: + 清理结果 + """ + try: + verify_auth_token(maibot_session, authorization) + + cleaned, kept = cleanup_orphaned_thumbnails() + + return ThumbnailCleanupResponse( + success=True, + message=f"清理完成:删除 {cleaned} 个孤立缓存,保留 {kept} 个有效缓存", + cleaned_count=cleaned, + kept_count=kept, + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"清理缩略图缓存失败: {e}") + raise HTTPException(status_code=500, detail=f"清理失败: {str(e)}") from e + + +@router.post("/thumbnail-cache/preheat", response_model=ThumbnailPreheatResponse) +async def preheat_thumbnail_cache( + limit: int = Query(100, ge=1, le=1000, description="最多预热数量"), + maibot_session: Optional[str] = Cookie(None), + authorization: Optional[str] = Header(None), +): + """ + 预热缩略图缓存(提前生成未缓存的缩略图) + + 优先处理使用次数高的表情包 + + Args: + limit: 最多预热数量 (1-1000) + + Returns: + 预热结果 + """ + try: + verify_auth_token(maibot_session, authorization) + + _ensure_thumbnail_cache_dir() + + # 获取使用次数最高的表情包(未缓存的优先) + emojis = ( + Emoji.select() + .where(Emoji.is_banned == False) # noqa: E712 Peewee ORM requires == for boolean comparison + .order_by(Emoji.usage_count.desc()) + .limit(limit * 2) # 多查一些,因为有些可能已缓存 + ) + + generated = 0 + skipped = 0 + failed = 0 + + for emoji in emojis: + if generated >= limit: + break + + cache_path = _get_thumbnail_cache_path(emoji.emoji_hash) + + # 已缓存,跳过 + if cache_path.exists(): + skipped += 1 + continue + + # 原文件不存在,跳过 + if not os.path.exists(emoji.full_path): + failed += 1 + continue + + try: + _generate_thumbnail(emoji.full_path, emoji.emoji_hash) + generated += 1 + except Exception as e: + logger.warning(f"预热缩略图失败 {emoji.emoji_hash}: {e}") + failed += 1 + + return ThumbnailPreheatResponse( + success=True, + message=f"预热完成:生成 {generated} 个,跳过 {skipped} 个已缓存,失败 {failed} 个", + generated_count=generated, + skipped_count=skipped, + failed_count=failed, + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"预热缩略图缓存失败: {e}") + raise HTTPException(status_code=500, detail=f"预热失败: {str(e)}") from e + + +@router.delete("/thumbnail-cache/clear", response_model=ThumbnailCleanupResponse) +async def clear_all_thumbnail_cache( + maibot_session: Optional[str] = Cookie(None), + authorization: Optional[str] = Header(None), +): + """ + 清空所有缩略图缓存(下次访问时会重新生成) + + Returns: + 清理结果 + """ + try: + verify_auth_token(maibot_session, authorization) + + if not THUMBNAIL_CACHE_DIR.exists(): + return ThumbnailCleanupResponse( + success=True, + message="缓存目录不存在,无需清理", + cleaned_count=0, + kept_count=0, + ) + + cleaned = 0 + for cache_file in THUMBNAIL_CACHE_DIR.glob("*.webp"): + try: + cache_file.unlink() + cleaned += 1 + except Exception as e: + logger.warning(f"删除缓存文件失败 {cache_file.name}: {e}") + + logger.info(f"已清空缩略图缓存: 删除 {cleaned} 个文件") + + return ThumbnailCleanupResponse( + success=True, + message=f"已清空所有缩略图缓存:删除 {cleaned} 个文件", + cleaned_count=cleaned, + kept_count=0, + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"清空缩略图缓存失败: {e}") + raise HTTPException(status_code=500, detail=f"清空失败: {str(e)}") from e