from __future__ import annotations import logging import subprocess from pathlib import Path from zipfile import ZipFile import py7zr ARCHIVE_EXTENSIONS = {"zip", "7z", "rar"} logger = logging.getLogger("review_agent.file_summary.services.archive") def _ensure_inside_target(path: Path, target_dir: Path) -> None: target = target_dir.resolve() resolved = path.resolve() if target != resolved and target not in resolved.parents: raise ValueError("解压路径必须位于批次工作目录内。") def _safe_member_path(target_dir: Path, member_name: str) -> Path: destination = target_dir / member_name _ensure_inside_target(destination, target_dir) return destination def extract_archive(archive_path: str | Path, target_dir: str | Path) -> list[Path]: archive_path = Path(archive_path) target_dir = Path(target_dir) target_dir.mkdir(parents=True, exist_ok=True) ext = archive_path.suffix.lower().lstrip(".") if ext not in ARCHIVE_EXTENSIONS: return [] if ext == "zip": return _extract_zip(archive_path, target_dir) if ext == "7z": return _extract_7z(archive_path, target_dir) return _extract_rar(archive_path, target_dir) def _extract_zip(archive_path: Path, target_dir: Path) -> list[Path]: extracted: list[Path] = [] with ZipFile(archive_path) as archive: for member in archive.infolist(): destination = _safe_member_path(target_dir, member.filename) if member.is_dir(): destination.mkdir(parents=True, exist_ok=True) continue destination.parent.mkdir(parents=True, exist_ok=True) with archive.open(member) as source, destination.open("wb") as target: target.write(source.read()) extracted.append(destination) return extracted def _extract_7z(archive_path: Path, target_dir: Path) -> list[Path]: with py7zr.SevenZipFile(archive_path, mode="r") as archive: names = archive.getnames() for name in names: _safe_member_path(target_dir, name) archive.extractall(path=target_dir) return [target_dir / name for name in names if (target_dir / name).is_file()] def _extract_rar(archive_path: Path, target_dir: Path) -> list[Path]: try: extracted = _extract_rar_with_libarchive(archive_path, target_dir) except Exception as exc: logger.warning( "RAR libarchive extract failed, falling back to 7z", extra={"archive_path": str(archive_path), "target_dir": str(target_dir), "error": str(exc)}, ) else: if extracted: return extracted logger.info( "RAR libarchive extract produced no files, falling back to 7z", extra={"archive_path": str(archive_path), "target_dir": str(target_dir)}, ) return _extract_rar_with_7z(archive_path, target_dir) def _extract_rar_with_libarchive(archive_path: Path, target_dir: Path) -> list[Path]: try: import libarchive except ImportError as exc: raise RuntimeError("未安装 libarchive,跳过 Python RAR 解压。") from exc extracted: list[Path] = [] with libarchive.file_reader(str(archive_path)) as entries: for entry in entries: destination = _safe_member_path(target_dir, entry.pathname) if entry.isdir: destination.mkdir(parents=True, exist_ok=True) continue if not entry.isfile: logger.info( "RAR libarchive skipped non-regular entry", extra={"archive_path": str(archive_path), "entry": entry.pathname}, ) continue destination.parent.mkdir(parents=True, exist_ok=True) with destination.open("wb") as target: for block in entry.get_blocks(): target.write(block) extracted.append(destination) return extracted def _extract_rar_with_7z(archive_path: Path, target_dir: Path) -> list[Path]: result = subprocess.run( ["7z", "x", f"-o{target_dir}", str(archive_path), "-y"], check=False, capture_output=True, text=True, ) if result.returncode != 0: raise RuntimeError(result.stderr or result.stdout or "rar 解压失败") extracted = [path for path in target_dir.rglob("*") if path.is_file()] for path in extracted: _ensure_inside_target(path, target_dir) return extracted