"""File-summary workflow: batch creation and (sync or threaded) execution."""

from __future__ import annotations

import logging
from threading import Thread
from uuid import uuid4

from django.db import connections, transaction
from django.utils import timezone

from review_agent.models import (
    Conversation,
    FileAttachment,
    FileSummaryBatch,
    FileSummaryBatchAttachment,
    Message,
    WorkflowNodeRun,
)

from .events import record_event

logger = logging.getLogger(__name__)

# (node_code, node_name) pairs, in creation order. One WorkflowNodeRun row is
# created per entry for every new batch; the names are user-facing labels.
NODE_DEFINITIONS = [
    ("upload", "附件固化"),
    ("extract", "压缩包解压"),
    ("inventory", "文件扫描"),
    ("page_count", "页数统计"),
    ("product_detect", "产品识别"),
    ("report", "报告输出"),
    ("complete", "完成"),
]


def build_batch_no() -> str:
    """Return a batch number of the form ``FS-<YYYYmmddHHMMSS>-<6 hex chars>``.

    The timestamp is the current local time; the suffix is the first six hex
    digits of a random UUID4.
    """
    return f"FS-{timezone.localtime().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6]}"


@transaction.atomic
def create_file_summary_batch(
    *,
    conversation: Conversation,
    user,
    trigger_message: Message | None = None,
) -> FileSummaryBatch:
    """Create a batch for the conversation's active attachments.

    Inside a single transaction this:

    * row-locks the conversation's active, non-deleted attachments,
    * creates the ``FileSummaryBatch``,
    * links every attachment to the batch and marks it ``BOUND``,
    * creates one ``WorkflowNodeRun`` per entry in ``NODE_DEFINITIONS``,
    * records a ``workflow_created`` event.

    Args:
        conversation: Conversation whose attachments are summarized.
        user: Stored on the batch as its ``user``.
        trigger_message: Optional message stored on the batch.

    Returns:
        The newly created batch.

    Raises:
        ValueError: If the conversation has no usable attachment.
    """
    # select_for_update() locks the selected rows until the transaction ends.
    active_attachments = list(
        FileAttachment.objects.select_for_update()
        .filter(conversation=conversation, is_active=True)
        .exclude(upload_status=FileAttachment.UploadStatus.DELETED)
        .order_by("original_name", "-created_at")
    )
    if not active_attachments:
        raise ValueError("当前对话没有可用附件。")

    batch = FileSummaryBatch.objects.create(
        conversation=conversation,
        user=user,
        trigger_message=trigger_message,
        batch_no=build_batch_no(),
    )

    for attachment in active_attachments:
        FileSummaryBatchAttachment.objects.create(batch=batch, attachment=attachment)
        attachment.upload_status = FileAttachment.UploadStatus.BOUND
        attachment.save(update_fields=["upload_status"])

    for code, name in NODE_DEFINITIONS:
        WorkflowNodeRun.objects.create(batch=batch, node_code=code, node_name=name)

    record_event(
        batch,
        "workflow_created",
        {"batch_id": batch.pk, "batch_no": batch.batch_no},
    )
    return batch


class WorkflowExecutor:
    """Runs every node of one ``FileSummaryBatch`` in ``id`` order."""

    def __init__(self, batch: FileSummaryBatch):
        self.batch = batch

    def run(self) -> None:
        """Execute the batch's nodes and persist the final batch status.

        Marks the batch ``RUNNING``, runs each node, then marks it ``SUCCESS``.
        If any node raises, the batch is marked ``FAILED`` with the exception
        text, a ``workflow_failed`` event is recorded, and the method returns
        without re-raising.
        """
        self.batch.status = FileSummaryBatch.Status.RUNNING
        self.batch.started_at = timezone.now()
        self.batch.save(update_fields=["status", "started_at"])
        record_event(self.batch, "workflow_started", {"batch_id": self.batch.pk})

        try:
            for node in self.batch.node_runs.order_by("id"):
                self._run_node(node)
        except Exception as exc:
            # Top-level boundary: the error is persisted on the batch rather
            # than propagated, so log the traceback here or it is lost.
            logger.exception("File summary workflow failed (batch_id=%s)", self.batch.pk)
            self.batch.status = FileSummaryBatch.Status.FAILED
            self.batch.error_message = str(exc)
            self.batch.finished_at = timezone.now()
            self.batch.save(update_fields=["status", "error_message", "finished_at"])
            record_event(self.batch, "workflow_failed", {"message": str(exc)})
            return

        self.batch.status = FileSummaryBatch.Status.SUCCESS
        self.batch.finished_at = timezone.now()
        self.batch.save(update_fields=["status", "finished_at"])
        record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk})

    def _run_node(self, node: WorkflowNodeRun) -> None:
        """Move one node through RUNNING (10%) to SUCCESS (100%).

        A ``node_progress`` event is recorded after each of the two saves.
        No node-specific work is performed between the two transitions.
        """
        now = timezone.now()
        node.status = WorkflowNodeRun.Status.RUNNING
        node.progress = 10
        node.started_at = now
        node.message = f"{node.node_name}处理中"
        node.save(update_fields=["status", "progress", "started_at", "message"])
        record_event(
            self.batch,
            "node_progress",
            {"node_code": node.node_code, "status": node.status, "progress": node.progress},
        )

        node.status = WorkflowNodeRun.Status.SUCCESS
        node.progress = 100
        node.finished_at = timezone.now()
        node.message = f"{node.node_name}完成"
        node.save(update_fields=["status", "progress", "finished_at", "message"])
        record_event(
            self.batch,
            "node_progress",
            {"node_code": node.node_code, "status": node.status, "progress": node.progress},
        )


def _run_and_release_connections(executor: WorkflowExecutor) -> None:
    """Thread target: run the executor, then close this thread's DB connections."""
    try:
        executor.run()
    finally:
        # Django connections are per-thread and are only cleaned up
        # automatically around requests; a manually started thread must close
        # its own, otherwise every background run leaves a connection open.
        connections.close_all()


def start_file_summary_workflow(batch: FileSummaryBatch, *, async_run: bool = True) -> None:
    """Run the workflow for ``batch``, inline or on a background thread.

    Args:
        batch: The batch to execute.
        async_run: When true (default), run on a daemon thread and return
            immediately; when false, run synchronously in the calling thread.

    NOTE(review): the background thread uses its own database connection, so
    if the caller is still inside an uncommitted transaction the thread may
    not see the batch rows yet -- confirm callers invoke this after commit.
    """
    executor = WorkflowExecutor(batch)
    if not async_run:
        executor.run()
        return
    # Daemon thread: it does not keep the process alive at interpreter exit.
    Thread(target=_run_and_release_connections, args=(executor,), daemon=True).start()