fix(file-summary): 同步压缩包工作流状态与结果刷新

This commit is contained in:
2026-06-06 19:45:49 +08:00
parent daa0642142
commit 7e561ea213
12 changed files with 560 additions and 32 deletions

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import logging
import subprocess
from pathlib import Path
from zipfile import ZipFile
@@ -9,6 +10,8 @@ import py7zr
ARCHIVE_EXTENSIONS = {"zip", "7z", "rar"}
logger = logging.getLogger("review_agent.file_summary.services.archive")
def _ensure_inside_target(path: Path, target_dir: Path) -> None:
target = target_dir.resolve()
@@ -63,6 +66,51 @@ def _extract_7z(archive_path: Path, target_dir: Path) -> list[Path]:
def _extract_rar(archive_path: Path, target_dir: Path) -> list[Path]:
try:
extracted = _extract_rar_with_libarchive(archive_path, target_dir)
except Exception as exc:
logger.warning(
"RAR libarchive extract failed, falling back to 7z",
extra={"archive_path": str(archive_path), "target_dir": str(target_dir), "error": str(exc)},
)
else:
if extracted:
return extracted
logger.info(
"RAR libarchive extract produced no files, falling back to 7z",
extra={"archive_path": str(archive_path), "target_dir": str(target_dir)},
)
return _extract_rar_with_7z(archive_path, target_dir)
def _extract_rar_with_libarchive(archive_path: Path, target_dir: Path) -> list[Path]:
try:
import libarchive
except ImportError as exc:
raise RuntimeError("未安装 libarchive跳过 Python RAR 解压。") from exc
extracted: list[Path] = []
with libarchive.file_reader(str(archive_path)) as entries:
for entry in entries:
destination = _safe_member_path(target_dir, entry.pathname)
if entry.isdir:
destination.mkdir(parents=True, exist_ok=True)
continue
if not entry.isfile:
logger.info(
"RAR libarchive skipped non-regular entry",
extra={"archive_path": str(archive_path), "entry": entry.pathname},
)
continue
destination.parent.mkdir(parents=True, exist_ok=True)
with destination.open("wb") as target:
for block in entry.get_blocks():
target.write(block)
extracted.append(destination)
return extracted
def _extract_rar_with_7z(archive_path: Path, target_dir: Path) -> list[Path]:
result = subprocess.run(
["7z", "x", f"-o{target_dir}", str(archive_path), "-y"],
check=False,

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import logging
from pathlib import Path
import re
from review_agent.models import FileSummaryBatchAttachment
@@ -13,34 +14,56 @@ from .base import BaseSkill, SkillResult, WorkflowContext
logger = logging.getLogger("review_agent.file_summary.skills.archive_extract")
def _safe_archive_dir_name(binding: FileSummaryBatchAttachment) -> str:
stem = Path(binding.attachment.original_name).stem or "archive"
safe_stem = re.sub(r"[^A-Za-z0-9._-]+", "_", stem).strip("._") or "archive"
return f"{binding.attachment_id}_{safe_stem}"
class ArchiveExtractSkill(BaseSkill):
name = "archive_extract"
def run(self, context: WorkflowContext) -> SkillResult:
extracted_count = 0
target_dir = Path(context.batch.work_dir or "")
if not target_dir:
logger.info(
"Archive extract skipped without work dir",
if not context.batch.work_dir:
message = "批次工作目录为空,无法解压压缩包。"
logger.error(
"Archive extract failed without work dir",
extra={"batch_id": context.batch.pk, "batch_no": context.batch.batch_no},
)
return SkillResult(success=True, data={"extracted_count": 0})
return SkillResult(success=False, message=message, data={"extracted_count": 0})
target_root = Path(context.batch.work_dir)
archive_count = 0
for binding in FileSummaryBatchAttachment.objects.filter(batch=context.batch):
path = resolve_storage_path(binding.attachment.storage_path)
if path.suffix.lower().lstrip(".") not in ARCHIVE_EXTENSIONS:
continue
archive_count += 1
target_dir = target_root / "extracted" / _safe_archive_dir_name(binding)
logger.info(
"Archive extract started",
extra={
"batch_id": context.batch.pk,
"attachment_id": binding.attachment_id,
"path": str(path),
"target_dir": str(target_dir),
},
)
extracted_count += len(extract_archive(path, target_dir))
if archive_count and extracted_count == 0:
message = "压缩包未解出任何可扫描文件,请检查压缩包内容或格式。"
logger.warning(
"Archive extract produced no files",
extra={"batch_id": context.batch.pk, "archive_count": archive_count},
)
return SkillResult(success=False, message=message, data={"extracted_count": 0})
logger.info(
"Archive extract finished",
extra={"batch_id": context.batch.pk, "extracted_count": extracted_count},
extra={
"batch_id": context.batch.pk,
"archive_count": archive_count,
"extracted_count": extracted_count,
},
)
return SkillResult(success=True, data={"extracted_count": extracted_count})

View File

@@ -2,10 +2,12 @@ from __future__ import annotations
import logging
from pathlib import Path
import re
from review_agent.models import FileSummaryBatchAttachment
from ..paths import resolve_storage_path
from ..services.archive import ARCHIVE_EXTENSIONS
from ..services.inventory import scan_files_to_items
from .base import BaseSkill, SkillResult, WorkflowContext
@@ -13,14 +15,44 @@ from .base import BaseSkill, SkillResult, WorkflowContext
logger = logging.getLogger("review_agent.file_summary.skills.file_inventory")
def _safe_archive_dir_name(binding: FileSummaryBatchAttachment) -> str:
stem = Path(binding.attachment.original_name).stem or "archive"
safe_stem = re.sub(r"[^A-Za-z0-9._-]+", "_", stem).strip("._") or "archive"
return f"{binding.attachment_id}_{safe_stem}"
class FileInventorySkill(BaseSkill):
name = "file_inventory"
def run(self, context: WorkflowContext) -> SkillResult:
roots = [
resolve_storage_path(binding.attachment.storage_path)
for binding in FileSummaryBatchAttachment.objects.filter(batch=context.batch)
]
roots: list[Path] = []
missing_extract_roots: list[str] = []
for binding in FileSummaryBatchAttachment.objects.filter(batch=context.batch):
original_path = resolve_storage_path(binding.attachment.storage_path)
is_archive = original_path.suffix.lower().lstrip(".") in ARCHIVE_EXTENSIONS
if not is_archive:
roots.append(original_path)
continue
extracted_root = (
Path(context.batch.work_dir)
/ "extracted"
/ _safe_archive_dir_name(binding)
)
if extracted_root.exists():
roots.append(extracted_root)
else:
missing_extract_roots.append(str(extracted_root))
if missing_extract_roots:
message = "压缩包解压目录不存在,无法扫描解压后的文件。"
logger.warning(
"File inventory missing extracted roots",
extra={
"batch_id": context.batch.pk,
"missing_extract_roots": missing_extract_roots,
},
)
return SkillResult(success=False, message=message)
logger.info(
"File inventory started",
extra={

View File

@@ -5,7 +5,7 @@ from pathlib import Path
from django.http import FileResponse, Http404, JsonResponse
from django.views.decorators.http import require_http_methods
from review_agent.models import Conversation, ExportedSummaryFile, FileAttachment
from review_agent.models import Conversation, ExportedSummaryFile, FileAttachment, Message
from review_agent.models import FileSummaryBatch, WorkflowEvent
from .events import serialize_event
@@ -90,6 +90,47 @@ def attachment_detail(request, conversation_id: int, attachment_id: int):
return JsonResponse({"ok": True, "attachment": serialize_attachment(attachment)})
def _serialize_message(message: Message) -> dict[str, object]:
return {
"id": message.pk,
"role": message.role,
"content": message.content,
"created_at": message.created_at.isoformat(),
}
@require_http_methods(["GET"])
@login_required
def conversation_messages(request, conversation_id: int):
conversation = _conversation_for_user(request.user, conversation_id)
after = request.GET.get("after") or "0"
try:
after_id = int(after)
except ValueError:
after_id = 0
messages = list(conversation.messages.filter(pk__gt=after_id).order_by("id"))
latest_message_id = (
conversation.messages.order_by("-id").values_list("id", flat=True).first() or 0
)
logger.info(
"Conversation incremental messages requested",
extra={
"conversation_id": conversation.pk,
"after_id": after_id,
"message_count": len(messages),
"latest_message_id": latest_message_id,
},
)
return JsonResponse(
{
"conversation_id": conversation.pk,
"latest_message_id": latest_message_id,
"messages": [_serialize_message(message) for message in messages],
}
)
@require_http_methods(["GET"])
@login_required
def batch_status(request, batch_id: int):
@@ -107,6 +148,7 @@ def batch_status(request, batch_id: int):
"success_files": batch.success_files,
"failed_files": batch.failed_files,
"total_pages": batch.total_pages,
"error_message": batch.error_message,
},
"nodes": [
{

View File

@@ -1,9 +1,11 @@
from __future__ import annotations
import logging
from pathlib import Path
from threading import Thread
from uuid import uuid4
from django.conf import settings
from django.db import transaction
from django.utils import timezone
@@ -17,6 +19,7 @@ from review_agent.models import (
)
from .events import record_event
from .services.archive import ARCHIVE_EXTENSIONS
from .skills.archive_extract import ArchiveExtractSkill
from .skills.base import WorkflowContext
from .skills.document_page_count import DocumentPageCountSkill
@@ -54,6 +57,10 @@ def build_batch_no() -> str:
return f"FS-{timezone.localtime().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6]}"
def build_batch_work_dir(batch_no: str) -> Path:
return Path(settings.MEDIA_ROOT) / "file_summary" / "work" / batch_no
@transaction.atomic
def create_file_summary_batch(
*,
@@ -78,15 +85,29 @@ def create_file_summary_batch(
},
)
batch_no = build_batch_no()
work_dir = build_batch_work_dir(batch_no)
work_dir.mkdir(parents=True, exist_ok=True)
batch = FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
trigger_message=trigger_message,
batch_no=build_batch_no(),
batch_no=batch_no,
work_dir=str(work_dir),
)
for attachment in active_attachments:
FileSummaryBatchAttachment.objects.create(batch=batch, attachment=attachment)
source_role = (
FileSummaryBatchAttachment.SourceRole.ARCHIVE
if Path(attachment.original_name).suffix.lower().lstrip(".") in ARCHIVE_EXTENSIONS
else FileSummaryBatchAttachment.SourceRole.MULTI_FILE
)
FileSummaryBatchAttachment.objects.create(
batch=batch,
attachment=attachment,
source_role=source_role,
)
attachment.upload_status = FileAttachment.UploadStatus.BOUND
attachment.save(update_fields=["upload_status"])
@@ -152,7 +173,7 @@ class WorkflowExecutor:
record_event(
self.batch,
"node_progress",
{"node_code": node.node_code, "status": node.status, "progress": node.progress},
{"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message},
)
skill_name = next(
@@ -160,18 +181,35 @@ class WorkflowExecutor:
"",
)
if skill_name:
result = self.registry.execute(skill_name, WorkflowContext(batch=self.batch))
if not result.success:
logger.warning(
"Workflow node skill failed",
extra={
"batch_id": self.batch.pk,
try:
result = self.registry.execute(skill_name, WorkflowContext(batch=self.batch))
if not result.success:
logger.warning(
"Workflow node skill failed",
extra={
"batch_id": self.batch.pk,
"node_code": node.node_code,
"skill_name": skill_name,
"result_message": result.message,
},
)
raise RuntimeError(result.message or f"{node.node_name}执行失败")
except Exception as exc:
node.status = WorkflowNodeRun.Status.FAILED
node.finished_at = timezone.now()
node.message = str(exc)
node.save(update_fields=["status", "finished_at", "message"])
record_event(
self.batch,
"node_progress",
{
"node_code": node.node_code,
"skill_name": skill_name,
"result_message": result.message,
"status": node.status,
"progress": node.progress,
"message": node.message,
},
)
raise RuntimeError(result.message or f"{node.node_name}执行失败")
raise
node.status = WorkflowNodeRun.Status.SUCCESS
node.progress = 100
@@ -181,7 +219,7 @@ class WorkflowExecutor:
record_event(
self.batch,
"node_progress",
{"node_code": node.node_code, "status": node.status, "progress": node.progress},
{"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message},
)
logger.info(
"Workflow node finished",

View File

@@ -1,6 +1,13 @@
from django.urls import path
from .file_summary.views import attachment_detail, attachments, batch_events, batch_status, export_download
from .file_summary.views import (
attachment_detail,
attachments,
batch_events,
batch_status,
conversation_messages,
export_download,
)
urlpatterns = [
@@ -19,6 +26,11 @@ urlpatterns = [
attachment_detail,
name="file_summary_attachment_detail",
),
path(
"api/review-agent/conversations/<int:conversation_id>/messages/",
conversation_messages,
name="review_agent_conversation_messages",
),
path(
"api/review-agent/file-summary/<int:batch_id>/status/",
batch_status,