"""File-summary workflow: batch creation and node-by-node execution.

A batch binds the active attachments of a conversation to a fixed sequence
of workflow nodes (see ``NODE_DEFINITIONS``).  ``WorkflowExecutor`` walks the
nodes in creation order and runs the skill attached to each one, either
inline or on a background thread.
"""

from __future__ import annotations

import logging
from threading import Thread
from uuid import uuid4

from django.db import connections, transaction
from django.utils import timezone

from review_agent.models import (
    Conversation,
    FileAttachment,
    FileSummaryBatch,
    FileSummaryBatchAttachment,
    Message,
    WorkflowNodeRun,
)

from .events import record_event
from .skills.archive_extract import ArchiveExtractSkill
from .skills.base import WorkflowContext
from .skills.document_page_count import DocumentPageCountSkill
from .skills.file_inventory import FileInventorySkill
from .skills.product_detect import ProductDetectSkill
from .skills.registry import SkillRegistry
from .skills.summary_report import SummaryReportSkill

# One entry per workflow node, in execution order:
#   (node code, display name, skill name).
# An empty skill name means the node runs no skill and is simply marked as
# started and then finished.
NODE_DEFINITIONS = [
    ("upload", "附件固化", ""),
    ("extract", "压缩包解压", "archive_extract"),
    ("inventory", "文件扫描", "file_inventory"),
    ("page_count", "页数统计", "document_page_count"),
    ("product_detect", "产品识别", "product_detect"),
    ("report", "报告输出", "summary_report"),
    ("complete", "完成", ""),
]

logger = logging.getLogger("review_agent.file_summary.workflow")


def default_skill_registry() -> SkillRegistry:
    """Return a new registry with the five built-in skills registered."""
    registry = SkillRegistry()
    registry.register(ArchiveExtractSkill())
    registry.register(FileInventorySkill())
    registry.register(DocumentPageCountSkill())
    registry.register(ProductDetectSkill())
    registry.register(SummaryReportSkill())
    return registry


def build_batch_no() -> str:
    """Build a batch number of the form ``FS-<local timestamp>-<6 hex chars>``.

    The timestamp is ``timezone.localtime()`` formatted as ``%Y%m%d%H%M%S``;
    the suffix is the first six hex digits of a random UUID4.
    """
    return f"FS-{timezone.localtime().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6]}"


@transaction.atomic
def create_file_summary_batch(
    *,
    conversation: Conversation,
    user,
    trigger_message: Message | None = None,
) -> FileSummaryBatch:
    """Create a batch for the conversation's active attachments.

    Runs inside a single transaction.  The active, non-deleted attachments of
    ``conversation`` are locked with ``select_for_update``, linked to a new
    ``FileSummaryBatch``, and marked ``BOUND``.  One ``WorkflowNodeRun`` is
    created per entry of ``NODE_DEFINITIONS``, and a ``workflow_created``
    event is recorded.

    Args:
        conversation: Conversation whose attachments are summarized.
        user: Owner stored on the batch.
        trigger_message: Optional message stored on the batch.

    Returns:
        The newly created batch.

    Raises:
        ValueError: If the conversation has no active, non-deleted attachment.
    """
    active_attachments = list(
        FileAttachment.objects.select_for_update()
        .filter(conversation=conversation, is_active=True)
        .exclude(upload_status=FileAttachment.UploadStatus.DELETED)
        .order_by("original_name", "-created_at")
    )
    if not active_attachments:
        raise ValueError("当前对话没有可用附件。")

    logger.info(
        "File summary batch creation started",
        extra={
            "conversation_id": conversation.pk,
            "user_id": user.pk,
            "attachment_ids": [attachment.pk for attachment in active_attachments],
        },
    )

    batch = FileSummaryBatch.objects.create(
        conversation=conversation,
        user=user,
        trigger_message=trigger_message,
        batch_no=build_batch_no(),
    )

    for attachment in active_attachments:
        FileSummaryBatchAttachment.objects.create(batch=batch, attachment=attachment)
        attachment.upload_status = FileAttachment.UploadStatus.BOUND
        attachment.save(update_fields=["upload_status"])

    # Nodes are created in definition order; the executor later iterates them
    # ordered by id, so creation order is execution order.
    for code, name, _skill_name in NODE_DEFINITIONS:
        WorkflowNodeRun.objects.create(batch=batch, node_code=code, node_name=name)

    record_event(batch, "workflow_created", {"batch_id": batch.pk, "batch_no": batch.batch_no})
    logger.info(
        "File summary batch created",
        extra={"batch_id": batch.pk, "batch_no": batch.batch_no},
    )
    return batch


