feat(webui): 添加表情包图片代理缓存系统,优化 WebUI 加载性能

pull/1393/head
Ronifue 2025-11-30 19:24:00 +08:00
parent 9b6ca0e0a5
commit 28af002477
5 changed files with 537 additions and 13 deletions

View File

@ -477,7 +477,7 @@ async def get_emoji_thumbnail(
authorization: Optional[str] = Header(None),
):
"""
获取表情包缩略图
获取表情包缩略图优先返回优化后的 WebP 代理图片
Args:
emoji_id: 表情包ID
@ -485,7 +485,7 @@ async def get_emoji_thumbnail(
authorization: Authorization header
Returns:
表情包图片文件
表情包图片文件优先 WebP 代理图片否则原图
"""
try:
# 优先使用 query parameter 中的 token用于 img 标签)
@ -506,19 +506,24 @@ async def get_emoji_thumbnail(
if not os.path.exists(emoji.full_path):
raise HTTPException(status_code=404, detail="表情包文件不存在")
# 根据格式设置 MIME 类型
mime_types = {
"png": "image/png",
"jpg": "image/jpeg",
"jpeg": "image/jpeg",
"gif": "image/gif",
"webp": "image/webp",
"bmp": "image/bmp",
}
# 使用代理图片管理器获取优化后的图片路径
from src.webui.image_proxy import get_proxy_manager
media_type = mime_types.get(emoji.format.lower(), "application/octet-stream")
proxy_manager = get_proxy_manager()
file_path, media_type = await proxy_manager.get_proxy_or_original(emoji)
return FileResponse(path=emoji.full_path, media_type=media_type, filename=f"{emoji.emoji_hash}.{emoji.format}")
# 确定文件名(用于下载时的文件名)
if media_type == "image/webp":
filename = f"{emoji.emoji_hash}.webp"
else:
filename = f"{emoji.emoji_hash}.{emoji.format}"
return FileResponse(
path=file_path,
media_type=media_type,
filename=filename,
headers={"Cache-Control": "public, max-age=86400"}, # 浏览器缓存 24 小时
)
except HTTPException:
raise
@ -868,3 +873,138 @@ async def batch_upload_emoji(
except Exception as e:
logger.exception(f"批量上传表情包失败: {e}")
raise HTTPException(status_code=500, detail=f"批量上传失败: {str(e)}") from e
# ==================== 图片代理缓存管理 API ====================
@router.get("/proxy/stats")
async def get_proxy_cache_stats(authorization: Optional[str] = Header(None)):
"""
获取图片代理缓存统计信息
Args:
authorization: Authorization header
Returns:
缓存统计信息
"""
try:
verify_auth_token(authorization)
from src.webui.image_proxy import get_proxy_manager
from src.webui.image_proxy.proxy_manager import PROXY_DIR
proxy_manager = get_proxy_manager()
# 统计缓存文件
webp_count = 0
skip_count = 0
total_cache_size = 0
if os.path.exists(PROXY_DIR):
for filename in os.listdir(PROXY_DIR):
file_path = os.path.join(PROXY_DIR, filename)
if filename.endswith(".webp"):
webp_count += 1
total_cache_size += os.path.getsize(file_path)
elif filename.endswith(".skip"):
skip_count += 1
# 统计表情包总数
total_emojis = Emoji.select().count()
# 计算缓存覆盖率
cached_count = webp_count + skip_count
coverage = (cached_count / total_emojis * 100) if total_emojis > 0 else 0
return {
"success": True,
"data": {
"total_emojis": total_emojis,
"webp_cached": webp_count,
"skipped_original_better": skip_count,
"pending_transcode": total_emojis - cached_count,
"cache_coverage_percent": round(coverage, 2),
"cache_size_bytes": total_cache_size,
"cache_size_mb": round(total_cache_size / 1024 / 1024, 2),
"cache_directory": PROXY_DIR,
},
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"获取代理缓存统计失败: {e}")
raise HTTPException(status_code=500, detail=f"获取代理缓存统计失败: {str(e)}") from e
@router.post("/proxy/transcode-all")
async def trigger_transcode_all(authorization: Optional[str] = Header(None)):
"""
触发批量转码所有未缓存的表情包异步执行
Args:
authorization: Authorization header
Returns:
任务启动结果
"""
try:
verify_auth_token(authorization)
from src.webui.image_proxy import get_proxy_manager
proxy_manager = get_proxy_manager()
# 获取所有表情包并加入转码队列
emojis = Emoji.select()
enqueued_count = 0
for emoji in emojis:
if await proxy_manager.enqueue_transcode(emoji):
enqueued_count += 1
return {
"success": True,
"message": f"已将 {enqueued_count} 个表情包加入转码队列",
"enqueued_count": enqueued_count,
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"触发批量转码失败: {e}")
raise HTTPException(status_code=500, detail=f"触发批量转码失败: {str(e)}") from e
@router.post("/proxy/cleanup")
async def trigger_cache_cleanup(authorization: Optional[str] = Header(None)):
"""
触发缓存清理删除无效的缓存文件
Args:
authorization: Authorization header
Returns:
清理结果
"""
try:
verify_auth_token(authorization)
from src.webui.image_proxy import get_proxy_manager
proxy_manager = get_proxy_manager()
cleaned_count = await proxy_manager.cleanup_stale_cache()
return {
"success": True,
"message": f"已清理 {cleaned_count} 个无效缓存文件",
"cleaned_count": cleaned_count,
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"触发缓存清理失败: {e}")
raise HTTPException(status_code=500, detail=f"触发缓存清理失败: {str(e)}") from e

View File

@ -0,0 +1,5 @@
"""图片代理模块 - 为 WebUI 提供轻量化的图片缓存服务"""
from .proxy_manager import get_proxy_manager, ProxyManager
__all__ = ["get_proxy_manager", "ProxyManager"]

View File

@ -0,0 +1,237 @@
"""代理图片管理器 - 管理图片缓存和转码队列"""
import os
import asyncio
import mimetypes
from typing import Optional, Set, TYPE_CHECKING
from src.common.logger import get_logger
from .transcoder import Transcoder
if TYPE_CHECKING:
from src.common.database.database_model import Emoji
logger = get_logger("image_proxy.manager")
# 配置常量
PROXY_DIR = os.path.join("data", "emoji_proxy")
WEBP_QUALITY = 75
MAX_CONCURRENT_TRANSCODE = 2
TRANSCODE_INTERVAL = 0.3 # 秒
CLEANUP_INTERVAL = 3600 # 秒
# 支持转码的格式
SUPPORTED_FORMATS = {"png", "jpg", "jpeg", "gif", "webp", "bmp"}
class ProxyManager:
"""代理图片管理器"""
_instance: Optional["ProxyManager"] = None
def __new__(cls) -> "ProxyManager":
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self._transcoder = Transcoder(quality=WEBP_QUALITY)
self._transcode_queue: asyncio.Queue = asyncio.Queue()
self._pending_hashes: Set[str] = set() # 防止重复入队
self._pending_lock = asyncio.Lock()
self._workers_started = False
self._worker_tasks: list = []
# 确保缓存目录存在
os.makedirs(PROXY_DIR, exist_ok=True)
logger.info(f"代理图片管理器已初始化,缓存目录: {PROXY_DIR}")
def get_cache_path(self, emoji_hash: str) -> str:
"""获取缓存文件路径"""
return os.path.join(PROXY_DIR, f"{emoji_hash}.webp")
def get_skip_marker_path(self, emoji_hash: str) -> str:
"""获取跳过标记文件路径"""
return os.path.join(PROXY_DIR, f"{emoji_hash}.skip")
def get_cached_proxy_path(self, emoji: "Emoji") -> Optional[str]:
"""获取已缓存的代理图片路径,不存在返回 None"""
cache_path = self.get_cache_path(emoji.emoji_hash)
return cache_path if os.path.exists(cache_path) else None
def should_skip_transcode(self, emoji: "Emoji") -> bool:
"""检查是否应该跳过转码(原图更优)"""
return os.path.exists(self.get_skip_marker_path(emoji.emoji_hash))
async def enqueue_transcode(self, emoji: "Emoji") -> bool:
"""将转码任务加入队列(非阻塞)"""
# 检查格式是否支持转码
if emoji.format.lower() not in SUPPORTED_FORMATS:
return False
async with self._pending_lock:
# 检查是否已在队列中或已有缓存
if emoji.emoji_hash in self._pending_hashes:
return False
if self.get_cached_proxy_path(emoji) or self.should_skip_transcode(emoji):
return False
if not os.path.exists(emoji.full_path):
return False
self._pending_hashes.add(emoji.emoji_hash)
await self._transcode_queue.put((emoji.emoji_hash, emoji.full_path))
logger.debug(f"已加入转码队列: {emoji.emoji_hash[:8]}...")
return True
async def _worker(self, worker_id: int) -> None:
"""后台转码工作协程"""
logger.debug(f"转码工作协程 #{worker_id} 已启动")
while True:
try:
emoji_hash, source_path = await self._transcode_queue.get()
try:
await self._do_transcode(emoji_hash, source_path)
except Exception as e:
logger.error(f"转码任务异常: {e}")
finally:
async with self._pending_lock:
self._pending_hashes.discard(emoji_hash)
self._transcode_queue.task_done()
await asyncio.sleep(TRANSCODE_INTERVAL)
except asyncio.CancelledError:
logger.debug(f"转码工作协程 #{worker_id} 已停止")
break
except Exception as e:
logger.error(f"转码工作协程 #{worker_id} 错误: {e}")
await asyncio.sleep(1)
async def _do_transcode(self, emoji_hash: str, source_path: str) -> None:
"""执行实际的转码操作"""
if not os.path.exists(source_path):
return
cache_path = self.get_cache_path(emoji_hash)
skip_path = self.get_skip_marker_path(emoji_hash)
if os.path.exists(cache_path) or os.path.exists(skip_path):
return
success, original_size, webp_size = await self._transcoder.transcode_to_webp(
source_path, cache_path
)
if not success:
return
# 压缩后更大,删除 WebP创建跳过标记
if webp_size >= original_size:
logger.debug(f"压缩后体积更大,使用原图: {emoji_hash[:8]}...")
try:
os.remove(cache_path)
except Exception:
pass
try:
with open(skip_path, "w") as f:
f.write(f"original:{original_size},webp:{webp_size}")
except Exception as e:
logger.error(f"创建跳过标记失败: {e}")
else:
logger.debug(f"转码成功: {emoji_hash[:8]}... ({original_size} -> {webp_size})")
async def start_workers(self) -> None:
"""启动后台转码工作协程"""
if self._workers_started:
return
self._workers_started = True
for i in range(MAX_CONCURRENT_TRANSCODE):
self._worker_tasks.append(asyncio.create_task(self._worker(i)))
self._worker_tasks.append(asyncio.create_task(self._cleanup_worker()))
logger.info(f"已启动 {MAX_CONCURRENT_TRANSCODE} 个转码工作协程")
async def stop_workers(self) -> None:
"""停止后台工作协程"""
if not self._workers_started:
return
self._workers_started = False
for task in self._worker_tasks:
task.cancel()
await asyncio.gather(*self._worker_tasks, return_exceptions=True)
self._worker_tasks.clear()
logger.info("后台工作协程已停止")
async def _cleanup_worker(self) -> None:
"""后台清理工作协程"""
while True:
try:
await asyncio.sleep(CLEANUP_INTERVAL)
await self.cleanup_stale_cache()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"清理任务错误: {e}")
async def cleanup_stale_cache(self) -> int:
"""清理失效的缓存文件,返回清理数量"""
try:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._cleanup_stale_cache_sync)
except Exception as e:
logger.error(f"清理缓存任务调度错误: {e}")
return 0
def _cleanup_stale_cache_sync(self) -> int:
"""同步执行清理失效缓存文件"""
try:
from src.common.database.database_model import Emoji
# 注意这里涉及数据库查询和大量文件IO必须在线程池中运行
valid_hashes = {e.emoji_hash for e in Emoji.select(Emoji.emoji_hash)}
cleaned_count = 0
if not os.path.exists(PROXY_DIR):
return 0
for filename in os.listdir(PROXY_DIR):
hash_part = filename.rsplit(".", 1)[0]
if hash_part not in valid_hashes:
try:
os.remove(os.path.join(PROXY_DIR, filename))
cleaned_count += 1
except Exception:
pass
if cleaned_count > 0:
logger.info(f"已清理 {cleaned_count} 个失效缓存文件")
return cleaned_count
except Exception as e:
logger.error(f"清理缓存错误: {e}")
return 0
async def get_proxy_or_original(self, emoji: "Emoji") -> tuple[str, str]:
"""获取代理图片路径或原图路径,返回 (file_path, media_type)"""
proxy_path = self.get_cached_proxy_path(emoji)
if proxy_path:
return proxy_path, "image/webp"
# 动态获取 MIME 类型
mime_type, _ = mimetypes.guess_type(emoji.full_path)
if not mime_type:
mime_type = "application/octet-stream"
if self.should_skip_transcode(emoji):
return emoji.full_path, mime_type
await self.enqueue_transcode(emoji)
return emoji.full_path, mime_type
def get_proxy_manager() -> ProxyManager:
"""获取全局代理图片管理器实例"""
return ProxyManager()

