258 lines
9.3 KiB
Python
258 lines
9.3 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
from threading import Thread
|
|
from uuid import uuid4
|
|
|
|
from django.conf import settings
|
|
from django.db import transaction
|
|
from django.utils import timezone
|
|
|
|
from review_agent.models import (
|
|
Conversation,
|
|
FileAttachment,
|
|
FileSummaryBatch,
|
|
FileSummaryBatchAttachment,
|
|
Message,
|
|
WorkflowNodeRun,
|
|
)
|
|
from review_agent.notifications.dispatcher import dispatch_workflow_notification
|
|
from review_agent.notifications.workflow_adapters import build_file_summary_context
|
|
|
|
from .events import record_event
|
|
from .services.archive import ARCHIVE_EXTENSIONS
|
|
from .skills.archive_extract import ArchiveExtractSkill
|
|
from .skills.base import WorkflowContext
|
|
from .skills.document_page_count import DocumentPageCountSkill
|
|
from .skills.file_inventory import FileInventorySkill
|
|
from .skills.product_detect import ProductDetectSkill
|
|
from .skills.registry import SkillRegistry
|
|
from .skills.summary_report import SummaryReportSkill
|
|
|
|
|
|
# Ordered workflow node table. Each entry is (node_code, display name, skill name).
# create_file_summary_batch creates one WorkflowNodeRun per entry, in this order,
# and WorkflowExecutor looks up the skill to execute by node_code. An empty skill
# name means the node executes no skill and is marked successful directly.
NODE_DEFINITIONS = [
    # Bookkeeping node: no skill attached.
    ("upload", "附件固化", ""),
    # Skill-backed processing nodes.
    ("extract", "压缩包解压", "archive_extract"),
    ("inventory", "文件扫描", "file_inventory"),
    ("page_count", "页数统计", "document_page_count"),
    ("product_detect", "产品识别", "product_detect"),
    ("report", "报告输出", "summary_report"),
    # Terminal node: no skill attached.
    ("complete", "完成", ""),
]

# Module logger; the name is fixed explicitly rather than derived from __name__.
logger = logging.getLogger("review_agent.file_summary.workflow")
|
|
|
|
|
|
def default_skill_registry() -> SkillRegistry:
    """Build a fresh ``SkillRegistry`` containing one instance of each built-in skill.

    Each skill class is instantiated and registered one at a time, in the order
    listed below.

    Returns:
        The populated registry.
    """
    registry = SkillRegistry()
    skill_classes = (
        ArchiveExtractSkill,
        FileInventorySkill,
        DocumentPageCountSkill,
        ProductDetectSkill,
        SummaryReportSkill,
    )
    for skill_cls in skill_classes:
        registry.register(skill_cls())
    return registry
|
|
|
|
|
|
def build_batch_no() -> str:
    """Return a new batch number of the form ``FS-<YYYYmmddHHMMSS>-<6 hex chars>``.

    The timestamp is Django's ``timezone.localtime()``. The suffix is the first
    six hex digits of a random UUID4, which makes collisions between batches
    created within the same second unlikely (but not impossible).
    """
    stamp = timezone.localtime().strftime("%Y%m%d%H%M%S")
    suffix = uuid4().hex[:6]
    return "FS-" + stamp + "-" + suffix
|
|
|
|
|
|
def build_batch_work_dir(batch_no: str) -> Path:
    """Return the working directory path for ``batch_no``.

    The path is ``<MEDIA_ROOT>/file_summary/work/<batch_no>``. Only the path is
    computed; nothing is created on disk.
    """
    media_root = Path(settings.MEDIA_ROOT)
    return media_root.joinpath("file_summary", "work", batch_no)
|
|
|
|
|
|
def _batch_source_role(attachment: FileAttachment):
    """Classify an attachment by its file-name extension.

    The lower-cased suffix of ``original_name`` (leading dot stripped) is checked
    against ``ARCHIVE_EXTENSIONS``. A match is an ARCHIVE source; anything else is
    a MULTI_FILE source.
    """
    extension = Path(attachment.original_name).suffix.lower().lstrip(".")
    if extension in ARCHIVE_EXTENSIONS:
        return FileSummaryBatchAttachment.SourceRole.ARCHIVE
    return FileSummaryBatchAttachment.SourceRole.MULTI_FILE


