From 4141c25fdd5d213df6d0748b4af38572884d403a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A2=A8=E6=A2=93=E6=9F=92?= <1787882683@qq.com> Date: Tue, 18 Nov 2025 22:54:25 +0800 Subject: [PATCH] feat: add WebSocket plugin progress notification module and integrate with plugin management routes - Implemented a WebSocket endpoint for broadcasting plugin loading progress. - Created functions to update and broadcast progress status to connected clients. - Integrated progress updates into plugin management routes for operations like fetching, cloning, installing, uninstalling, and updating plugins. - Added error handling and logging for progress updates and WebSocket connections. --- src/webui/git_mirror_service.py | 731 ++++++++++++++++++++ src/webui/plugin_progress_ws.py | 127 ++++ src/webui/plugin_routes.py | 1145 +++++++++++++++++++++++++++++++ src/webui/routes.py | 6 + 4 files changed, 2009 insertions(+) create mode 100644 src/webui/git_mirror_service.py create mode 100644 src/webui/plugin_progress_ws.py create mode 100644 src/webui/plugin_routes.py diff --git a/src/webui/git_mirror_service.py b/src/webui/git_mirror_service.py new file mode 100644 index 00000000..02645f70 --- /dev/null +++ b/src/webui/git_mirror_service.py @@ -0,0 +1,731 @@ +"""Git 镜像源服务 - 支持多镜像源、错误重试、Git 克隆和 Raw 文件获取""" +from typing import Optional, List, Dict, Any +from enum import Enum +import httpx +import json +import asyncio +import subprocess +import shutil +from pathlib import Path +from datetime import datetime +from src.common.logger import get_logger + +logger = get_logger("webui.git_mirror") + +# 导入进度更新函数(避免循环导入) +_update_progress = None + +def set_update_progress_callback(callback): + """设置进度更新回调函数""" + global _update_progress + _update_progress = callback + + +class MirrorType(str, Enum): + """镜像源类型""" + GH_PROXY = "gh-proxy" # gh-proxy 主节点 + HK_GH_PROXY = "hk-gh-proxy" # gh-proxy 香港节点 + CDN_GH_PROXY = "cdn-gh-proxy" # gh-proxy CDN 节点 + EDGEONE_GH_PROXY = "edgeone-gh-proxy" # gh-proxy EdgeOne 节点 + MEYZH_GITHUB = "meyzh-github" # Meyzh GitHub 镜像 + GITHUB = "github" # GitHub 官方源(兜底) + CUSTOM = "custom" # 自定义镜像源 + + +class GitMirrorConfig: + """Git 镜像源配置管理""" + + # 配置文件路径 + CONFIG_FILE = Path("data/webui.json") + + # 默认镜像源配置 + DEFAULT_MIRRORS = [ + { + "id": "gh-proxy", + "name": "gh-proxy 镜像", + "raw_prefix": "https://gh-proxy.org/https://raw.githubusercontent.com", + "clone_prefix": "https://gh-proxy.org/https://github.com", + "enabled": True, + "priority": 1, + "created_at": None + }, + { + "id": "hk-gh-proxy", + "name": "gh-proxy 香港节点", + "raw_prefix": "https://hk.gh-proxy.org/https://raw.githubusercontent.com", + "clone_prefix": "https://hk.gh-proxy.org/https://github.com", + "enabled": True, + "priority": 2, + "created_at": None + }, + { + "id": "cdn-gh-proxy", + "name": "gh-proxy CDN 节点", + "raw_prefix": "https://cdn.gh-proxy.org/https://raw.githubusercontent.com", + "clone_prefix": "https://cdn.gh-proxy.org/https://github.com", + "enabled": True, + "priority": 3, + "created_at": None + }, + { + "id": "edgeone-gh-proxy", + "name": "gh-proxy EdgeOne 节点", + "raw_prefix": "https://edgeone.gh-proxy.org/https://raw.githubusercontent.com", + "clone_prefix": "https://edgeone.gh-proxy.org/https://github.com", + "enabled": True, + "priority": 4, + "created_at": None + }, + { + "id": "meyzh-github", + "name": "Meyzh GitHub 镜像", + "raw_prefix": "https://meyzh.github.io/https://raw.githubusercontent.com", + "clone_prefix": "https://meyzh.github.io/https://github.com", + "enabled": True, + "priority": 5, + "created_at": None + }, + { + "id": "github", + "name": "GitHub 官方源(兜底)", + "raw_prefix": "https://raw.githubusercontent.com", + "clone_prefix": "https://github.com", + "enabled": True, + "priority": 999, + "created_at": None + } + ] + + def __init__(self): + """初始化配置管理器""" + self.config_file = self.CONFIG_FILE + self.mirrors: List[Dict[str, Any]] = [] + self._load_config() + + def _load_config(self) -> None: + """加载配置文件""" + try: + if self.config_file.exists(): + with open(self.config_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + # 检查是否有镜像源配置 + if "git_mirrors" not in data or not data["git_mirrors"]: + logger.info("配置文件中未找到镜像源配置,使用默认配置") + self._init_default_mirrors() + else: + self.mirrors = data["git_mirrors"] + logger.info(f"已加载 {len(self.mirrors)} 个镜像源配置") + else: + logger.info("配置文件不存在,创建默认配置") + self._init_default_mirrors() + except Exception as e: + logger.error(f"加载配置文件失败: {e}") + self._init_default_mirrors() + + def _init_default_mirrors(self) -> None: + """初始化默认镜像源""" + current_time = datetime.now().isoformat() + self.mirrors = [] + + for mirror in self.DEFAULT_MIRRORS: + mirror_copy = mirror.copy() + mirror_copy["created_at"] = current_time + self.mirrors.append(mirror_copy) + + self._save_config() + logger.info(f"已初始化 {len(self.mirrors)} 个默认镜像源") + + def _save_config(self) -> None: + """保存配置到文件""" + try: + # 确保目录存在 + self.config_file.parent.mkdir(parents=True, exist_ok=True) + + # 读取现有配置 + existing_data = {} + if self.config_file.exists(): + with open(self.config_file, 'r', encoding='utf-8') as f: + existing_data = json.load(f) + + # 更新镜像源配置 + existing_data["git_mirrors"] = self.mirrors + + # 写入文件 + with open(self.config_file, 'w', encoding='utf-8') as f: + json.dump(existing_data, f, indent=2, ensure_ascii=False) + + logger.debug(f"配置已保存到 {self.config_file}") + except Exception as e: + logger.error(f"保存配置文件失败: {e}") + + def get_all_mirrors(self) -> List[Dict[str, Any]]: + """获取所有镜像源""" + return self.mirrors.copy() + + def get_enabled_mirrors(self) -> List[Dict[str, Any]]: + """获取所有启用的镜像源,按优先级排序""" + enabled = [m for m in self.mirrors if m.get("enabled", False)] + return sorted(enabled, key=lambda x: x.get("priority", 999)) + + def get_mirror_by_id(self, mirror_id: str) -> Optional[Dict[str, Any]]: + """根据 ID 获取镜像源""" + for mirror in self.mirrors: + if mirror.get("id") == mirror_id: + return mirror.copy() + return None + + def add_mirror( + self, + mirror_id: str, + name: str, + raw_prefix: str, + clone_prefix: str, + enabled: bool = True, + priority: Optional[int] = None + ) -> Dict[str, Any]: + """ + 添加新的镜像源 + + Returns: + 添加的镜像源配置 + + Raises: + ValueError: 如果镜像源 ID 已存在 + """ + # 检查 ID 是否已存在 + if self.get_mirror_by_id(mirror_id): + raise ValueError(f"镜像源 ID 已存在: {mirror_id}") + + # 如果未指定优先级,使用最大优先级 + 1 + if priority is None: + max_priority = max((m.get("priority", 0) for m in self.mirrors), default=0) + priority = max_priority + 1 + + new_mirror = { + "id": mirror_id, + "name": name, + "raw_prefix": raw_prefix, + "clone_prefix": clone_prefix, + "enabled": enabled, + "priority": priority, + "created_at": datetime.now().isoformat() + } + + self.mirrors.append(new_mirror) + self._save_config() + + logger.info(f"已添加镜像源: {mirror_id} - {name}") + return new_mirror.copy() + + def update_mirror( + self, + mirror_id: str, + name: Optional[str] = None, + raw_prefix: Optional[str] = None, + clone_prefix: Optional[str] = None, + enabled: Optional[bool] = None, + priority: Optional[int] = None + ) -> Optional[Dict[str, Any]]: + """ + 更新镜像源配置 + + Returns: + 更新后的镜像源配置,如果不存在则返回 None + """ + for mirror in self.mirrors: + if mirror.get("id") == mirror_id: + if name is not None: + mirror["name"] = name + if raw_prefix is not None: + mirror["raw_prefix"] = raw_prefix + if clone_prefix is not None: + mirror["clone_prefix"] = clone_prefix + if enabled is not None: + mirror["enabled"] = enabled + if priority is not None: + mirror["priority"] = priority + + mirror["updated_at"] = datetime.now().isoformat() + self._save_config() + + logger.info(f"已更新镜像源: {mirror_id}") + return mirror.copy() + + return None + + def delete_mirror(self, mirror_id: str) -> bool: + """ + 删除镜像源 + + Returns: + True 如果删除成功,False 如果镜像源不存在 + """ + for i, mirror in enumerate(self.mirrors): + if mirror.get("id") == mirror_id: + self.mirrors.pop(i) + self._save_config() + logger.info(f"已删除镜像源: {mirror_id}") + return True + + return False + + def get_default_priority_list(self) -> List[str]: + """获取默认优先级列表(仅启用的镜像源 ID)""" + enabled = self.get_enabled_mirrors() + return [m["id"] for m in enabled] + + +class GitMirrorService: + """Git 镜像源服务""" + + def __init__( + self, + max_retries: int = 3, + timeout: int = 30, + config: Optional[GitMirrorConfig] = None + ): + """ + 初始化 Git 镜像源服务 + + Args: + max_retries: 最大重试次数 + timeout: 请求超时时间(秒) + config: 镜像源配置管理器(可选,默认创建新实例) + """ + self.max_retries = max_retries + self.timeout = timeout + self.config = config or GitMirrorConfig() + logger.info(f"Git镜像源服务初始化完成,已加载 {len(self.config.get_enabled_mirrors())} 个启用的镜像源") + + def get_mirror_config(self) -> GitMirrorConfig: + """获取镜像源配置管理器""" + return self.config + + @staticmethod + def check_git_installed() -> Dict[str, Any]: + """ + 检查本机是否安装了 Git + + Returns: + Dict 包含: + - installed: bool - 是否已安装 Git + - version: str - Git 版本号(如果已安装) + - path: str - Git 可执行文件路径(如果已安装) + - error: str - 错误信息(如果未安装或检测失败) + """ + import subprocess + import shutil + + try: + # 查找 git 可执行文件路径 + git_path = shutil.which("git") + + if not git_path: + logger.warning("未找到 Git 可执行文件") + return { + "installed": False, + "error": "系统中未找到 Git,请先安装 Git" + } + + # 获取 Git 版本 + result = subprocess.run( + ["git", "--version"], + capture_output=True, + text=True, + timeout=5 + ) + + if result.returncode == 0: + version = result.stdout.strip() + logger.info(f"检测到 Git: {version} at {git_path}") + return { + "installed": True, + "version": version, + "path": git_path + } + else: + logger.warning(f"Git 命令执行失败: {result.stderr}") + return { + "installed": False, + "error": f"Git 命令执行失败: {result.stderr}" + } + + except subprocess.TimeoutExpired: + logger.error("Git 版本检测超时") + return { + "installed": False, + "error": "Git 版本检测超时" + } + except Exception as e: + logger.error(f"检测 Git 时发生错误: {e}") + return { + "installed": False, + "error": f"检测 Git 时发生错误: {str(e)}" + } + + async def fetch_raw_file( + self, + owner: str, + repo: str, + branch: str, + file_path: str, + mirror_id: Optional[str] = None, + custom_url: Optional[str] = None + ) -> Dict[str, Any]: + """ + 获取 GitHub 仓库的 Raw 文件内容 + + Args: + owner: 仓库所有者 + repo: 仓库名称 + branch: 分支名称 + file_path: 文件路径 + mirror_id: 指定的镜像源 ID + custom_url: 自定义完整 URL(如果提供,将忽略其他参数) + + Returns: + Dict 包含: + - success: bool - 是否成功 + - data: str - 文件内容(成功时) + - error: str - 错误信息(失败时) + - mirror_used: str - 使用的镜像源 + - attempts: int - 尝试次数 + """ + logger.info(f"开始获取 Raw 文件: {owner}/{repo}/{branch}/{file_path}") + + if custom_url: + # 使用自定义 URL + return await self._fetch_with_url(custom_url, "custom") + + # 确定要使用的镜像源列表 + if mirror_id: + # 使用指定的镜像源 + mirror = self.config.get_mirror_by_id(mirror_id) + if not mirror: + return { + "success": False, + "error": f"未找到镜像源: {mirror_id}", + "mirror_used": None, + "attempts": 0 + } + mirrors_to_try = [mirror] + else: + # 使用所有启用的镜像源 + mirrors_to_try = self.config.get_enabled_mirrors() + + total_mirrors = len(mirrors_to_try) + + # 依次尝试每个镜像源 + for index, mirror in enumerate(mirrors_to_try, 1): + # 推送进度:正在尝试第 N 个镜像源 + if _update_progress: + try: + progress = 30 + int((index - 1) / total_mirrors * 40) # 30% - 70% + await _update_progress( + stage="loading", + progress=progress, + message=f"正在尝试镜像源 {index}/{total_mirrors}: {mirror['name']}", + total_plugins=0, + loaded_plugins=0 + ) + except Exception as e: + logger.warning(f"推送进度失败: {e}") + + result = await self._fetch_raw_from_mirror( + owner, repo, branch, file_path, mirror + ) + + if result["success"]: + # 成功,推送进度 + if _update_progress: + try: + await _update_progress( + stage="loading", + progress=70, + message=f"成功从 {mirror['name']} 获取数据", + total_plugins=0, + loaded_plugins=0 + ) + except Exception as e: + logger.warning(f"推送进度失败: {e}") + return result + + # 失败,记录日志并推送失败信息 + logger.warning(f"镜像源 {mirror['id']} 失败: {result.get('error')}") + + if _update_progress and index < total_mirrors: + try: + await _update_progress( + stage="loading", + progress=30 + int(index / total_mirrors * 40), + message=f"镜像源 {mirror['name']} 失败,尝试下一个...", + total_plugins=0, + loaded_plugins=0 + ) + except Exception as e: + logger.warning(f"推送进度失败: {e}") + + # 所有镜像源都失败 + return { + "success": False, + "error": "所有镜像源均失败", + "mirror_used": None, + "attempts": len(mirrors_to_try) + } + + async def _fetch_raw_from_mirror( + self, + owner: str, + repo: str, + branch: str, + file_path: str, + mirror: Dict[str, Any] + ) -> Dict[str, Any]: + """从指定镜像源获取文件""" + # 构建 URL + raw_prefix = mirror["raw_prefix"] + url = f"{raw_prefix}/{owner}/{repo}/{branch}/{file_path}" + + return await self._fetch_with_url(url, mirror["id"]) + + async def _fetch_with_url(self, url: str, mirror_type: str) -> Dict[str, Any]: + """使用指定 URL 获取文件,支持重试""" + attempts = 0 + last_error = None + + for attempt in range(self.max_retries): + attempts += 1 + try: + logger.debug(f"尝试 #{attempt + 1}: {url}") + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get(url) + response.raise_for_status() + + logger.info(f"成功获取文件: {url}") + return { + "success": True, + "data": response.text, + "mirror_used": mirror_type, + "attempts": attempts, + "url": url + } + except httpx.HTTPStatusError as e: + last_error = f"HTTP {e.response.status_code}: {e}" + logger.warning(f"HTTP 错误 (尝试 {attempt + 1}/{self.max_retries}): {last_error}") + except httpx.TimeoutException as e: + last_error = f"请求超时: {e}" + logger.warning(f"超时 (尝试 {attempt + 1}/{self.max_retries}): {last_error}") + except Exception as e: + last_error = f"未知错误: {e}" + logger.error(f"错误 (尝试 {attempt + 1}/{self.max_retries}): {last_error}") + + return { + "success": False, + "error": last_error, + "mirror_used": mirror_type, + "attempts": attempts, + "url": url + } + + async def clone_repository( + self, + owner: str, + repo: str, + target_path: Path, + branch: Optional[str] = None, + mirror_id: Optional[str] = None, + custom_url: Optional[str] = None, + depth: Optional[int] = None + ) -> Dict[str, Any]: + """ + 克隆 GitHub 仓库 + + Args: + owner: 仓库所有者 + repo: 仓库名称 + target_path: 目标路径 + branch: 分支名称(可选) + mirror_id: 指定的镜像源 ID + custom_url: 自定义克隆 URL + depth: 克隆深度(浅克隆) + + Returns: + Dict 包含: + - success: bool - 是否成功 + - path: str - 克隆路径(成功时) + - error: str - 错误信息(失败时) + - mirror_used: str - 使用的镜像源 + - attempts: int - 尝试次数 + """ + logger.info(f"开始克隆仓库: {owner}/{repo} 到 {target_path}") + + if custom_url: + # 使用自定义 URL + return await self._clone_with_url(custom_url, target_path, branch, depth, "custom") + + # 确定要使用的镜像源列表 + if mirror_id: + # 使用指定的镜像源 + mirror = self.config.get_mirror_by_id(mirror_id) + if not mirror: + return { + "success": False, + "error": f"未找到镜像源: {mirror_id}", + "mirror_used": None, + "attempts": 0 + } + mirrors_to_try = [mirror] + else: + # 使用所有启用的镜像源 + mirrors_to_try = self.config.get_enabled_mirrors() + + # 依次尝试每个镜像源 + for mirror in mirrors_to_try: + result = await self._clone_from_mirror( + owner, repo, target_path, branch, depth, mirror + ) + if result["success"]: + return result + logger.warning(f"镜像源 {mirror['id']} 克隆失败: {result.get('error')}") + + # 所有镜像源都失败 + return { + "success": False, + "error": "所有镜像源克隆均失败", + "mirror_used": None, + "attempts": len(mirrors_to_try) + } + + async def _clone_from_mirror( + self, + owner: str, + repo: str, + target_path: Path, + branch: Optional[str], + depth: Optional[int], + mirror: Dict[str, Any] + ) -> Dict[str, Any]: + """从指定镜像源克隆仓库""" + # 构建克隆 URL + clone_prefix = mirror["clone_prefix"] + url = f"{clone_prefix}/{owner}/{repo}.git" + + return await self._clone_with_url(url, target_path, branch, depth, mirror["id"]) + + async def _clone_with_url( + self, + url: str, + target_path: Path, + branch: Optional[str], + depth: Optional[int], + mirror_type: str + ) -> Dict[str, Any]: + """使用指定 URL 克隆仓库,支持重试""" + attempts = 0 + last_error = None + + for attempt in range(self.max_retries): + attempts += 1 + + try: + # 确保目标路径不存在 + if target_path.exists(): + logger.warning(f"目标路径已存在,删除: {target_path}") + shutil.rmtree(target_path, ignore_errors=True) + + # 构建 git clone 命令 + cmd = ["git", "clone"] + + # 添加分支参数 + if branch: + cmd.extend(["-b", branch]) + + # 添加深度参数(浅克隆) + if depth: + cmd.extend(["--depth", str(depth)]) + + # 添加 URL 和目标路径 + cmd.extend([url, str(target_path)]) + + logger.info(f"尝试克隆 #{attempt + 1}: {' '.join(cmd)}") + + # 推送进度 + if _update_progress: + try: + await _update_progress( + stage="loading", + progress=20 + attempt * 10, + message=f"正在克隆仓库 (尝试 {attempt + 1}/{self.max_retries})...", + operation="install" + ) + except Exception as e: + logger.warning(f"推送进度失败: {e}") + + # 执行 git clone(在线程池中运行以避免阻塞) + loop = asyncio.get_event_loop() + + def run_git_clone(): + return subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=300 # 5分钟超时 + ) + + process = await loop.run_in_executor(None, run_git_clone) + + if process.returncode == 0: + logger.info(f"成功克隆仓库: {url} -> {target_path}") + return { + "success": True, + "path": str(target_path), + "mirror_used": mirror_type, + "attempts": attempts, + "url": url, + "branch": branch or "default" + } + else: + last_error = f"Git 克隆失败: {process.stderr}" + logger.warning(f"克隆失败 (尝试 {attempt + 1}/{self.max_retries}): {last_error}") + + except subprocess.TimeoutExpired: + last_error = "克隆超时(超过 5 分钟)" + logger.warning(f"克隆超时 (尝试 {attempt + 1}/{self.max_retries})") + + # 清理可能的部分克隆 + if target_path.exists(): + shutil.rmtree(target_path, ignore_errors=True) + + except FileNotFoundError: + last_error = "Git 未安装或不在 PATH 中" + logger.error(f"Git 未找到: {last_error}") + break # Git 不存在,不需要重试 + + except Exception as e: + last_error = f"未知错误: {e}" + logger.error(f"克隆错误 (尝试 {attempt + 1}/{self.max_retries}): {last_error}") + + # 清理可能的部分克隆 + if target_path.exists(): + shutil.rmtree(target_path, ignore_errors=True) + + return { + "success": False, + "error": last_error, + "mirror_used": mirror_type, + "attempts": attempts, + "url": url + } + + +# 全局服务实例 +_git_mirror_service: Optional[GitMirrorService] = None + + +def get_git_mirror_service() -> GitMirrorService: + """获取 Git 镜像源服务实例(单例)""" + global _git_mirror_service + if _git_mirror_service is None: + _git_mirror_service = GitMirrorService() + return _git_mirror_service diff --git a/src/webui/plugin_progress_ws.py b/src/webui/plugin_progress_ws.py new file mode 100644 index 00000000..927dbb13 --- /dev/null +++ b/src/webui/plugin_progress_ws.py @@ -0,0 +1,127 @@ +"""WebSocket 插件加载进度推送模块""" +from fastapi import APIRouter, WebSocket, WebSocketDisconnect +from typing import Set, Dict, Any +import json +import asyncio +from src.common.logger import get_logger + +logger = get_logger("webui.plugin_progress") + +# 创建路由器 +router = APIRouter() + +# 全局 WebSocket 连接池 +active_connections: Set[WebSocket] = set() + +# 当前加载进度状态 +current_progress: Dict[str, Any] = { + "operation": "idle", # idle, fetch, install, uninstall, update + "stage": "idle", # idle, loading, success, error + "progress": 0, # 0-100 + "message": "", + "error": None, + "plugin_id": None, # 当前操作的插件 ID + "total_plugins": 0, + "loaded_plugins": 0 +} + + +async def broadcast_progress(progress_data: Dict[str, Any]): + """广播进度更新到所有连接的客户端""" + global current_progress + current_progress = progress_data.copy() + + if not active_connections: + return + + message = json.dumps(progress_data, ensure_ascii=False) + disconnected = set() + + for websocket in active_connections: + try: + await websocket.send_text(message) + except Exception as e: + logger.error(f"发送进度更新失败: {e}") + disconnected.add(websocket) + + # 移除断开的连接 + for websocket in disconnected: + active_connections.discard(websocket) + + +async def update_progress( + stage: str, + progress: int, + message: str, + operation: str = "fetch", + error: str = None, + plugin_id: str = None, + total_plugins: int = 0, + loaded_plugins: int = 0 +): + """更新并广播进度 + + Args: + stage: 阶段 (idle, loading, success, error) + progress: 进度百分比 (0-100) + message: 当前消息 + operation: 操作类型 (fetch, install, uninstall, update) + error: 错误信息(可选) + plugin_id: 当前操作的插件 ID + total_plugins: 总插件数 + loaded_plugins: 已加载插件数 + """ + progress_data = { + "operation": operation, + "stage": stage, + "progress": progress, + "message": message, + "error": error, + "plugin_id": plugin_id, + "total_plugins": total_plugins, + "loaded_plugins": loaded_plugins, + "timestamp": asyncio.get_event_loop().time() + } + + await broadcast_progress(progress_data) + logger.debug(f"进度更新: [{operation}] {stage} - {progress}% - {message}") + + +@router.websocket("/ws/plugin-progress") +async def websocket_plugin_progress(websocket: WebSocket): + """WebSocket 插件加载进度推送端点 + + 客户端连接后会立即收到当前进度状态 + """ + await websocket.accept() + active_connections.add(websocket) + logger.info(f"📡 插件进度 WebSocket 客户端已连接,当前连接数: {len(active_connections)}") + + try: + # 发送当前进度状态 + await websocket.send_text(json.dumps(current_progress, ensure_ascii=False)) + + # 保持连接并处理客户端消息 + while True: + try: + data = await websocket.receive_text() + + # 处理客户端心跳 + if data == "ping": + await websocket.send_text("pong") + + except Exception as e: + logger.error(f"处理客户端消息时出错: {e}") + break + + except WebSocketDisconnect: + active_connections.discard(websocket) + logger.info(f"📡 插件进度 WebSocket 客户端已断开,当前连接数: {len(active_connections)}") + except Exception as e: + logger.error(f"❌ WebSocket 错误: {e}") + active_connections.discard(websocket) + + +def get_progress_router() -> APIRouter: + """获取插件进度 WebSocket 路由器""" + return router diff --git a/src/webui/plugin_routes.py b/src/webui/plugin_routes.py new file mode 100644 index 00000000..5054a391 --- /dev/null +++ b/src/webui/plugin_routes.py @@ -0,0 +1,1145 @@ +from fastapi import APIRouter, HTTPException, Header +from pydantic import BaseModel, Field +from typing import Optional, List, Dict, Any +from pathlib import Path +import json +from src.common.logger import get_logger +from src.config.config import MMC_VERSION +from .git_mirror_service import get_git_mirror_service, set_update_progress_callback +from .token_manager import get_token_manager +from .plugin_progress_ws import update_progress + +logger = get_logger("webui.plugin_routes") + +# 创建路由器 +router = APIRouter(prefix="/plugins", tags=["插件管理"]) + +# 设置进度更新回调 +set_update_progress_callback(update_progress) + + +def parse_version(version_str: str) -> tuple[int, int, int]: + """ + 解析版本号字符串 + + 支持格式: + - 0.11.2 -> (0, 11, 2) + - 0.11.2.snapshot.2 -> (0, 11, 2) + + Returns: + (major, minor, patch) 三元组 + """ + # 移除 snapshot 等后缀 + base_version = version_str.split('.snapshot')[0].split('.dev')[0].split('.alpha')[0].split('.beta')[0] + + parts = base_version.split('.') + if len(parts) < 3: + # 补齐到 3 位 + parts.extend(['0'] * (3 - len(parts))) + + try: + major = int(parts[0]) + minor = int(parts[1]) + patch = int(parts[2]) + return (major, minor, patch) + except (ValueError, IndexError): + logger.warning(f"无法解析版本号: {version_str},返回默认值 (0, 0, 0)") + return (0, 0, 0) + + +# ============ 请求/响应模型 ============ + +class FetchRawFileRequest(BaseModel): + """获取 Raw 文件请求""" + owner: str = Field(..., description="仓库所有者", example="MaiM-with-u") + repo: str = Field(..., description="仓库名称", example="plugin-repo") + branch: str = Field(..., description="分支名称", example="main") + file_path: str = Field(..., description="文件路径", example="plugin_details.json") + mirror_id: Optional[str] = Field(None, description="指定镜像源 ID") + custom_url: Optional[str] = Field(None, description="自定义完整 URL") + + +class FetchRawFileResponse(BaseModel): + """获取 Raw 文件响应""" + success: bool = Field(..., description="是否成功") + data: Optional[str] = Field(None, description="文件内容") + error: Optional[str] = Field(None, description="错误信息") + mirror_used: Optional[str] = Field(None, description="使用的镜像源") + attempts: int = Field(..., description="尝试次数") + url: Optional[str] = Field(None, description="实际请求的 URL") + + +class CloneRepositoryRequest(BaseModel): + """克隆仓库请求""" + owner: str = Field(..., description="仓库所有者", example="MaiM-with-u") + repo: str = Field(..., description="仓库名称", example="plugin-repo") + target_path: str = Field(..., description="目标路径(相对于插件目录)") + branch: Optional[str] = Field(None, description="分支名称", example="main") + mirror_id: Optional[str] = Field(None, description="指定镜像源 ID") + custom_url: Optional[str] = Field(None, description="自定义克隆 URL") + depth: Optional[int] = Field(None, description="克隆深度(浅克隆)", ge=1) + + +class CloneRepositoryResponse(BaseModel): + """克隆仓库响应""" + success: bool = Field(..., description="是否成功") + path: Optional[str] = Field(None, description="克隆路径") + error: Optional[str] = Field(None, description="错误信息") + mirror_used: Optional[str] = Field(None, description="使用的镜像源") + attempts: int = Field(..., description="尝试次数") + url: Optional[str] = Field(None, description="实际克隆的 URL") + message: Optional[str] = Field(None, description="附加信息") + + +class MirrorConfigResponse(BaseModel): + """镜像源配置响应""" + id: str = Field(..., description="镜像源 ID") + name: str = Field(..., description="镜像源名称") + raw_prefix: str = Field(..., description="Raw 文件前缀") + clone_prefix: str = Field(..., description="克隆前缀") + enabled: bool = Field(..., description="是否启用") + priority: int = Field(..., description="优先级(数字越小优先级越高)") + + +class AvailableMirrorsResponse(BaseModel): + """可用镜像源列表响应""" + mirrors: List[MirrorConfigResponse] = Field(..., description="镜像源列表") + default_priority: List[str] = Field(..., description="默认优先级顺序(ID 列表)") + + +class AddMirrorRequest(BaseModel): + """添加镜像源请求""" + id: str = Field(..., description="镜像源 ID", example="custom-mirror") + name: str = Field(..., description="镜像源名称", example="自定义镜像源") + raw_prefix: str = Field(..., description="Raw 文件前缀", example="https://example.com/raw") + clone_prefix: str = Field(..., description="克隆前缀", example="https://example.com/clone") + enabled: bool = Field(True, description="是否启用") + priority: Optional[int] = Field(None, description="优先级") + + +class UpdateMirrorRequest(BaseModel): + """更新镜像源请求""" + name: Optional[str] = Field(None, description="镜像源名称") + raw_prefix: Optional[str] = Field(None, description="Raw 文件前缀") + clone_prefix: Optional[str] = Field(None, description="克隆前缀") + enabled: Optional[bool] = Field(None, description="是否启用") + priority: Optional[int] = Field(None, description="优先级") + + +class GitStatusResponse(BaseModel): + """Git 安装状态响应""" + installed: bool = Field(..., description="是否已安装 Git") + version: Optional[str] = Field(None, description="Git 版本号") + path: Optional[str] = Field(None, description="Git 可执行文件路径") + error: Optional[str] = Field(None, description="错误信息") + + +class InstallPluginRequest(BaseModel): + """安装插件请求""" + plugin_id: str = Field(..., description="插件 ID") + repository_url: str = Field(..., description="插件仓库 URL") + branch: Optional[str] = Field("main", description="分支名称") + mirror_id: Optional[str] = Field(None, description="指定镜像源 ID") + + +class VersionResponse(BaseModel): + """麦麦版本响应""" + version: str = Field(..., description="麦麦版本号") + version_major: int = Field(..., description="主版本号") + version_minor: int = Field(..., description="次版本号") + version_patch: int = Field(..., description="补丁版本号") + + +class UninstallPluginRequest(BaseModel): + """卸载插件请求""" + plugin_id: str = Field(..., description="插件 ID") + + +class UpdatePluginRequest(BaseModel): + """更新插件请求""" + plugin_id: str = Field(..., description="插件 ID") + repository_url: str = Field(..., description="插件仓库 URL") + branch: Optional[str] = Field("main", description="分支名称") + mirror_id: Optional[str] = Field(None, description="指定镜像源 ID") + + +# ============ API 路由 ============ + +@router.get("/version", response_model=VersionResponse) +async def get_maimai_version() -> VersionResponse: + """ + 获取麦麦版本信息 + + 此接口无需认证,用于前端检查插件兼容性 + """ + major, minor, patch = parse_version(MMC_VERSION) + + return VersionResponse( + version=MMC_VERSION, + version_major=major, + version_minor=minor, + version_patch=patch + ) + + +@router.get("/git-status", response_model=GitStatusResponse) +async def check_git_status() -> GitStatusResponse: + """ + 检查本机 Git 安装状态 + + 此接口无需认证,用于前端快速检测是否可以使用插件安装功能 + """ + service = get_git_mirror_service() + result = service.check_git_installed() + + return GitStatusResponse(**result) + + +@router.get("/mirrors", response_model=AvailableMirrorsResponse) +async def get_available_mirrors( + authorization: Optional[str] = Header(None) +) -> AvailableMirrorsResponse: + """ + 获取所有可用的镜像源配置 + """ + # Token 验证 + token = authorization.replace("Bearer ", "") if authorization else None + token_manager = get_token_manager() + if not token or not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="未授权:无效的访问令牌") + + service = get_git_mirror_service() + config = service.get_mirror_config() + + all_mirrors = config.get_all_mirrors() + mirrors = [ + MirrorConfigResponse( + id=m["id"], + name=m["name"], + raw_prefix=m["raw_prefix"], + clone_prefix=m["clone_prefix"], + enabled=m["enabled"], + priority=m["priority"] + ) + for m in all_mirrors + ] + + return AvailableMirrorsResponse( + mirrors=mirrors, + default_priority=config.get_default_priority_list() + ) + + +@router.post("/mirrors", response_model=MirrorConfigResponse) +async def add_mirror( + request: AddMirrorRequest, + authorization: Optional[str] = Header(None) +) -> MirrorConfigResponse: + """ + 添加新的镜像源 + """ + # Token 验证 + token = authorization.replace("Bearer ", "") if authorization else None + token_manager = get_token_manager() + if not token or not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="未授权:无效的访问令牌") + + try: + service = get_git_mirror_service() + config = service.get_mirror_config() + + mirror = config.add_mirror( + mirror_id=request.id, + name=request.name, + raw_prefix=request.raw_prefix, + clone_prefix=request.clone_prefix, + enabled=request.enabled, + priority=request.priority + ) + + return MirrorConfigResponse( + id=mirror["id"], + name=mirror["name"], + raw_prefix=mirror["raw_prefix"], + clone_prefix=mirror["clone_prefix"], + enabled=mirror["enabled"], + priority=mirror["priority"] + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) from e + except Exception as e: + logger.error(f"添加镜像源失败: {e}") + raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e + + +@router.put("/mirrors/{mirror_id}", response_model=MirrorConfigResponse) +async def update_mirror( + mirror_id: str, + request: UpdateMirrorRequest, + authorization: Optional[str] = Header(None) +) -> MirrorConfigResponse: + """ + 更新镜像源配置 + """ + # Token 验证 + token = authorization.replace("Bearer ", "") if authorization else None + token_manager = get_token_manager() + if not token or not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="未授权:无效的访问令牌") + + try: + service = get_git_mirror_service() + config = service.get_mirror_config() + + mirror = config.update_mirror( + mirror_id=mirror_id, + name=request.name, + raw_prefix=request.raw_prefix, + clone_prefix=request.clone_prefix, + enabled=request.enabled, + priority=request.priority + ) + + if not mirror: + raise HTTPException(status_code=404, detail=f"未找到镜像源: {mirror_id}") + + return MirrorConfigResponse( + id=mirror["id"], + name=mirror["name"], + raw_prefix=mirror["raw_prefix"], + clone_prefix=mirror["clone_prefix"], + enabled=mirror["enabled"], + priority=mirror["priority"] + ) + except HTTPException: + raise + except Exception as e: + logger.error(f"更新镜像源失败: {e}") + raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e + + +@router.delete("/mirrors/{mirror_id}") +async def delete_mirror( + mirror_id: str, + authorization: Optional[str] = Header(None) +) -> Dict[str, Any]: + """ + 删除镜像源 + """ + # Token 验证 + token = authorization.replace("Bearer ", "") if authorization else None + token_manager = get_token_manager() + if not token or not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="未授权:无效的访问令牌") + + service = get_git_mirror_service() + config = service.get_mirror_config() + + success = config.delete_mirror(mirror_id) + + if not success: + raise HTTPException(status_code=404, detail=f"未找到镜像源: {mirror_id}") + + return { + "success": True, + "message": f"已删除镜像源: {mirror_id}" + } + + +@router.post("/fetch-raw", response_model=FetchRawFileResponse) +async def fetch_raw_file( + request: FetchRawFileRequest, + authorization: Optional[str] = Header(None) +) -> FetchRawFileResponse: + """ + 获取 GitHub 仓库的 Raw 文件内容 + + 支持多镜像源自动切换和错误重试 + + 注意:此接口可公开访问,用于获取插件仓库等公开资源 + """ + # Token 验证(可选,用于日志记录) + token = authorization.replace("Bearer ", "") if authorization else None + token_manager = get_token_manager() + is_authenticated = token and token_manager.verify_token(token) + + # 对于公开仓库的访问,不强制要求认证 + # 只在日志中记录是否认证 + logger.info( + f"收到获取 Raw 文件请求 (认证: {is_authenticated}): " + f"{request.owner}/{request.repo}/{request.branch}/{request.file_path}" + ) + + # 发送开始加载进度 + await update_progress( + stage="loading", + progress=10, + message=f"正在获取插件列表: {request.file_path}", + total_plugins=0, + loaded_plugins=0 + ) + + try: + service = get_git_mirror_service() + + # git_mirror_service 会自动推送 30%-70% 的详细镜像源尝试进度 + result = await service.fetch_raw_file( + owner=request.owner, + repo=request.repo, + branch=request.branch, + file_path=request.file_path, + mirror_id=request.mirror_id, + custom_url=request.custom_url + ) + + if result.get("success"): + # 更新进度:成功获取 + await update_progress( + stage="loading", + progress=70, + message="正在解析插件数据...", + total_plugins=0, + loaded_plugins=0 + ) + + # 尝试解析插件数量 + try: + import json + data = json.loads(result.get("data", "[]")) + total = len(data) if isinstance(data, list) else 0 + + # 发送成功状态 + await update_progress( + stage="success", + progress=100, + message=f"成功加载 {total} 个插件", + total_plugins=total, + loaded_plugins=total + ) + except Exception: + # 如果解析失败,仍然发送成功状态 + await update_progress( + stage="success", + progress=100, + message="加载完成", + total_plugins=0, + loaded_plugins=0 + ) + + return FetchRawFileResponse(**result) + + except Exception as e: + logger.error(f"获取 Raw 文件失败: {e}") + + # 发送错误进度 + await update_progress( + stage="error", + progress=0, + message="加载失败", + error=str(e), + total_plugins=0, + loaded_plugins=0 + ) + + raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e + + +@router.post("/clone", response_model=CloneRepositoryResponse) +async def clone_repository( + request: CloneRepositoryRequest, + authorization: Optional[str] = Header(None) +) -> CloneRepositoryResponse: + """ + 克隆 GitHub 仓库到本地 + + 支持多镜像源自动切换和错误重试 + """ + # Token 验证 + token = authorization.replace("Bearer ", "") if authorization else None + token_manager = get_token_manager() + if not token or not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="未授权:无效的访问令牌") + + logger.info( + f"收到克隆仓库请求: {request.owner}/{request.repo} -> {request.target_path}" + ) + + try: + # TODO: 验证 target_path 的安全性,防止路径遍历攻击 + # TODO: 确定实际的插件目录基路径 + base_plugin_path = Path("./plugins") # 临时路径 + target_path = base_plugin_path / request.target_path + + service = get_git_mirror_service() + result = await service.clone_repository( + owner=request.owner, + repo=request.repo, + target_path=target_path, + branch=request.branch, + mirror_id=request.mirror_id, + custom_url=request.custom_url, + depth=request.depth + ) + + return CloneRepositoryResponse(**result) + + except Exception as e: + logger.error(f"克隆仓库失败: {e}") + raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e + + +@router.post("/install") +async def install_plugin( + request: InstallPluginRequest, + authorization: Optional[str] = Header(None) +) -> Dict[str, Any]: + """ + 安装插件 + + 从 Git 仓库克隆插件到本地插件目录 + """ + # Token 验证 + token = authorization.replace("Bearer ", "") if authorization else None + token_manager = get_token_manager() + if not token or not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="未授权:无效的访问令牌") + + logger.info(f"收到安装插件请求: {request.plugin_id}") + + try: + # 推送进度:开始安装 + await update_progress( + stage="loading", + progress=5, + message=f"开始安装插件: {request.plugin_id}", + operation="install", + plugin_id=request.plugin_id + ) + + # 1. 解析仓库 URL + # repository_url 格式: https://github.com/owner/repo + repo_url = request.repository_url.rstrip('/') + if repo_url.endswith('.git'): + repo_url = repo_url[:-4] + + parts = repo_url.split('/') + if len(parts) < 2: + raise HTTPException(status_code=400, detail="无效的仓库 URL") + + owner = parts[-2] + repo = parts[-1] + + await update_progress( + stage="loading", + progress=10, + message=f"解析仓库信息: {owner}/{repo}", + operation="install", + plugin_id=request.plugin_id + ) + + # 2. 确定插件安装路径 + plugins_dir = Path("plugins") + plugins_dir.mkdir(exist_ok=True) + + target_path = plugins_dir / request.plugin_id + + # 检查插件是否已安装 + if target_path.exists(): + await update_progress( + stage="error", + progress=0, + message=f"插件已存在", + operation="install", + plugin_id=request.plugin_id, + error="插件已安装,请先卸载" + ) + raise HTTPException(status_code=400, detail="插件已安装") + + await update_progress( + stage="loading", + progress=15, + message=f"准备克隆到: {target_path}", + operation="install", + plugin_id=request.plugin_id + ) + + # 3. 克隆仓库(这里会自动推送 20%-80% 的进度) + service = get_git_mirror_service() + + # 如果是 GitHub 仓库,使用镜像源 + if 'github.com' in repo_url: + result = await service.clone_repository( + owner=owner, + repo=repo, + target_path=target_path, + branch=request.branch, + mirror_id=request.mirror_id, + depth=1 # 浅克隆,节省时间和空间 + ) + else: + # 自定义仓库,直接使用 URL + result = await service.clone_repository( + owner=owner, + repo=repo, + target_path=target_path, + branch=request.branch, + custom_url=repo_url, + depth=1 + ) + + if not result.get("success"): + error_msg = result.get("error", "克隆失败") + await update_progress( + stage="error", + progress=0, + message="克隆仓库失败", + operation="install", + plugin_id=request.plugin_id, + error=error_msg + ) + raise HTTPException(status_code=500, detail=error_msg) + + # 4. 验证插件完整性 + await update_progress( + stage="loading", + progress=85, + message="验证插件文件...", + operation="install", + plugin_id=request.plugin_id + ) + + manifest_path = target_path / "_manifest.json" + if not manifest_path.exists(): + # 清理失败的安装 + import shutil + shutil.rmtree(target_path, ignore_errors=True) + + await update_progress( + stage="error", + progress=0, + message="插件缺少 _manifest.json", + operation="install", + plugin_id=request.plugin_id, + error="无效的插件格式" + ) + raise HTTPException(status_code=400, detail="无效的插件:缺少 _manifest.json") + + # 5. 读取并验证 manifest + await update_progress( + stage="loading", + progress=90, + message="读取插件配置...", + operation="install", + plugin_id=request.plugin_id + ) + + try: + import json as json_module + with open(manifest_path, 'r', encoding='utf-8') as f: + manifest = json_module.load(f) + + # 基本验证 + required_fields = ['manifest_version', 'name', 'version', 'author'] + for field in required_fields: + if field not in manifest: + raise ValueError(f"缺少必需字段: {field}") + + except Exception as e: + # 清理失败的安装 + import shutil + shutil.rmtree(target_path, ignore_errors=True) + + await update_progress( + stage="error", + progress=0, + message="_manifest.json 无效", + operation="install", + plugin_id=request.plugin_id, + error=str(e) + ) + raise HTTPException(status_code=400, detail=f"无效的 _manifest.json: {e}") from e + + # 6. 安装成功 + await update_progress( + stage="success", + progress=100, + message=f"成功安装插件: {manifest['name']} v{manifest['version']}", + operation="install", + plugin_id=request.plugin_id + ) + + return { + "success": True, + "message": "插件安装成功", + "plugin_id": request.plugin_id, + "plugin_name": manifest['name'], + "version": manifest['version'], + "path": str(target_path) + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"安装插件失败: {e}", exc_info=True) + + await update_progress( + stage="error", + progress=0, + message="安装失败", + operation="install", + plugin_id=request.plugin_id, + error=str(e) + ) + + raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e + + +@router.post("/uninstall") +async def uninstall_plugin( + request: UninstallPluginRequest, + authorization: Optional[str] = Header(None) +) -> Dict[str, Any]: + """ + 卸载插件 + + 删除插件目录及其所有文件 + """ + # Token 验证 + token = authorization.replace("Bearer ", "") if authorization else None + token_manager = get_token_manager() + if not token or not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="未授权:无效的访问令牌") + + logger.info(f"收到卸载插件请求: {request.plugin_id}") + + try: + # 推送进度:开始卸载 + await update_progress( + stage="loading", + progress=10, + message=f"开始卸载插件: {request.plugin_id}", + operation="uninstall", + plugin_id=request.plugin_id + ) + + # 1. 检查插件是否存在 + plugins_dir = Path("plugins") + plugin_path = plugins_dir / request.plugin_id + + if not plugin_path.exists(): + await update_progress( + stage="error", + progress=0, + message="插件不存在", + operation="uninstall", + plugin_id=request.plugin_id, + error="插件未安装或已被删除" + ) + raise HTTPException(status_code=404, detail="插件未安装") + + await update_progress( + stage="loading", + progress=30, + message=f"正在删除插件文件: {plugin_path}", + operation="uninstall", + plugin_id=request.plugin_id + ) + + # 2. 读取插件信息(用于日志) + manifest_path = plugin_path / "_manifest.json" + plugin_name = request.plugin_id + + if manifest_path.exists(): + try: + import json as json_module + with open(manifest_path, 'r', encoding='utf-8') as f: + manifest = json_module.load(f) + plugin_name = manifest.get("name", request.plugin_id) + except Exception: + pass # 如果读取失败,使用插件 ID 作为名称 + + await update_progress( + stage="loading", + progress=50, + message=f"正在删除 {plugin_name}...", + operation="uninstall", + plugin_id=request.plugin_id + ) + + # 3. 删除插件目录 + import shutil + import stat + + def remove_readonly(func, path, _): + """清除只读属性并删除文件""" + import os + os.chmod(path, stat.S_IWRITE) + func(path) + + shutil.rmtree(plugin_path, onerror=remove_readonly) + + logger.info(f"成功卸载插件: {request.plugin_id} ({plugin_name})") + + # 4. 推送成功状态 + await update_progress( + stage="success", + progress=100, + message=f"成功卸载插件: {plugin_name}", + operation="uninstall", + plugin_id=request.plugin_id + ) + + return { + "success": True, + "message": "插件卸载成功", + "plugin_id": request.plugin_id, + "plugin_name": plugin_name + } + + except HTTPException: + raise + except PermissionError as e: + logger.error(f"卸载插件失败(权限错误): {e}") + + await update_progress( + stage="error", + progress=0, + message="卸载失败", + operation="uninstall", + plugin_id=request.plugin_id, + error="权限不足,无法删除插件文件" + ) + + raise HTTPException(status_code=500, detail="权限不足,无法删除插件文件") from e + except Exception as e: + logger.error(f"卸载插件失败: {e}", exc_info=True) + + await update_progress( + stage="error", + progress=0, + message="卸载失败", + operation="uninstall", + plugin_id=request.plugin_id, + error=str(e) + ) + + raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e + + +@router.post("/update") +async def update_plugin( + request: UpdatePluginRequest, + authorization: Optional[str] = Header(None) +) -> Dict[str, Any]: + """ + 更新插件 + + 删除旧版本,重新克隆新版本 + """ + # Token 验证 + token = authorization.replace("Bearer ", "") if authorization else None + token_manager = get_token_manager() + if not token or not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="未授权:无效的访问令牌") + + logger.info(f"收到更新插件请求: {request.plugin_id}") + + try: + # 推送进度:开始更新 + await update_progress( + stage="loading", + progress=5, + message=f"开始更新插件: {request.plugin_id}", + operation="update", + plugin_id=request.plugin_id + ) + + # 1. 检查插件是否已安装 + plugins_dir = Path("plugins") + plugin_path = plugins_dir / request.plugin_id + + if not plugin_path.exists(): + await update_progress( + stage="error", + progress=0, + message="插件不存在", + operation="update", + plugin_id=request.plugin_id, + error="插件未安装,请先安装" + ) + raise HTTPException(status_code=404, detail="插件未安装") + + # 2. 读取旧版本信息 + manifest_path = plugin_path / "_manifest.json" + old_version = "unknown" + plugin_name = request.plugin_id + + if manifest_path.exists(): + try: + import json as json_module + with open(manifest_path, 'r', encoding='utf-8') as f: + manifest = json_module.load(f) + old_version = manifest.get("version", "unknown") + plugin_name = manifest.get("name", request.plugin_id) + except Exception: + pass + + await update_progress( + stage="loading", + progress=10, + message=f"当前版本: {old_version},准备更新...", + operation="update", + plugin_id=request.plugin_id + ) + + # 3. 删除旧版本 + await update_progress( + stage="loading", + progress=20, + message="正在删除旧版本...", + operation="update", + plugin_id=request.plugin_id + ) + + import shutil + import stat + + def remove_readonly(func, path, _): + """清除只读属性并删除文件""" + import os + os.chmod(path, stat.S_IWRITE) + func(path) + + shutil.rmtree(plugin_path, onerror=remove_readonly) + + logger.info(f"已删除旧版本: {request.plugin_id} v{old_version}") + + # 4. 解析仓库 URL + await update_progress( + stage="loading", + progress=30, + message="正在准备下载新版本...", + operation="update", + plugin_id=request.plugin_id + ) + + repo_url = request.repository_url.rstrip('/') + if repo_url.endswith('.git'): + repo_url = repo_url[:-4] + + parts = repo_url.split('/') + if len(parts) < 2: + raise HTTPException(status_code=400, detail="无效的仓库 URL") + + owner = parts[-2] + repo = parts[-1] + + # 5. 克隆新版本(这里会推送 35%-85% 的进度) + service = get_git_mirror_service() + + if 'github.com' in repo_url: + result = await service.clone_repository( + owner=owner, + repo=repo, + target_path=plugin_path, + branch=request.branch, + mirror_id=request.mirror_id, + depth=1 + ) + else: + result = await service.clone_repository( + owner=owner, + repo=repo, + target_path=plugin_path, + branch=request.branch, + custom_url=repo_url, + depth=1 + ) + + if not result.get("success"): + error_msg = result.get("error", "克隆失败") + await update_progress( + stage="error", + progress=0, + message="下载新版本失败", + operation="update", + plugin_id=request.plugin_id, + error=error_msg + ) + raise HTTPException(status_code=500, detail=error_msg) + + # 6. 验证新版本 + await update_progress( + stage="loading", + progress=90, + message="验证新版本...", + operation="update", + plugin_id=request.plugin_id + ) + + new_manifest_path = plugin_path / "_manifest.json" + if not new_manifest_path.exists(): + # 清理失败的更新 + def remove_readonly(func, path, _): + """清除只读属性并删除文件""" + import os + os.chmod(path, stat.S_IWRITE) + func(path) + + shutil.rmtree(plugin_path, onerror=remove_readonly) + + await update_progress( + stage="error", + progress=0, + message="新版本缺少 _manifest.json", + operation="update", + plugin_id=request.plugin_id, + error="无效的插件格式" + ) + raise HTTPException(status_code=400, detail="无效的插件:缺少 _manifest.json") + + # 7. 读取新版本信息 + try: + with open(new_manifest_path, 'r', encoding='utf-8') as f: + new_manifest = json_module.load(f) + + new_version = new_manifest.get("version", "unknown") + new_name = new_manifest.get("name", request.plugin_id) + + logger.info(f"成功更新插件: {request.plugin_id} {old_version} → {new_version}") + + # 8. 推送成功状态 + await update_progress( + stage="success", + progress=100, + message=f"成功更新 {new_name}: {old_version} → {new_version}", + operation="update", + plugin_id=request.plugin_id + ) + + return { + "success": True, + "message": "插件更新成功", + "plugin_id": request.plugin_id, + "plugin_name": new_name, + "old_version": old_version, + "new_version": new_version + } + + except Exception as e: + # 清理失败的更新 + shutil.rmtree(plugin_path, ignore_errors=True) + + await update_progress( + stage="error", + progress=0, + message="_manifest.json 无效", + operation="update", + plugin_id=request.plugin_id, + error=str(e) + ) + raise HTTPException(status_code=400, detail=f"无效的 _manifest.json: {e}") from e + + except HTTPException: + raise + except Exception as e: + logger.error(f"更新插件失败: {e}", exc_info=True) + + await update_progress( + stage="error", + progress=0, + message="更新失败", + operation="update", + plugin_id=request.plugin_id, + error=str(e) + ) + + raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e + + +@router.get("/installed") +async def get_installed_plugins( + authorization: Optional[str] = Header(None) +) -> Dict[str, Any]: + """ + 获取已安装的插件列表 + + 扫描 plugins 目录,返回所有已安装插件的 ID 和基本信息 + """ + # Token 验证 + token = authorization.replace("Bearer ", "") if authorization else None + token_manager = get_token_manager() + if not token or not token_manager.verify_token(token): + raise HTTPException(status_code=401, detail="未授权:无效的访问令牌") + + logger.info("收到获取已安装插件列表请求") + + try: + plugins_dir = Path("plugins") + + # 如果插件目录不存在,返回空列表 + if not plugins_dir.exists(): + logger.info("插件目录不存在,创建目录") + plugins_dir.mkdir(exist_ok=True) + return { + "success": True, + "plugins": [] + } + + installed_plugins = [] + + # 遍历插件目录 + for plugin_path in plugins_dir.iterdir(): + # 只处理目录 + if not plugin_path.is_dir(): + continue + + # 目录名即为插件 ID + plugin_id = plugin_path.name + + # 跳过隐藏目录和特殊目录 + if plugin_id.startswith('.') or plugin_id.startswith('__'): + continue + + # 读取 _manifest.json + manifest_path = plugin_path / "_manifest.json" + + if not manifest_path.exists(): + logger.warning(f"插件 {plugin_id} 缺少 _manifest.json,跳过") + continue + + try: + import json as json_module + with open(manifest_path, 'r', encoding='utf-8') as f: + manifest = json_module.load(f) + + # 基本验证 + if 'name' not in manifest or 'version' not in manifest: + logger.warning(f"插件 {plugin_id} 的 _manifest.json 格式无效,跳过") + continue + + # 添加到已安装列表(返回完整的 manifest 信息) + installed_plugins.append({ + "id": plugin_id, + "manifest": manifest, # 返回完整的 manifest 对象 + "path": str(plugin_path.absolute()) + }) + + except json.JSONDecodeError as e: + logger.warning(f"插件 {plugin_id} 的 _manifest.json 解析失败: {e}") + continue + except Exception as e: + logger.error(f"读取插件 {plugin_id} 信息时出错: {e}") + continue + + logger.info(f"找到 {len(installed_plugins)} 个已安装插件") + + return { + "success": True, + "plugins": installed_plugins, + "total": len(installed_plugins) + } + + except Exception as e: + logger.error(f"获取已安装插件列表失败: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") from e diff --git a/src/webui/routes.py b/src/webui/routes.py index 3b49262d..a3397e83 100644 --- a/src/webui/routes.py +++ b/src/webui/routes.py @@ -9,6 +9,8 @@ from .statistics_routes import router as statistics_router from .person_routes import router as person_router from .expression_routes import router as expression_router from .emoji_routes import router as emoji_router +from .plugin_routes import router as plugin_router +from .plugin_progress_ws import get_progress_router logger = get_logger("webui.api") @@ -25,6 +27,10 @@ router.include_router(person_router) router.include_router(expression_router) # 注册表情包管理路由 router.include_router(emoji_router) +# 注册插件管理路由 +router.include_router(plugin_router) +# 注册插件进度 WebSocket 路由 +router.include_router(get_progress_router()) class TokenVerifyRequest(BaseModel):