View File

@ -0,0 +1,115 @@
"""图片转码引擎 - 将各种格式转换为优化的 WebP"""
import os
import asyncio
from typing import Tuple
from PIL import Image
from src.common.logger import get_logger
logger = get_logger("image_proxy.transcoder")
# 配置常量
MAX_ANIMATION_FRAMES = 150 # 限制最大帧数防止内存爆炸
class Transcoder:
"""图片转码引擎"""
def __init__(self, quality: int = 75):
self.quality = quality
async def transcode_to_webp(
self, source_path: str, target_path: str
) -> Tuple[bool, int, int]:
"""将图片转码为 WebP 格式,返回 (success, original_size, webp_size)"""
try:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, self._transcode_sync, source_path, target_path
)
except Exception as e:
logger.error(f"转码失败 {source_path}: {e}")
return False, 0, 0
def _transcode_sync(
self, source_path: str, target_path: str
) -> Tuple[bool, int, int]:
"""同步转码(在线程池中执行)"""
temp_path = target_path + ".tmp"
try:
original_size = os.path.getsize(source_path)
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with Image.open(source_path) as img:
is_animated = getattr(img, "is_animated", False)
n_frames = getattr(img, "n_frames", 1)
if is_animated and n_frames > 1:
if n_frames > MAX_ANIMATION_FRAMES:
logger.warning(
f"图片帧数 ({n_frames}) 超过限制 {MAX_ANIMATION_FRAMES},将仅转码第一帧: {os.path.basename(source_path)}"
)
self._transcode_static(img, temp_path)
else:
self._transcode_animated(img, temp_path)
else:
self._transcode_static(img, temp_path)
webp_size = os.path.getsize(temp_path)
if os.path.exists(target_path):
os.remove(target_path)
os.rename(temp_path, target_path)
logger.debug(
f"转码完成: {os.path.basename(source_path)} "
f"({original_size} -> {webp_size}, -{100 - webp_size * 100 // original_size}%)"
)
return True, original_size, webp_size
except Exception as e:
logger.error(f"转码处理失败 {source_path}: {e}")
if os.path.exists(temp_path):
try:
os.remove(temp_path)
except Exception:
pass
return False, 0, 0
def _transcode_static(self, img: Image.Image, target_path: str) -> None:
"""转码静态图片"""
img = self._convert_image_mode(img)
img.save(target_path, format="WEBP", quality=self.quality, method=4)
def _transcode_animated(self, img: Image.Image, target_path: str) -> None:
"""转码动态图片GIF → Animated WebP"""
frames = []
durations = []
for frame_num in range(img.n_frames):
img.seek(frame_num)
frame = self._convert_image_mode(img.copy())
frames.append(frame)
durations.append(img.info.get("duration", 100))
if frames:
frames[0].save(
target_path,
format="WEBP",
save_all=True,
append_images=frames[1:] if len(frames) > 1 else [],
duration=durations,
loop=img.info.get("loop", 0),
quality=self.quality,
method=4,
)
def _convert_image_mode(self, img: Image.Image) -> Image.Image:
"""转换图片模式以兼容 WebP"""
if img.mode in ("P", "LA"):
return img.convert("RGBA")
elif img.mode not in ("RGB", "RGBA"):
return img.convert("RGB")
return img

