Files
DEMO-AGENT/review_agent/file_summary/services/archive.py

78 lines
2.6 KiB
Python

from __future__ import annotations

import shutil
import subprocess
from pathlib import Path
from zipfile import ZipFile

import py7zr
# Archive suffixes handled by extract_archive, stored lowercase and without the
# leading dot (the form extract_archive normalises a file suffix to).
ARCHIVE_EXTENSIONS = {"zip", "7z", "rar"}
def _ensure_inside_target(path: Path, target_dir: Path) -> None:
    """Reject *path* unless it resolves to *target_dir* or a location beneath it.

    Both arguments go through ``Path.resolve()`` before being compared, so
    ``..`` components and symlinks are accounted for in the check.

    Args:
        path: Candidate location to validate.
        target_dir: Directory the candidate must stay within.

    Raises:
        ValueError: If the resolved candidate is neither the resolved target
            directory itself nor one of its descendants.
    """
    root = target_dir.resolve()
    candidate = path.resolve()
    # Accept the directory itself as well as anything that has it as an ancestor.
    if candidate == root or root in candidate.parents:
        return
    raise ValueError("解压路径必须位于批次工作目录内。")
def _safe_member_path(target_dir: Path, member_name: str) -> Path:
    """Return the on-disk destination for an archive member after validating it.

    Args:
        target_dir: Directory the archive is being extracted into.
        member_name: Member name as recorded in the archive.

    Returns:
        ``target_dir`` joined with ``member_name``.

    Raises:
        ValueError: Propagated from ``_ensure_inside_target`` when the joined
            path resolves outside ``target_dir``.
    """
    candidate = target_dir.joinpath(member_name)
    _ensure_inside_target(candidate, target_dir)
    return candidate
def extract_archive(archive_path: str | Path, target_dir: str | Path) -> list[Path]:
    """Extract a supported archive into *target_dir*.

    The target directory is created (including parents) before the archive's
    suffix is inspected, so it exists even when nothing is extracted.

    Args:
        archive_path: Location of the archive file.
        target_dir: Directory to extract into.

    Returns:
        The list produced by the format-specific extractor, or an empty list
        when the lowercased suffix is not in ``ARCHIVE_EXTENSIONS``.
    """
    source = Path(archive_path)
    destination_root = Path(target_dir)
    destination_root.mkdir(parents=True, exist_ok=True)

    kind = source.suffix.lower().lstrip(".")
    if kind not in ARCHIVE_EXTENSIONS:
        return []

    # Any supported suffix other than zip / 7z falls through to the rar handler.
    if kind == "zip":
        extractor = _extract_zip
    elif kind == "7z":
        extractor = _extract_7z
    else:
        extractor = _extract_rar
    return extractor(source, destination_root)
def _extract_zip(archive_path: Path, target_dir: Path) -> list[Path]:
    """Extract every member of a zip archive into *target_dir*.

    Each member's destination is validated with ``_safe_member_path`` before
    anything is written for it. Directory entries are created but not
    reported; file entries are written and collected.

    Args:
        archive_path: Location of the zip file.
        target_dir: Directory to extract into.

    Returns:
        Paths of the files written, in archive order.

    Raises:
        ValueError: Propagated from ``_safe_member_path`` when a member would
            land outside ``target_dir``.
    """
    extracted: list[Path] = []
    with ZipFile(archive_path) as archive:
        for member in archive.infolist():
            destination = _safe_member_path(target_dir, member.filename)
            if member.is_dir():
                destination.mkdir(parents=True, exist_ok=True)
                continue
            destination.parent.mkdir(parents=True, exist_ok=True)
            with archive.open(member) as source, destination.open("wb") as target:
                # Stream in chunks: reading the whole member at once would hold
                # its full decompressed size in memory.
                shutil.copyfileobj(source, target)
            extracted.append(destination)
    return extracted
def _extract_7z(archive_path: Path, target_dir: Path) -> list[Path]:
    """Extract a 7z archive into *target_dir* using ``py7zr``.

    All member names are validated with ``_safe_member_path`` before
    ``extractall`` is called, so a bad name aborts before extraction starts.

    Args:
        archive_path: Location of the 7z file.
        target_dir: Directory to extract into.

    Returns:
        ``target_dir / name`` for each archive member name that is a regular
        file on disk after extraction.

    Raises:
        ValueError: Propagated from ``_safe_member_path`` when a member name
            resolves outside ``target_dir``.
    """
    with py7zr.SevenZipFile(archive_path, mode="r") as archive:
        member_names = archive.getnames()
        for member_name in member_names:
            _safe_member_path(target_dir, member_name)
        archive.extractall(path=target_dir)

    files: list[Path] = []
    for member_name in member_names:
        candidate = target_dir / member_name
        # Directory entries are listed by name too; keep only actual files.
        if candidate.is_file():
            files.append(candidate)
    return files
def _extract_rar(archive_path: Path, target_dir: Path) -> list[Path]:
    """Extract an archive into *target_dir* by invoking the external ``7z`` tool.

    After a successful run, every path under ``target_dir`` for which
    ``is_file()`` is true is collected (whether or not this run created it) and
    each one is checked with ``_ensure_inside_target``.

    Args:
        archive_path: Location of the archive file.
        target_dir: Directory to extract into.

    Returns:
        All files found beneath ``target_dir`` after extraction.

    Raises:
        RuntimeError: If ``7z`` exits with a non-zero status; the message is
            its stderr, else its stdout, else a fixed fallback text.
        ValueError: Propagated from ``_ensure_inside_target`` when a collected
            file resolves outside ``target_dir``.
    """
    command = ["7z", "x", f"-o{target_dir}", str(archive_path), "-y"]
    completed = subprocess.run(
        command,
        check=False,
        capture_output=True,
        text=True,
    )
    if completed.returncode != 0:
        message = completed.stderr or completed.stdout or "rar 解压失败"
        raise RuntimeError(message)

    # Collect first, validate second, so the full listing is built before any
    # validation error can be raised.
    files: list[Path] = []
    for candidate in target_dir.rglob("*"):
        if candidate.is_file():
            files.append(candidate)
    for candidate in files:
        _ensure_inside_target(candidate, target_dir)
    return files