class WorkflowExecutor:
    """Runs every node of one ``FileSummaryBatch`` in order."""

    def __init__(self, batch: FileSummaryBatch, registry: SkillRegistry | None = None):
        """Store the batch and the skill registry.

        Args:
            batch: Batch whose node runs will be executed.
            registry: Registry used to execute skills; when falsy, a registry
                from ``default_skill_registry()`` is used.
        """
        self.batch = batch
        self.registry = registry or default_skill_registry()

    def run(self) -> None:
        """Execute all node runs of the batch and persist the outcome.

        The batch is marked ``RUNNING``, then each node run (ordered by id) is
        executed.  Any exception stops the loop: it is logged, the batch is
        marked ``FAILED`` with the exception text, a ``workflow_failed`` event
        is recorded, and the method returns without re-raising.  Otherwise the
        batch is marked ``SUCCESS`` and ``workflow_completed`` is recorded.
        """
        logger.info("Workflow run started", extra={"batch_id": self.batch.pk})
        self.batch.status = FileSummaryBatch.Status.RUNNING
        self.batch.started_at = timezone.now()
        self.batch.save(update_fields=["status", "started_at"])
        record_event(self.batch, "workflow_started", {"batch_id": self.batch.pk})

        try:
            for node in self.batch.node_runs.order_by("id"):
                self._run_node(node)
        except Exception as exc:
            # Top-level boundary: the failure is stored on the batch instead of
            # propagating, so callers (including the background thread) never
            # see the exception.
            # NOTE(review): the node that raised keeps the RUNNING status set
            # in _run_node; only the batch is marked as failed here.
            logger.exception(
                "Workflow run failed",
                extra={"batch_id": self.batch.pk, "error": str(exc)},
            )
            self.batch.status = FileSummaryBatch.Status.FAILED
            self.batch.error_message = str(exc)
            self.batch.finished_at = timezone.now()
            self.batch.save(update_fields=["status", "error_message", "finished_at"])
            record_event(self.batch, "workflow_failed", {"message": str(exc)})
            return

        self.batch.status = FileSummaryBatch.Status.SUCCESS
        self.batch.finished_at = timezone.now()
        self.batch.save(update_fields=["status", "finished_at"])
        record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk})
        logger.info("Workflow run completed", extra={"batch_id": self.batch.pk})

    def _run_node(self, node: WorkflowNodeRun) -> None:
        """Run one node: mark it running, execute its skill, mark it done.

        Args:
            node: Node run to execute; its ``node_code`` selects the skill
                through ``NODE_DEFINITIONS``.

        Raises:
            RuntimeError: If the node's skill reports ``success`` as falsy.
                The message is the skill's message, or a default built from
                the node name when the skill gave none.
        """
        logger.info(
            "Workflow node started",
            extra={
                "batch_id": self.batch.pk,
                "node_code": node.node_code,
                "node_name": node.node_name,
            },
        )
        now = timezone.now()
        node.status = WorkflowNodeRun.Status.RUNNING
        node.progress = 10
        node.started_at = now
        node.message = f"{node.node_name}处理中"
        node.save(update_fields=["status", "progress", "started_at", "message"])
        record_event(
            self.batch,
            "node_progress",
            {"node_code": node.node_code, "status": node.status, "progress": node.progress},
        )

        # Unknown node codes and nodes defined with an empty skill name both
        # resolve to "" and skip skill execution.
        skill_name = next(
            (skill for code, _name, skill in NODE_DEFINITIONS if code == node.node_code),
            "",
        )
        if skill_name:
            result = self.registry.execute(skill_name, WorkflowContext(batch=self.batch))
            if not result.success:
                logger.warning(
                    "Workflow node skill failed",
                    extra={
                        "batch_id": self.batch.pk,
                        "node_code": node.node_code,
                        "skill_name": skill_name,
                        "result_message": result.message,
                    },
                )
                raise RuntimeError(result.message or f"{node.node_name}执行失败")

        node.status = WorkflowNodeRun.Status.SUCCESS
        node.progress = 100
        node.finished_at = timezone.now()
        node.message = f"{node.node_name}完成"
        node.save(update_fields=["status", "progress", "finished_at", "message"])
        record_event(
            self.batch,
            "node_progress",
            {"node_code": node.node_code, "status": node.status, "progress": node.progress},
        )
        logger.info(
            "Workflow node finished",
            extra={"batch_id": self.batch.pk, "node_code": node.node_code},
        )


def _run_and_release_connections(executor: WorkflowExecutor) -> None:
    """Thread target: run the workflow, then close this thread's DB connections."""
    try:
        executor.run()
    finally:
        # Django database connections are per-thread and are only cleaned up
        # automatically around the request/response cycle.  A thread started
        # by hand has to close the connections it opened, otherwise each
        # background run can leave one open.
        connections.close_all()


def start_file_summary_workflow(batch: FileSummaryBatch, *, async_run: bool = True) -> None:
    """Start the workflow for ``batch`` inline or on a daemon thread.

    Args:
        batch: Batch to execute.
        async_run: When true (default), run on a daemon ``Thread`` and return
            immediately; when false, run in the calling thread and return
            once the workflow has finished.
    """
    executor = WorkflowExecutor(batch)
    if not async_run:
        logger.info("Workflow starting synchronously", extra={"batch_id": batch.pk})
        executor.run()
        return

    logger.info("Workflow starting asynchronously", extra={"batch_id": batch.pk})
    Thread(target=_run_and_release_connections, args=(executor,), daemon=True).start()