From 28af002477fec2b18f91e58279e3097e75ed3faa Mon Sep 17 00:00:00 2001 From: Ronifue Date: Sun, 30 Nov 2025 19:24:00 +0800 Subject: [PATCH] =?UTF-8?q?feat(webui):=20=E6=B7=BB=E5=8A=A0=E8=A1=A8?= =?UTF-8?q?=E6=83=85=E5=8C=85=E5=9B=BE=E7=89=87=E4=BB=A3=E7=90=86=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E7=B3=BB=E7=BB=9F=EF=BC=8C=E4=BC=98=E5=8C=96=20WebUI?= =?UTF-8?q?=20=E5=8A=A0=E8=BD=BD=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/webui/emoji_routes.py | 166 +++++++++++++++-- src/webui/image_proxy/__init__.py | 5 + src/webui/image_proxy/proxy_manager.py | 237 +++++++++++++++++++++++++ src/webui/image_proxy/transcoder.py | 115 ++++++++++++ src/webui/webui_server.py | 27 +++ 5 files changed, 537 insertions(+), 13 deletions(-) create mode 100644 src/webui/image_proxy/__init__.py create mode 100644 src/webui/image_proxy/proxy_manager.py create mode 100644 src/webui/image_proxy/transcoder.py diff --git a/src/webui/emoji_routes.py b/src/webui/emoji_routes.py index 94f77b95..a4758dad 100644 --- a/src/webui/emoji_routes.py +++ b/src/webui/emoji_routes.py @@ -477,7 +477,7 @@ async def get_emoji_thumbnail( authorization: Optional[str] = Header(None), ): """ - 获取表情包缩略图 + 获取表情包缩略图(优先返回优化后的 WebP 代理图片) Args: emoji_id: 表情包ID @@ -485,7 +485,7 @@ async def get_emoji_thumbnail( authorization: Authorization header Returns: - 表情包图片文件 + 表情包图片文件(优先 WebP 代理图片,否则原图) """ try: # 优先使用 query parameter 中的 token(用于 img 标签) @@ -506,19 +506,24 @@ async def get_emoji_thumbnail( if not os.path.exists(emoji.full_path): raise HTTPException(status_code=404, detail="表情包文件不存在") - # 根据格式设置 MIME 类型 - mime_types = { - "png": "image/png", - "jpg": "image/jpeg", - "jpeg": "image/jpeg", - "gif": "image/gif", - "webp": "image/webp", - "bmp": "image/bmp", - } + # 使用代理图片管理器获取优化后的图片路径 + from src.webui.image_proxy import get_proxy_manager - media_type = mime_types.get(emoji.format.lower(), "application/octet-stream") + proxy_manager = get_proxy_manager() + file_path, media_type = await proxy_manager.get_proxy_or_original(emoji) - return FileResponse(path=emoji.full_path, media_type=media_type, filename=f"{emoji.emoji_hash}.{emoji.format}") + # 确定文件名(用于下载时的文件名) + if media_type == "image/webp": + filename = f"{emoji.emoji_hash}.webp" + else: + filename = f"{emoji.emoji_hash}.{emoji.format}" + + return FileResponse( + path=file_path, + media_type=media_type, + filename=filename, + headers={"Cache-Control": "public, max-age=86400"}, # 浏览器缓存 24 小时 + ) except HTTPException: raise @@ -868,3 +873,138 @@ async def batch_upload_emoji( except Exception as e: logger.exception(f"批量上传表情包失败: {e}") raise HTTPException(status_code=500, detail=f"批量上传失败: {str(e)}") from e + + +# ==================== 图片代理缓存管理 API ==================== + + +@router.get("/proxy/stats") +async def get_proxy_cache_stats(authorization: Optional[str] = Header(None)): + """ + 获取图片代理缓存统计信息 + + Args: + authorization: Authorization header + + Returns: + 缓存统计信息 + """ + try: + verify_auth_token(authorization) + + from src.webui.image_proxy import get_proxy_manager + from src.webui.image_proxy.proxy_manager import PROXY_DIR + + proxy_manager = get_proxy_manager() + + # 统计缓存文件 + webp_count = 0 + skip_count = 0 + total_cache_size = 0 + + if os.path.exists(PROXY_DIR): + for filename in os.listdir(PROXY_DIR): + file_path = os.path.join(PROXY_DIR, filename) + if filename.endswith(".webp"): + webp_count += 1 + total_cache_size += os.path.getsize(file_path) + elif filename.endswith(".skip"): + skip_count += 1 + + # 统计表情包总数 + total_emojis = Emoji.select().count() + + # 计算缓存覆盖率 + cached_count = webp_count + skip_count + coverage = (cached_count / total_emojis * 100) if total_emojis > 0 else 0 + + return { + "success": True, + "data": { + "total_emojis": total_emojis, + "webp_cached": webp_count, + "skipped_original_better": skip_count, + "pending_transcode": total_emojis - cached_count, + "cache_coverage_percent": round(coverage, 2), + "cache_size_bytes": total_cache_size, + "cache_size_mb": round(total_cache_size / 1024 / 1024, 2), + "cache_directory": PROXY_DIR, + }, + } + + except HTTPException: + raise + except Exception as e: + logger.exception(f"获取代理缓存统计失败: {e}") + raise HTTPException(status_code=500, detail=f"获取代理缓存统计失败: {str(e)}") from e + + +@router.post("/proxy/transcode-all") +async def trigger_transcode_all(authorization: Optional[str] = Header(None)): + """ + 触发批量转码所有未缓存的表情包(异步执行) + + Args: + authorization: Authorization header + + Returns: + 任务启动结果 + """ + try: + verify_auth_token(authorization) + + from src.webui.image_proxy import get_proxy_manager + + proxy_manager = get_proxy_manager() + + # 获取所有表情包并加入转码队列 + emojis = Emoji.select() + enqueued_count = 0 + + for emoji in emojis: + if await proxy_manager.enqueue_transcode(emoji): + enqueued_count += 1 + + return { + "success": True, + "message": f"已将 {enqueued_count} 个表情包加入转码队列", + "enqueued_count": enqueued_count, + } + + except HTTPException: + raise + except Exception as e: + logger.exception(f"触发批量转码失败: {e}") + raise HTTPException(status_code=500, detail=f"触发批量转码失败: {str(e)}") from e + + +@router.post("/proxy/cleanup") +async def trigger_cache_cleanup(authorization: Optional[str] = Header(None)): + """ + 触发缓存清理(删除无效的缓存文件) + + Args: + authorization: Authorization header + + Returns: + 清理结果 + """ + try: + verify_auth_token(authorization) + + from src.webui.image_proxy import get_proxy_manager + + proxy_manager = get_proxy_manager() + cleaned_count = await proxy_manager.cleanup_stale_cache() + + return { + "success": True, + "message": f"已清理 {cleaned_count} 个无效缓存文件", + "cleaned_count": cleaned_count, + } + + except HTTPException: + raise + except Exception as e: + logger.exception(f"触发缓存清理失败: {e}") + raise HTTPException(status_code=500, detail=f"触发缓存清理失败: {str(e)}") from e diff --git a/src/webui/image_proxy/__init__.py b/src/webui/image_proxy/__init__.py new file mode 100644 index 00000000..28e02e67 --- /dev/null +++ b/src/webui/image_proxy/__init__.py @@ -0,0 +1,5 @@ +"""图片代理模块 - 为 WebUI 提供轻量化的图片缓存服务""" + +from .proxy_manager import get_proxy_manager, ProxyManager + +__all__ = ["get_proxy_manager", "ProxyManager"] diff --git a/src/webui/image_proxy/proxy_manager.py b/src/webui/image_proxy/proxy_manager.py new file mode 100644 index 00000000..c517620e --- /dev/null +++ b/src/webui/image_proxy/proxy_manager.py @@ -0,0 +1,237 @@ +"""代理图片管理器 - 管理图片缓存和转码队列""" + +import os +import asyncio +import mimetypes +from typing import Optional, Set, TYPE_CHECKING +from src.common.logger import get_logger +from .transcoder import Transcoder + +if TYPE_CHECKING: + from src.common.database.database_model import Emoji + +logger = get_logger("image_proxy.manager") + +# 配置常量 +PROXY_DIR = os.path.join("data", "emoji_proxy") +WEBP_QUALITY = 75 +MAX_CONCURRENT_TRANSCODE = 2 +TRANSCODE_INTERVAL = 0.3 # 秒 +CLEANUP_INTERVAL = 3600 # 秒 + +# 支持转码的格式 +SUPPORTED_FORMATS = {"png", "jpg", "jpeg", "gif", "webp", "bmp"} + + +class ProxyManager: + """代理图片管理器""" + + _instance: Optional["ProxyManager"] = None + + def __new__(cls) -> "ProxyManager": + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + self._initialized = True + self._transcoder = Transcoder(quality=WEBP_QUALITY) + self._transcode_queue: asyncio.Queue = asyncio.Queue() + self._pending_hashes: Set[str] = set() # 防止重复入队 + self._pending_lock = asyncio.Lock() + self._workers_started = False + self._worker_tasks: list = [] + + # 确保缓存目录存在 + os.makedirs(PROXY_DIR, exist_ok=True) + logger.info(f"代理图片管理器已初始化,缓存目录: {PROXY_DIR}") + + def get_cache_path(self, emoji_hash: str) -> str: + """获取缓存文件路径""" + return os.path.join(PROXY_DIR, f"{emoji_hash}.webp") + + def get_skip_marker_path(self, emoji_hash: str) -> str: + """获取跳过标记文件路径""" + return os.path.join(PROXY_DIR, f"{emoji_hash}.skip") + + def get_cached_proxy_path(self, emoji: "Emoji") -> Optional[str]: + """获取已缓存的代理图片路径,不存在返回 None""" + cache_path = self.get_cache_path(emoji.emoji_hash) + return cache_path if os.path.exists(cache_path) else None + + def should_skip_transcode(self, emoji: "Emoji") -> bool: + """检查是否应该跳过转码(原图更优)""" + return os.path.exists(self.get_skip_marker_path(emoji.emoji_hash)) + + async def enqueue_transcode(self, emoji: "Emoji") -> bool: + """将转码任务加入队列(非阻塞)""" + # 检查格式是否支持转码 + if emoji.format.lower() not in SUPPORTED_FORMATS: + return False + + async with self._pending_lock: + # 检查是否已在队列中或已有缓存 + if emoji.emoji_hash in self._pending_hashes: + return False + if self.get_cached_proxy_path(emoji) or self.should_skip_transcode(emoji): + return False + if not os.path.exists(emoji.full_path): + return False + + self._pending_hashes.add(emoji.emoji_hash) + await self._transcode_queue.put((emoji.emoji_hash, emoji.full_path)) + logger.debug(f"已加入转码队列: {emoji.emoji_hash[:8]}...") + return True + + async def _worker(self, worker_id: int) -> None: + """后台转码工作协程""" + logger.debug(f"转码工作协程 #{worker_id} 已启动") + while True: + try: + emoji_hash, source_path = await self._transcode_queue.get() + try: + await self._do_transcode(emoji_hash, source_path) + except Exception as e: + logger.error(f"转码任务异常: {e}") + finally: + async with self._pending_lock: + self._pending_hashes.discard(emoji_hash) + self._transcode_queue.task_done() + await asyncio.sleep(TRANSCODE_INTERVAL) + except asyncio.CancelledError: + logger.debug(f"转码工作协程 #{worker_id} 已停止") + break + except Exception as e: + logger.error(f"转码工作协程 #{worker_id} 错误: {e}") + await asyncio.sleep(1) + + async def _do_transcode(self, emoji_hash: str, source_path: str) -> None: + """执行实际的转码操作""" + if not os.path.exists(source_path): + return + + cache_path = self.get_cache_path(emoji_hash) + skip_path = self.get_skip_marker_path(emoji_hash) + + if os.path.exists(cache_path) or os.path.exists(skip_path): + return + + success, original_size, webp_size = await self._transcoder.transcode_to_webp( + source_path, cache_path + ) + + if not success: + return + + # 压缩后更大,删除 WebP,创建跳过标记 + if webp_size >= original_size: + logger.debug(f"压缩后体积更大,使用原图: {emoji_hash[:8]}...") + try: + os.remove(cache_path) + except Exception: + pass + try: + with open(skip_path, "w") as f: + f.write(f"original:{original_size},webp:{webp_size}") + except Exception as e: + logger.error(f"创建跳过标记失败: {e}") + else: + logger.debug(f"转码成功: {emoji_hash[:8]}... ({original_size} -> {webp_size})") + + async def start_workers(self) -> None: + """启动后台转码工作协程""" + if self._workers_started: + return + self._workers_started = True + + for i in range(MAX_CONCURRENT_TRANSCODE): + self._worker_tasks.append(asyncio.create_task(self._worker(i))) + self._worker_tasks.append(asyncio.create_task(self._cleanup_worker())) + + logger.info(f"已启动 {MAX_CONCURRENT_TRANSCODE} 个转码工作协程") + + async def stop_workers(self) -> None: + """停止后台工作协程""" + if not self._workers_started: + return + self._workers_started = False + + for task in self._worker_tasks: + task.cancel() + await asyncio.gather(*self._worker_tasks, return_exceptions=True) + self._worker_tasks.clear() + logger.info("后台工作协程已停止") + + async def _cleanup_worker(self) -> None: + """后台清理工作协程""" + while True: + try: + await asyncio.sleep(CLEANUP_INTERVAL) + await self.cleanup_stale_cache() + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"清理任务错误: {e}") + + async def cleanup_stale_cache(self) -> int: + """清理失效的缓存文件,返回清理数量""" + try: + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, self._cleanup_stale_cache_sync) + except Exception as e: + logger.error(f"清理缓存任务调度错误: {e}") + return 0 + + def _cleanup_stale_cache_sync(self) -> int: + """同步执行清理失效缓存文件""" + try: + from src.common.database.database_model import Emoji + + # 注意:这里涉及数据库查询和大量文件IO,必须在线程池中运行 + valid_hashes = {e.emoji_hash for e in Emoji.select(Emoji.emoji_hash)} + cleaned_count = 0 + + if not os.path.exists(PROXY_DIR): + return 0 + + for filename in os.listdir(PROXY_DIR): + hash_part = filename.rsplit(".", 1)[0] + if hash_part not in valid_hashes: + try: + os.remove(os.path.join(PROXY_DIR, filename)) + cleaned_count += 1 + except Exception: + pass + + if cleaned_count > 0: + logger.info(f"已清理 {cleaned_count} 个失效缓存文件") + return cleaned_count + except Exception as e: + logger.error(f"清理缓存错误: {e}") + return 0 + + async def get_proxy_or_original(self, emoji: "Emoji") -> tuple[str, str]: + """获取代理图片路径或原图路径,返回 (file_path, media_type)""" + proxy_path = self.get_cached_proxy_path(emoji) + if proxy_path: + return proxy_path, "image/webp" + + # 动态获取 MIME 类型 + mime_type, _ = mimetypes.guess_type(emoji.full_path) + if not mime_type: + mime_type = "application/octet-stream" + + if self.should_skip_transcode(emoji): + return emoji.full_path, mime_type + + await self.enqueue_transcode(emoji) + return emoji.full_path, mime_type + + +def get_proxy_manager() -> ProxyManager: + """获取全局代理图片管理器实例""" + return ProxyManager() diff --git a/src/webui/image_proxy/transcoder.py b/src/webui/image_proxy/transcoder.py new file mode 100644 index 00000000..7f496da8 --- /dev/null +++ b/src/webui/image_proxy/transcoder.py @@ -0,0 +1,115 @@ +"""图片转码引擎 - 将各种格式转换为优化的 WebP""" + +import os +import asyncio +from typing import Tuple +from PIL import Image +from src.common.logger import get_logger + +logger = get_logger("image_proxy.transcoder") + + +# 配置常量 +MAX_ANIMATION_FRAMES = 150 # 限制最大帧数防止内存爆炸 + + +class Transcoder: + """图片转码引擎""" + + def __init__(self, quality: int = 75): + self.quality = quality + + async def transcode_to_webp( + self, source_path: str, target_path: str + ) -> Tuple[bool, int, int]: + """将图片转码为 WebP 格式,返回 (success, original_size, webp_size)""" + try: + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, self._transcode_sync, source_path, target_path + ) + except Exception as e: + logger.error(f"转码失败 {source_path}: {e}") + return False, 0, 0 + + def _transcode_sync( + self, source_path: str, target_path: str + ) -> Tuple[bool, int, int]: + """同步转码(在线程池中执行)""" + temp_path = target_path + ".tmp" + try: + original_size = os.path.getsize(source_path) + os.makedirs(os.path.dirname(target_path), exist_ok=True) + + with Image.open(source_path) as img: + is_animated = getattr(img, "is_animated", False) + n_frames = getattr(img, "n_frames", 1) + + if is_animated and n_frames > 1: + if n_frames > MAX_ANIMATION_FRAMES: + logger.warning( + f"图片帧数 ({n_frames}) 超过限制 {MAX_ANIMATION_FRAMES},将仅转码第一帧: {os.path.basename(source_path)}" + ) + self._transcode_static(img, temp_path) + else: + self._transcode_animated(img, temp_path) + else: + self._transcode_static(img, temp_path) + + webp_size = os.path.getsize(temp_path) + + if os.path.exists(target_path): + os.remove(target_path) + os.rename(temp_path, target_path) + + logger.debug( + f"转码完成: {os.path.basename(source_path)} " + f"({original_size} -> {webp_size}, -{100 - webp_size * 100 // original_size}%)" + ) + return True, original_size, webp_size + + except Exception as e: + logger.error(f"转码处理失败 {source_path}: {e}") + if os.path.exists(temp_path): + try: + os.remove(temp_path) + except Exception: + pass + return False, 0, 0 + + def _transcode_static(self, img: Image.Image, target_path: str) -> None: + """转码静态图片""" + img = self._convert_image_mode(img) + img.save(target_path, format="WEBP", quality=self.quality, method=4) + + def _transcode_animated(self, img: Image.Image, target_path: str) -> None: + """转码动态图片(GIF → Animated WebP)""" + frames = [] + durations = [] + + for frame_num in range(img.n_frames): + img.seek(frame_num) + frame = self._convert_image_mode(img.copy()) + + frames.append(frame) + durations.append(img.info.get("duration", 100)) + + if frames: + frames[0].save( + target_path, + format="WEBP", + save_all=True, + append_images=frames[1:] if len(frames) > 1 else [], + duration=durations, + loop=img.info.get("loop", 0), + quality=self.quality, + method=4, + ) + + def _convert_image_mode(self, img: Image.Image) -> Image.Image: + """转换图片模式以兼容 WebP""" + if img.mode in ("P", "LA"): + return img.convert("RGBA") + elif img.mode not in ("RGB", "RGBA"): + return img.convert("RGB") + return img diff --git a/src/webui/webui_server.py b/src/webui/webui_server.py index 5997c3ba..0444d3c9 100644 --- a/src/webui/webui_server.py +++ b/src/webui/webui_server.py @@ -110,6 +110,27 @@ class WebUIServer: except Exception as e: logger.error(f"❌ 注册 WebUI API 路由失败: {e}", exc_info=True) + async def _start_image_proxy_workers(self): + """启动图片代理后台工作协程""" + try: + from src.webui.image_proxy import get_proxy_manager + + proxy_manager = get_proxy_manager() + await proxy_manager.start_workers() + logger.info("✅ 图片代理后台工作协程已启动") + except Exception as e: + logger.warning(f"⚠️ 图片代理启动失败(不影响主功能): {e}") + + async def _stop_image_proxy_workers(self): + """停止图片代理后台工作协程""" + try: + from src.webui.image_proxy import get_proxy_manager + + proxy_manager = get_proxy_manager() + await proxy_manager.stop_workers() + except Exception as e: + logger.warning(f"⚠️ 图片代理停止失败: {e}") + async def start(self): """启动服务器""" config = Config( @@ -126,6 +147,9 @@ class WebUIServer: if self.host == "0.0.0.0": logger.info(f"本机访问请使用 http://localhost:{self.port}") + # 启动图片代理后台工作协程 + await self._start_image_proxy_workers() + try: await self._server.serve() except Exception as e: @@ -134,6 +158,9 @@ class WebUIServer: async def shutdown(self): """关闭服务器""" + # 先停止图片代理后台工作协程 + await self._stop_image_proxy_workers() + if self._server: logger.info("正在关闭 WebUI 服务器...") self._server.should_exit = True