View File

@ -110,6 +110,27 @@ class WebUIServer:
except Exception as e:
logger.error(f"❌ 注册 WebUI API 路由失败: {e}", exc_info=True)
async def _start_image_proxy_workers(self):
"""启动图片代理后台工作协程"""
try:
from src.webui.image_proxy import get_proxy_manager
proxy_manager = get_proxy_manager()
await proxy_manager.start_workers()
logger.info("✅ 图片代理后台工作协程已启动")
except Exception as e:
logger.warning(f"⚠️ 图片代理启动失败(不影响主功能): {e}")
async def _stop_image_proxy_workers(self):
"""停止图片代理后台工作协程"""
try:
from src.webui.image_proxy import get_proxy_manager
proxy_manager = get_proxy_manager()
await proxy_manager.stop_workers()
except Exception as e:
logger.warning(f"⚠️ 图片代理停止失败: {e}")
async def start(self):
"""启动服务器"""
config = Config(
@ -126,6 +147,9 @@ class WebUIServer:
if self.host == "0.0.0.0":
logger.info(f"本机访问请使用 http://localhost:{self.port}")
# 启动图片代理后台工作协程
await self._start_image_proxy_workers()
try:
await self._server.serve()
except Exception as e:
@ -134,6 +158,9 @@ class WebUIServer:
async def shutdown(self):
"""关闭服务器"""
# 先停止图片代理后台工作协程
await self._stop_image_proxy_workers()
if self._server:
logger.info("正在关闭 WebUI 服务器...")
self._server.should_exit = True