From a7c108aa5de9a6c8f19e8e5e6fe12e5d176aba8d Mon Sep 17 00:00:00 2001 From: zhiyucn Date: Sun, 7 Dec 2025 03:55:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=99=BA=E8=83=BD?= =?UTF-8?q?=E5=88=86=E6=94=AF=E9=80=89=E6=8B=A9=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在WebUI的插件安装中实现远程仓库分支自动检测功能,当main分支不存在时自动选择main或master分支。添加_get_available_branches方法获取远程分支列表,并在克隆失败时尝试智能切换分支。优化克隆过程中的分支选择和错误处理逻辑。 --- src/webui/git_mirror_service.py | 128 ++++++++++++++++++++++++++++++-- 1 file changed, 122 insertions(+), 6 deletions(-) diff --git a/src/webui/git_mirror_service.py b/src/webui/git_mirror_service.py index a6a9b1bc..50bffefb 100644 --- a/src/webui/git_mirror_service.py +++ b/src/webui/git_mirror_service.py @@ -555,12 +555,94 @@ class GitMirrorService: return await self._clone_with_url(url, target_path, branch, depth, mirror["id"]) + async def _get_available_branches(self, url: str) -> List[str]: + """获取远程仓库的可用分支列表""" + try: + logger.info(f"正在获取远程分支列表: {url}") + + # 使用 git ls-remote 获取分支列表 + cmd = ["git", "ls-remote", "--heads", url] + + loop = asyncio.get_event_loop() + + def run_git_ls_remote(): + return subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=60, # 1分钟超时 + ) + + process = await loop.run_in_executor(None, run_git_ls_remote) + + if process.returncode == 0: + branches = [] + for line in process.stdout.strip().split('\n'): + if line: + # 解析分支引用,格式: "\trefs/heads/" + parts = line.split('\t') + if len(parts) == 2 and parts[1].startswith('refs/heads/'): + branch_name = parts[1].replace('refs/heads/', '') + branches.append(branch_name) + + logger.info(f"获取到分支列表: {branches}") + return branches + else: + logger.warning(f"获取分支列表失败: {process.stderr}") + return [] + + except subprocess.TimeoutExpired: + logger.warning("获取分支列表超时") + return [] + except Exception as e: + logger.error(f"获取分支列表时发生错误: {e}") + return [] + async def _clone_with_url( self, url: str, target_path: Path, branch: Optional[str], depth: Optional[int], mirror_type: str ) -> Dict[str, Any]: - """使用指定 URL 克隆仓库,支持重试""" + """使用指定 URL 克隆仓库,支持重试和智能分支选择""" attempts = 0 last_error = None + + # 初始分支选择 + selected_branch = branch + branch_selection_tried = False # 标记是否已经尝试过智能分支选择 + + # 如果没有指定分支,或者指定了分支但后续发现不存在,则进行智能分支选择 + if not selected_branch or branch: + logger.info(f"初始分支选择: {selected_branch or '未指定分支'},开始检测可用分支...") + available_branches = await self._get_available_branches(url) + + if available_branches: + logger.info(f"检测到的分支: {available_branches}") + + # 如果指定了分支,先检查该分支是否存在 + if branch and branch in available_branches: + selected_branch = branch + logger.info(f"指定的分支 '{branch}' 存在,使用该分支") + else: + # 如果指定了分支但不存在,记录日志 + if branch and branch not in available_branches: + logger.warning(f"指定的分支 '{branch}' 不存在,将进行智能分支选择") + + # 优先选择 main 分支 + if "main" in available_branches: + selected_branch = "main" + logger.info("选择 main 分支") + # 其次选择 master 分支 + elif "master" in available_branches: + selected_branch = "master" + logger.info("选择 master 分支") + # 如果都没有,使用默认分支(不指定分支参数) + else: + selected_branch = None + logger.info("未找到 main 或 master 分支,使用默认分支") + + branch_selection_tried = True + else: + logger.warning("无法获取分支列表,将使用默认分支或指定分支") + selected_branch = branch # 保持原分支设置 for attempt in range(self.max_retries): attempts += 1 @@ -575,8 +657,8 @@ class GitMirrorService: cmd = ["git", "clone"] # 添加分支参数 - if branch: - cmd.extend(["-b", branch]) + if selected_branch: + cmd.extend(["-b", selected_branch]) # 添加深度参数(浅克隆) if depth: @@ -590,10 +672,11 @@ class GitMirrorService: # 推送进度 if _update_progress: try: + branch_info = f"分支: {selected_branch}" if selected_branch else "默认分支" await _update_progress( stage="loading", progress=20 + attempt * 10, - message=f"正在克隆仓库 (尝试 {attempt + 1}/{self.max_retries})...", + message=f"正在克隆仓库 {branch_info} (尝试 {attempt + 1}/{self.max_retries})...", operation="install", ) except Exception as e: @@ -620,11 +703,44 @@ class GitMirrorService: "mirror_used": mirror_type, "attempts": attempts, "url": url, - "branch": branch or "default", + "branch": selected_branch or "default", } else: - last_error = f"Git 克隆失败: {process.stderr}" + error_output = process.stderr + last_error = f"Git 克隆失败: {error_output}" logger.warning(f"克隆失败 (尝试 {attempt + 1}/{self.max_retries}): {last_error}") + + # 如果是因为指定分支不存在导致的失败,尝试智能分支选择 + if (selected_branch and + ("Remote branch" in error_output and "not found" in error_output) and + not branch_selection_tried): + + logger.info(f"分支 '{selected_branch}' 不存在,尝试智能分支选择...") + + # 重新检测分支 + available_branches = await self._get_available_branches(url) + if available_branches: + logger.info(f"可用分支: {available_branches}") + + # 优先选择 main 分支 + if "main" in available_branches: + selected_branch = "main" + logger.info("切换到 main 分支") + # 其次选择 master 分支 + elif "master" in available_branches: + selected_branch = "master" + logger.info("切换到 master 分支") + # 如果都没有,使用默认分支 + else: + selected_branch = None + logger.info("切换到默认分支") + + branch_selection_tried = True + # 重置尝试计数,给新的分支选择一个完整的机会 + attempts = 0 + continue # 重新开始循环 + else: + logger.warning("无法获取分支列表,继续尝试下一个镜像源") except subprocess.TimeoutExpired: last_error = "克隆超时(超过 5 分钟)"