70 lines
2.5 KiB
Python
70 lines
2.5 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
import re
|
|
|
|
from review_agent.models import FileSummaryBatchAttachment
|
|
|
|
from ..paths import resolve_storage_path
|
|
from ..services.archive import ARCHIVE_EXTENSIONS
|
|
from ..services.inventory import scan_files_to_items
|
|
from .base import BaseSkill, SkillResult, WorkflowContext
|
|
|
|
|
|
logger = logging.getLogger("review_agent.file_summary.skills.file_inventory")
|
|
|
|
|
|
def _safe_archive_dir_name(binding: FileSummaryBatchAttachment) -> str:
    """Return the directory name used for an archive's extracted contents.

    The name is ``"<attachment_id>_<sanitized stem>"``, where the stem comes
    from the attachment's original file name. Every run of characters other
    than ASCII letters, digits, ``.``, ``_`` and ``-`` is collapsed into a
    single ``_``, and leading/trailing ``.`` and ``_`` are trimmed. If nothing
    usable remains, the literal ``"archive"`` is used instead.
    """
    fallback = "archive"

    raw_stem = Path(binding.attachment.original_name).stem
    if not raw_stem:
        raw_stem = fallback

    # Collapse each run of disallowed characters into one underscore, then
    # trim dots/underscores so the name cannot start or end with them.
    collapsed = re.sub(r"[^A-Za-z0-9._-]+", "_", raw_stem)
    trimmed = collapsed.strip("._")
    if not trimmed:
        trimmed = fallback

    return f"{binding.attachment_id}_{trimmed}"
|
|
|
|
|
|
class FileInventorySkill(BaseSkill):
    """Workflow skill that scans a batch's attachments into inventory items.

    Non-archive attachments are scanned at their resolved storage path.
    Archive attachments are scanned from their extraction directory under
    ``<batch.work_dir>/extracted/``; this skill does not extract anything
    itself and fails if such a directory is missing.
    """

    name = "file_inventory"

    def run(self, context: WorkflowContext) -> SkillResult:
        """Collect scan roots for ``context.batch`` and scan them.

        Args:
            context: Workflow context; ``context.batch`` selects the
                attachment bindings and provides ``work_dir``.

        Returns:
            ``SkillResult(success=False, message=...)`` when at least one
            archive attachment has no extraction directory on disk;
            otherwise ``SkillResult(success=True, data={"total_files": n})``
            where ``n`` is the number of items returned by the scan.
        """
        roots: list[Path] = []
        missing_extract_roots: list[str] = []

        # ``binding.attachment`` is read for every row below; fetch it in the
        # same query instead of issuing one extra query per binding.
        bindings = FileSummaryBatchAttachment.objects.filter(
            batch=context.batch
        ).select_related("attachment")

        for binding in bindings:
            original_path = resolve_storage_path(binding.attachment.storage_path)
            extension = original_path.suffix.lower().lstrip(".")
            if extension not in ARCHIVE_EXTENSIONS:
                # Plain file: scan it where it is stored.
                roots.append(original_path)
                continue

            # Built lazily so ``work_dir`` is only touched when the batch
            # actually contains an archive.
            extracted_root = (
                Path(context.batch.work_dir)
                / "extracted"
                / _safe_archive_dir_name(binding)
            )
            if extracted_root.exists():
                roots.append(extracted_root)
            else:
                missing_extract_roots.append(str(extracted_root))

        if missing_extract_roots:
            # Fail the whole step rather than produce a partial inventory.
            message = "压缩包解压目录不存在,无法扫描解压后的文件。"
            logger.warning(
                "File inventory missing extracted roots",
                extra={
                    "batch_id": context.batch.pk,
                    "missing_extract_roots": missing_extract_roots,
                },
            )
            return SkillResult(success=False, message=message)

        logger.info(
            "File inventory started",
            extra={
                "batch_id": context.batch.pk,
                "root_count": len(roots),
                "roots": [str(root) for root in roots],
            },
        )
        items = scan_files_to_items(batch=context.batch, roots=roots)
        total_files = len(items)
        logger.info(
            "File inventory finished",
            extra={"batch_id": context.batch.pk, "total_files": total_files},
        )
        return SkillResult(success=True, data={"total_files": total_files})
|