@transaction.atomic
def create_file_summary_batch(
    *,
    conversation: Conversation,
    user,
    trigger_message: Message | None = None,
) -> FileSummaryBatch:
    """Create a file-summary batch from the conversation's current attachments.

    Steps, all within one database transaction:

    1. Lock (``select_for_update``) the conversation's active attachments that
       are not DELETED.
    2. Create the batch together with its working directory.
    3. Link each attachment to the batch and mark it BOUND.
    4. Create one ``WorkflowNodeRun`` per ``NODE_DEFINITIONS`` entry.

    Args:
        conversation: Conversation whose attachments are summarised.
        user: User recorded as the batch owner (only ``.pk`` is read directly here).
        trigger_message: Optional message that triggered the batch.

    Returns:
        The newly created ``FileSummaryBatch``.

    Raises:
        ValueError: If the conversation has no usable attachments.
    """
    locked_query = (
        FileAttachment.objects.select_for_update()
        .filter(conversation=conversation, is_active=True)
        .exclude(upload_status=FileAttachment.UploadStatus.DELETED)
        .order_by("original_name", "-created_at")
    )
    attachments = list(locked_query)
    if not attachments:
        raise ValueError("当前对话没有可用附件。")

    logger.info(
        "File summary batch creation started",
        extra={
            "conversation_id": conversation.pk,
            "user_id": user.pk,
            "attachment_ids": [item.pk for item in attachments],
        },
    )

    batch_no = build_batch_no()
    work_dir = build_batch_work_dir(batch_no)
    # NOTE(review): the directory is created on disk inside the transaction and
    # is not removed if the transaction later rolls back.
    work_dir.mkdir(parents=True, exist_ok=True)

    batch = FileSummaryBatch.objects.create(
        conversation=conversation,
        user=user,
        trigger_message=trigger_message,
        batch_no=batch_no,
        work_dir=str(work_dir),
    )

    for item in attachments:
        FileSummaryBatchAttachment.objects.create(
            batch=batch,
            attachment=item,
            source_role=_batch_source_role(item),
        )
        item.upload_status = FileAttachment.UploadStatus.BOUND
        item.save(update_fields=["upload_status"])

    for node_code, node_name, _skill in NODE_DEFINITIONS:
        WorkflowNodeRun.objects.create(
            batch=batch,
            workflow_type="file_summary",
            workflow_batch_id=batch.pk,
            node_group="file_summary",
            node_code=node_code,
            node_name=node_name,
        )

    record_event(batch, "workflow_created", {"batch_id": batch.pk, "batch_no": batch.batch_no})
    logger.info("File summary batch created", extra={"batch_id": batch.pk, "batch_no": batch.batch_no})
    return batch
