Files

126 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import logging
import subprocess
from pathlib import Path
from zipfile import ZipFile
import py7zr
ARCHIVE_EXTENSIONS = {"zip", "7z", "rar"}
logger = logging.getLogger("review_agent.file_summary.services.archive")
def _ensure_inside_target(path: Path, target_dir: Path) -> None:
target = target_dir.resolve()
resolved = path.resolve()
if target != resolved and target not in resolved.parents:
raise ValueError("解压路径必须位于批次工作目录内。")
def _safe_member_path(target_dir: Path, member_name: str) -> Path:
destination = target_dir / member_name
_ensure_inside_target(destination, target_dir)
return destination
def extract_archive(archive_path: str | Path, target_dir: str | Path) -> list[Path]:
archive_path = Path(archive_path)
target_dir = Path(target_dir)
target_dir.mkdir(parents=True, exist_ok=True)
ext = archive_path.suffix.lower().lstrip(".")
if ext not in ARCHIVE_EXTENSIONS:
return []
if ext == "zip":
return _extract_zip(archive_path, target_dir)
if ext == "7z":
return _extract_7z(archive_path, target_dir)
return _extract_rar(archive_path, target_dir)
def _extract_zip(archive_path: Path, target_dir: Path) -> list[Path]:
extracted: list[Path] = []
with ZipFile(archive_path) as archive:
for member in archive.infolist():
destination = _safe_member_path(target_dir, member.filename)
if member.is_dir():
destination.mkdir(parents=True, exist_ok=True)
continue
destination.parent.mkdir(parents=True, exist_ok=True)
with archive.open(member) as source, destination.open("wb") as target:
target.write(source.read())
extracted.append(destination)
return extracted
def _extract_7z(archive_path: Path, target_dir: Path) -> list[Path]:
with py7zr.SevenZipFile(archive_path, mode="r") as archive:
names = archive.getnames()
for name in names:
_safe_member_path(target_dir, name)
archive.extractall(path=target_dir)
return [target_dir / name for name in names if (target_dir / name).is_file()]
def _extract_rar(archive_path: Path, target_dir: Path) -> list[Path]:
try:
extracted = _extract_rar_with_libarchive(archive_path, target_dir)
except Exception as exc:
logger.warning(
"RAR libarchive extract failed, falling back to 7z",
extra={"archive_path": str(archive_path), "target_dir": str(target_dir), "error": str(exc)},
)
else:
if extracted:
return extracted
logger.info(
"RAR libarchive extract produced no files, falling back to 7z",
extra={"archive_path": str(archive_path), "target_dir": str(target_dir)},
)
return _extract_rar_with_7z(archive_path, target_dir)
def _extract_rar_with_libarchive(archive_path: Path, target_dir: Path) -> list[Path]:
try:
import libarchive
except ImportError as exc:
raise RuntimeError("未安装 libarchive跳过 Python RAR 解压。") from exc
extracted: list[Path] = []
with libarchive.file_reader(str(archive_path)) as entries:
for entry in entries:
destination = _safe_member_path(target_dir, entry.pathname)
if entry.isdir:
destination.mkdir(parents=True, exist_ok=True)
continue
if not entry.isfile:
logger.info(
"RAR libarchive skipped non-regular entry",
extra={"archive_path": str(archive_path), "entry": entry.pathname},
)
continue
destination.parent.mkdir(parents=True, exist_ok=True)
with destination.open("wb") as target:
for block in entry.get_blocks():
target.write(block)
extracted.append(destination)
return extracted
def _extract_rar_with_7z(archive_path: Path, target_dir: Path) -> list[Path]:
result = subprocess.run(
["7z", "x", f"-o{target_dir}", str(archive_path), "-y"],
check=False,
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(result.stderr or result.stdout or "rar 解压失败")
extracted = [path for path in target_dir.rglob("*") if path.is_file()]
for path in extracted:
_ensure_inside_target(path, target_dir)
return extracted