|
|
|
|
|
|
class WorkflowExecutor:
    """Run one ``FileSummaryBatch`` through its node runs, in ``id`` order.

    For each node:

    - The node is marked RUNNING.
    - Its skill, if ``NODE_DEFINITIONS`` names one for the node code, is
      executed through the skill registry.
    - The node is then marked SUCCESS or FAILED.

    Around the whole run, the batch status and timestamps are updated, events
    are recorded, and a completion notification is attempted.
    """

    def __init__(self, batch: FileSummaryBatch, registry: SkillRegistry | None = None):
        self.batch = batch
        # Any falsy ``registry`` argument falls back to the built-in skill set.
        self.registry = registry or default_skill_registry()

    def run(self) -> None:
        """Execute every node of the batch and record the overall outcome.

        If any node raises, the remaining nodes are not run. The batch is marked
        FAILED with the exception text, and this method returns normally.
        Exceptions raised while recording the start or the success outcome are
        not caught.
        """
        logger.info("Workflow run started", extra={"batch_id": self.batch.pk})
        self.batch.status = FileSummaryBatch.Status.RUNNING
        self.batch.started_at = timezone.now()
        self.batch.save(update_fields=["status", "started_at"])
        record_event(self.batch, "workflow_started", {"batch_id": self.batch.pk})

        try:
            for node_run in self.batch.node_runs.order_by("id"):
                self._run_node(node_run)
        except Exception as error:
            # Called while ``error`` is still being handled, so the
            # logger.exception call inside captures its traceback.
            self._mark_failed(error)
        else:
            self._mark_succeeded()

    def _mark_failed(self, error: Exception) -> None:
        """Persist the FAILED outcome, record the event and notify (best effort)."""
        message = str(error)
        logger.exception(
            "Workflow run failed",
            extra={"batch_id": self.batch.pk, "error": message},
        )
        self.batch.status = FileSummaryBatch.Status.FAILED
        self.batch.error_message = message
        self.batch.finished_at = timezone.now()
        self.batch.save(update_fields=["status", "error_message", "finished_at"])
        record_event(self.batch, "workflow_failed", {"message": message})
        self._dispatch_completion_notification()

    def _mark_succeeded(self) -> None:
        """Persist the SUCCESS outcome, record the event, notify and log."""
        self.batch.status = FileSummaryBatch.Status.SUCCESS
        self.batch.finished_at = timezone.now()
        self.batch.save(update_fields=["status", "finished_at"])
        record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk})
        self._dispatch_completion_notification()
        logger.info("Workflow run completed", extra={"batch_id": self.batch.pk})

    def _dispatch_completion_notification(self) -> None:
        """Send the completion notification; any failure is logged, never raised."""
        try:
            notification_context = build_file_summary_context(self.batch)
            dispatch_workflow_notification(notification_context)
        except Exception as error:
            logger.warning(
                "File summary notification failed without blocking workflow",
                extra={"batch_id": self.batch.pk, "error": str(error)},
            )

    @staticmethod
    def _skill_name_for(node_code: str) -> str:
        """Return the skill name of the first ``NODE_DEFINITIONS`` entry for ``node_code``, or ``""``."""
        for code, _label, skill in NODE_DEFINITIONS:
            if code == node_code:
                return skill
        return ""

    def _record_node_progress(self, node: WorkflowNodeRun) -> None:
        """Record a ``node_progress`` event describing the node's current state."""
        record_event(
            self.batch,
            "node_progress",
            {
                "node_code": node.node_code,
                "status": node.status,
                "progress": node.progress,
                "message": node.message,
            },
        )

    def _run_node(self, node: WorkflowNodeRun) -> None:
        """Run a single node: mark it running, execute its skill (if any), mark the result.

        Raises:
            Exception: Whatever the skill raised, or ``RuntimeError`` when the
                skill reports ``success`` as false. The node is saved as FAILED
                before the exception is re-raised.
        """
        logger.info(
            "Workflow node started",
            extra={
                "batch_id": self.batch.pk,
                "node_code": node.node_code,
                "node_name": node.node_name,
            },
        )
        node.status = WorkflowNodeRun.Status.RUNNING
        node.progress = 10
        node.started_at = timezone.now()
        node.message = f"{node.node_name}处理中"
        node.save(update_fields=["status", "progress", "started_at", "message"])
        self._record_node_progress(node)

        skill_name = self._skill_name_for(node.node_code)
        if skill_name:
            try:
                outcome = self.registry.execute(skill_name, WorkflowContext(batch=self.batch))
                if not outcome.success:
                    logger.warning(
                        "Workflow node skill failed",
                        extra={
                            "batch_id": self.batch.pk,
                            "node_code": node.node_code,
                            "skill_name": skill_name,
                            "result_message": outcome.message,
                        },
                    )
                    # Turned into an exception so the handler below marks the
                    # node FAILED, exactly as for a skill that raised.
                    raise RuntimeError(outcome.message or f"{node.node_name}执行失败")
            except Exception as error:
                node.status = WorkflowNodeRun.Status.FAILED
                node.finished_at = timezone.now()
                node.message = str(error)
                node.save(update_fields=["status", "finished_at", "message"])
                self._record_node_progress(node)
                raise

        node.status = WorkflowNodeRun.Status.SUCCESS
        node.progress = 100
        node.finished_at = timezone.now()
        node.message = f"{node.node_name}完成"
        node.save(update_fields=["status", "progress", "finished_at", "message"])
        self._record_node_progress(node)
        logger.info(
            "Workflow node finished",
            extra={"batch_id": self.batch.pk, "node_code": node.node_code},
        )
|
|
|
|
|
|
def start_file_summary_workflow(batch: FileSummaryBatch, *, async_run: bool = True) -> None:
    """Run the file-summary workflow for ``batch``, inline or on a background thread.

    Args:
        batch: The batch to execute (e.g. one returned by ``create_file_summary_batch``).
        async_run: When ``True`` (default), the executor runs on a daemon thread.
            When ``False``, it runs in the calling thread, and this function
            returns only after ``WorkflowExecutor.run`` has finished.

    In async mode, the thread is started through ``transaction.on_commit``:

    - The thread uses its own database connection. It can only see the batch
      and node-run rows once the surrounding transaction has committed.
    - Outside an atomic block, Django invokes the callback immediately, so
      autocommit callers see the same behavior as before.
    - If the surrounding transaction rolls back, the workflow is never started.
    """
    executor = WorkflowExecutor(batch)
    if not async_run:
        logger.info("Workflow starting synchronously", extra={"batch_id": batch.pk})
        executor.run()
        return

    def _run_in_thread() -> None:
        # Django keeps one connection per thread and does not clean up for
        # threads it did not start, so close this thread's connections
        # explicitly to avoid leaking one per workflow run.
        from django.db import connections

        try:
            executor.run()
        finally:
            connections.close_all()

    def _start_thread() -> None:
        Thread(target=_run_in_thread, daemon=True).start()

    logger.info("Workflow starting asynchronously", extra={"batch_id": batch.pk})
    transaction.on_commit(_start_thread)
|