diff --git a/review_agent/application_form_fill/events.py b/review_agent/application_form_fill/events.py new file mode 100644 index 0000000..be7ec28 --- /dev/null +++ b/review_agent/application_form_fill/events.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from review_agent.application_form_fill.constants import WORKFLOW_TYPE +from review_agent.models import ApplicationFormFillBatch, WorkflowEvent + + +def record_event( + batch: ApplicationFormFillBatch, + event_type: str, + payload: dict | None = None, +) -> WorkflowEvent: + return WorkflowEvent.objects.create( + workflow_type=WORKFLOW_TYPE, + workflow_batch_id=batch.pk, + conversation=batch.conversation, + event_type=event_type, + payload=payload or {}, + ) + + +def serialize_event(event: WorkflowEvent) -> dict[str, object]: + return { + "id": event.pk, + "event_type": event.event_type, + "payload": event.payload, + "created_at": event.created_at.isoformat(), + } diff --git a/review_agent/application_form_fill/workflow.py b/review_agent/application_form_fill/workflow.py index 78ec271..cb29e6e 100644 --- a/review_agent/application_form_fill/workflow.py +++ b/review_agent/application_form_fill/workflow.py @@ -1,21 +1,151 @@ from __future__ import annotations -from review_agent.application_form_fill.constants import FORM_FILL_NODE_DEFINITIONS, WORKFLOW_TYPE +import logging +from threading import Thread +from uuid import uuid4 + +from django.conf import settings +from django.db import transaction +from django.utils import timezone + +from review_agent.application_form_fill.constants import DEFAULT_OUTPUT_TYPES, FORM_FILL_NODE_DEFINITIONS, WORKFLOW_TYPE +from review_agent.application_form_fill.events import record_event +from review_agent.application_form_fill.storage import build_batch_work_dir +from review_agent.models import ApplicationFormFillBatch, Conversation, FileSummaryBatch, Message, WorkflowNodeRun + + +logger = logging.getLogger("review_agent.application_form_fill.workflow") + + +def build_batch_no() -> str: + return f"AFF-{timezone.localtime().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6]}" + + +def find_latest_successful_summary_batch(conversation: Conversation) -> FileSummaryBatch | None: + return ( + FileSummaryBatch.objects.filter( + conversation=conversation, + status=FileSummaryBatch.Status.SUCCESS, + ) + .order_by("-finished_at", "-created_at", "-id") + .first() + ) + + +@transaction.atomic +def create_application_form_fill_batch( + *, + conversation: Conversation, + user, + source_summary_batch: FileSummaryBatch, + trigger_message: Message | None = None, + requested_templates: list[str] | None = None, + output_types: list[str] | None = None, +) -> ApplicationFormFillBatch: + batch_no = build_batch_no() + work_dir = build_batch_work_dir(batch_no=batch_no) + work_dir.mkdir(parents=True, exist_ok=True) + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + trigger_message=trigger_message, + source_summary_batch=source_summary_batch, + batch_no=batch_no, + requested_templates=requested_templates or [], + output_types=output_types or DEFAULT_OUTPUT_TYPES, + work_dir=str(work_dir), + ) + for code, name, group in FORM_FILL_NODE_DEFINITIONS: + WorkflowNodeRun.objects.create( + workflow_type=WORKFLOW_TYPE, + workflow_batch_id=batch.pk, + node_group=group, + node_code=code, + node_name=name, + ) + record_event(batch, "workflow_created", {"batch_id": batch.pk, "batch_no": batch.batch_no}) + return batch class FormFillWorkflowExecutor: - """Workflow executor scaffold filled in by later AFF stages.""" + """Runs the auto-fill workflow skeleton; later stages fill node bodies.""" - def __init__(self, batch): + def __init__(self, batch: ApplicationFormFillBatch): self.batch = batch def run(self) -> None: - raise NotImplementedError("application_form_fill workflow is implemented in later AFF stages.") + logger.info("自动填表工作流开始 batch_no=%s batch_id=%s", self.batch.batch_no, self.batch.pk) + self.batch.status = ApplicationFormFillBatch.Status.RUNNING + self.batch.started_at = timezone.now() + self.batch.save(update_fields=["status", "started_at"]) + record_event(self.batch, "workflow_started", {"batch_id": self.batch.pk}) + + try: + for node in self._nodes(): + if node.status in {WorkflowNodeRun.Status.SUCCESS, WorkflowNodeRun.Status.SKIPPED}: + continue + self._run_node(node) + except Exception as exc: + logger.exception("Application form fill workflow failed", extra={"batch_id": self.batch.pk}) + self.batch.status = ApplicationFormFillBatch.Status.FAILED + self.batch.error_message = str(exc) + self.batch.finished_at = timezone.now() + self.batch.save(update_fields=["status", "error_message", "finished_at"]) + record_event(self.batch, "workflow_failed", {"message": str(exc)}) + return + + self.batch.status = ApplicationFormFillBatch.Status.SUCCESS + self.batch.finished_at = timezone.now() + self.batch.save(update_fields=["status", "finished_at"]) + record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk}) + logger.info("自动填表工作流完成 batch_no=%s", self.batch.batch_no) + + def _nodes(self): + return WorkflowNodeRun.objects.filter( + workflow_type=WORKFLOW_TYPE, + workflow_batch_id=self.batch.pk, + ).order_by("id") + + def _run_node(self, node: WorkflowNodeRun) -> None: + node.status = WorkflowNodeRun.Status.RUNNING + node.progress = 10 + node.started_at = timezone.now() + node.message = f"{node.node_name}处理中" + node.save(update_fields=["status", "progress", "started_at", "message"]) + record_event( + self.batch, + "node_progress", + {"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message}, + ) + + if node.node_code == "pdf_convert": + node.status = WorkflowNodeRun.Status.SKIPPED + node.progress = 100 + node.finished_at = timezone.now() + node.message = "PDF 转换为后续增强项,本次跳过" + node.save(update_fields=["status", "progress", "finished_at", "message"]) + record_event( + self.batch, + "node_progress", + {"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message}, + ) + return + + node.status = WorkflowNodeRun.Status.SUCCESS + node.progress = 100 + node.finished_at = timezone.now() + node.message = f"{node.node_name}完成" + node.save(update_fields=["status", "progress", "finished_at", "message"]) + record_event( + self.batch, + "node_progress", + {"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message}, + ) -def start_application_form_fill_workflow(batch, *, async_run: bool = True) -> None: +def start_application_form_fill_workflow(batch: ApplicationFormFillBatch, *, async_run: bool = True) -> None: executor = FormFillWorkflowExecutor(batch) - if async_run: + if not async_run: executor.run() return - executor.run() + Thread(target=executor.run, daemon=True).start() diff --git a/review_agent/services.py b/review_agent/services.py index de72857..252502a 100644 --- a/review_agent/services.py +++ b/review_agent/services.py @@ -11,6 +11,11 @@ from .file_summary.skills.attachment_reader import AttachmentReaderSkill from .file_summary.workflow import create_file_summary_batch, start_file_summary_workflow from .llm import LLMConfigurationError, LLMRequestError, generate_reply, stream_reply from .models import Conversation, FileAttachment, FileSummaryBatch, Message +from .application_form_fill.workflow import ( + create_application_form_fill_batch, + find_latest_successful_summary_batch as find_latest_successful_form_fill_summary_batch, + start_application_form_fill_workflow, +) from .regulatory_review.workflow import ( create_regulatory_review_batch, find_latest_successful_summary_batch, @@ -224,6 +229,85 @@ def stream_message(conversation: Conversation, content: str): ) return + if route.starts_application_form_fill: + source_summary_batch = find_latest_successful_form_fill_summary_batch(conversation) + if not source_summary_batch: + if not _has_active_attachments(conversation): + reply_content = "请先在当前对话右侧上传需要填表的产品资料或压缩包,我会先自动汇总再继续生成申报模板。" + assistant_message = append_assistant_message(conversation, reply_content) + yield sse_event("chunk", {"delta": reply_content}) + yield sse_event( + "done", + { + "assistant_message_id": assistant_message.pk, + "conversation_id": conversation.pk, + "title": conversation.title, + }, + ) + return + summary_batch = create_file_summary_batch( + conversation=conversation, + user=conversation.user, + trigger_message=user_message, + ) + yield sse_event( + "workflow_started", + { + "workflow_type": "file_summary", + "batch_id": summary_batch.pk, + "batch_no": summary_batch.batch_no, + }, + ) + start_file_summary_workflow(summary_batch, async_run=False) + summary_batch.refresh_from_db() + if summary_batch.status != FileSummaryBatch.Status.SUCCESS: + reply_content = f"已先启动文件目录与页数自动汇总工作流,批次号:{summary_batch.batch_no},但汇总未成功:{summary_batch.error_message or '原因待查看'}。请处理后再启动申报文件自动填表。" + assistant_message = append_assistant_message(conversation, reply_content) + yield sse_event("chunk", {"delta": reply_content}) + yield sse_event( + "done", + { + "assistant_message_id": assistant_message.pk, + "conversation_id": conversation.pk, + "title": conversation.title, + }, + ) + return + source_summary_batch = summary_batch + reply_prefix = f"已先启动文件目录与页数自动汇总工作流,批次号:{summary_batch.batch_no},汇总完成后继续自动填表。\n" + else: + reply_prefix = "" + batch = create_application_form_fill_batch( + conversation=conversation, + user=conversation.user, + trigger_message=user_message, + source_summary_batch=source_summary_batch, + ) + start_application_form_fill_workflow( + batch, + async_run=getattr(settings, "APPLICATION_FORM_FILL_ASYNC", True), + ) + reply_content = f"{reply_prefix}已启动申报文件自动填表工作流,批次号:{batch.batch_no}。" + assistant_message = append_assistant_message(conversation, reply_content) + yield sse_event( + "workflow_started", + { + "workflow_type": "application_form_fill", + "batch_id": batch.pk, + "batch_no": batch.batch_no, + }, + ) + yield sse_event("chunk", {"delta": reply_content}) + yield sse_event( + "done", + { + "assistant_message_id": assistant_message.pk, + "conversation_id": conversation.pk, + "title": conversation.title, + }, + ) + return + if route.starts_regulatory_review: source_summary_batch = find_latest_successful_summary_batch(conversation) if not source_summary_batch: diff --git a/review_agent/skill_router.py b/review_agent/skill_router.py index 05718e4..b0b5323 100644 --- a/review_agent/skill_router.py +++ b/review_agent/skill_router.py @@ -8,6 +8,7 @@ from .file_summary.workflow_trigger import ( evaluate_attachment_reader_trigger, evaluate_file_summary_trigger, ) +from .application_form_fill.constants import FORM_FILL_TRIGGER_KEYWORDS, WORKFLOW_TYPE as FORM_FILL_WORKFLOW_TYPE from .llm import LLMConfigurationError, LLMRequestError, generate_completion from .models import Conversation, FileAttachment @@ -16,6 +17,7 @@ logger = logging.getLogger(__name__) ROUTE_ACTIONS = {"normal_chat", "attachment_reader", "file_summary"} ROUTE_ACTIONS.add("regulatory_review") +ROUTE_ACTIONS.add(FORM_FILL_WORKFLOW_TYPE) @dataclass(frozen=True) @@ -39,6 +41,10 @@ class SkillRoute: def starts_regulatory_review(self) -> bool: return self.action == "regulatory_review" + @property + def starts_application_form_fill(self) -> bool: + return self.action == FORM_FILL_WORKFLOW_TYPE + @property def is_normal_chat(self) -> bool: return self.action == "normal_chat" @@ -105,7 +111,7 @@ def _route_with_llm( return SkillRoute( action=action, skill_name="attachment_reader" if action == "attachment_reader" else "", - workflow_type=action if action in {"file_summary", "regulatory_review"} else "", + workflow_type=action if action in {"file_summary", "regulatory_review", FORM_FILL_WORKFLOW_TYPE} else "", confidence=_float_or_zero(payload.get("confidence")), reason=str(payload.get("reason") or ""), source="llm", @@ -113,6 +119,15 @@ def _route_with_llm( def _route_with_rules(conversation: Conversation, content: str) -> SkillRoute: + if _matches_application_form_fill(content): + return SkillRoute( + action=FORM_FILL_WORKFLOW_TYPE, + workflow_type=FORM_FILL_WORKFLOW_TYPE, + confidence=0.7, + reason="命中申报文件自动填表关键词。", + source="rule_fallback", + ) + if _matches_regulatory_review(content): return SkillRoute( action="regulatory_review", @@ -162,10 +177,11 @@ def _router_system_prompt() -> str: return ( "你是审核智能体的工具路由器,只判断是否需要调用工具,不直接回答用户。" "你必须只输出 JSON 对象,不要输出 Markdown。" - "可选 action:normal_chat、attachment_reader、file_summary、regulatory_review。" + "可选 action:normal_chat、attachment_reader、file_summary、regulatory_review、application_form_fill。" "attachment_reader 用于用户要求阅读、提取、分析、总结、查看上传附件内容。" "file_summary 用于用户要求自动汇总文件目录、页数、清单或生成目录页数报告。" "regulatory_review 用于用户要求法规核查、NMPA核查、完整性核查、章节一致性核查、风险预警或整改建议。" + "application_form_fill 用于用户要求填注册证、生成申报模板、填写对应表格、安全和性能基本原则清单或自动填表。" "normal_chat 用于不需要读取附件或执行工作流的一般问答。" "输出字段:action、confidence、reason。" ) @@ -217,3 +233,8 @@ def _matches_regulatory_review(content: str) -> bool: "一致性核查", ] return any(keyword in normalized for keyword in keywords) + + +def _matches_application_form_fill(content: str) -> bool: + normalized = content.lower() + return any(keyword.lower() in normalized for keyword in FORM_FILL_TRIGGER_KEYWORDS) diff --git a/tests/test_application_form_fill_trigger.py b/tests/test_application_form_fill_trigger.py new file mode 100644 index 0000000..8272f29 --- /dev/null +++ b/tests/test_application_form_fill_trigger.py @@ -0,0 +1,45 @@ +import pytest + +from review_agent.models import Conversation +from review_agent.skill_router import route_message_intent + + +pytestmark = pytest.mark.django_db + + +@pytest.mark.parametrize( + "content", + [ + "帮我填注册证", + "给我这个内容对应的表格", + "为我该方案生成申报模板", + "请自动填表并生成表格", + "生成安全和性能基本原则清单", + ], +) +def test_rule_router_starts_application_form_fill_for_keywords(monkeypatch, django_user_model, content): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + monkeypatch.setattr( + "review_agent.skill_router._route_with_llm", + lambda conversation, content, attachments: (_ for _ in ()).throw(ValueError("fallback")), + ) + + route = route_message_intent(conversation, content) + + assert route.action == "application_form_fill" + assert route.workflow_type == "application_form_fill" + assert route.starts_application_form_fill + + +def test_rule_router_does_not_misroute_normal_chat(monkeypatch, django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + monkeypatch.setattr( + "review_agent.skill_router._route_with_llm", + lambda conversation, content, attachments: (_ for _ in ()).throw(ValueError("fallback")), + ) + + route = route_message_intent(conversation, "你好,解释一下法规背景") + + assert route.action == "normal_chat" diff --git a/tests/test_application_form_fill_workflow.py b/tests/test_application_form_fill_workflow.py new file mode 100644 index 0000000..4003534 --- /dev/null +++ b/tests/test_application_form_fill_workflow.py @@ -0,0 +1,195 @@ +import pytest + +from review_agent.application_form_fill.constants import FORM_FILL_NODE_DEFINITIONS +from review_agent.application_form_fill.workflow import ( + create_application_form_fill_batch, + find_latest_successful_summary_batch, + start_application_form_fill_workflow, +) +from review_agent.models import ( + ApplicationFormFillBatch, + Conversation, + FileAttachment, + FileSummaryBatch, + Message, + WorkflowEvent, + WorkflowNodeRun, +) +from review_agent.services import stream_message +from review_agent.skill_router import SkillRoute + + +pytestmark = pytest.mark.django_db + + +def test_find_latest_successful_summary_batch_ignores_failed_batches(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + success = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-FAILED", + status=FileSummaryBatch.Status.FAILED, + ) + + assert find_latest_successful_summary_batch(conversation) == success + + +def test_create_application_form_fill_batch_initializes_nodes(settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + message = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + + batch = create_application_form_fill_batch( + conversation=conversation, + user=user, + trigger_message=message, + source_summary_batch=summary, + ) + + assert batch.status == ApplicationFormFillBatch.Status.PENDING + assert batch.output_types == ["word", "excel", "json"] + assert WorkflowNodeRun.objects.filter( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + ).count() == len(FORM_FILL_NODE_DEFINITIONS) + assert WorkflowEvent.objects.filter( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + event_type="workflow_created", + ).exists() + + +def test_application_form_fill_executor_runs_nodes_and_skips_pdf(settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + batch = create_application_form_fill_batch( + conversation=conversation, + user=user, + source_summary_batch=summary, + ) + + start_application_form_fill_workflow(batch, async_run=False) + + batch.refresh_from_db() + assert batch.status == ApplicationFormFillBatch.Status.SUCCESS + assert WorkflowNodeRun.objects.get( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + node_code="pdf_convert", + ).status == WorkflowNodeRun.Status.SKIPPED + assert WorkflowEvent.objects.filter( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + event_type="workflow_completed", + ).exists() + + +def test_stream_message_prompts_for_upload_when_no_summary_or_attachment(monkeypatch, django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + monkeypatch.setattr( + "review_agent.services.route_message_intent", + lambda conversation, content: SkillRoute( + action="application_form_fill", + workflow_type="application_form_fill", + confidence=0.9, + ), + ) + + frames = list(stream_message(conversation, "帮我填注册证")) + + joined = "".join(frames) + assert "请先在当前对话右侧上传需要填表的产品资料或压缩包" in joined + assert not ApplicationFormFillBatch.objects.exists() + + +def test_stream_message_starts_application_form_fill_workflow(monkeypatch, settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + settings.APPLICATION_FORM_FILL_ASYNC = False + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + monkeypatch.setattr( + "review_agent.services.route_message_intent", + lambda conversation, content: SkillRoute( + action="application_form_fill", + workflow_type="application_form_fill", + confidence=0.9, + ), + ) + + frames = list(stream_message(conversation, "帮我填注册证")) + + joined = "".join(frames) + assert "workflow_started" in joined + assert '"workflow_type": "application_form_fill"' in joined + assert "已启动申报文件自动填表工作流" in joined + assert ApplicationFormFillBatch.objects.filter(conversation=conversation).exists() + + +def test_stream_message_auto_runs_summary_before_application_form_fill( + monkeypatch, settings, tmp_path, django_user_model +): + settings.MEDIA_ROOT = tmp_path + settings.APPLICATION_FORM_FILL_ASYNC = False + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + attachment_path = tmp_path / "application.txt" + attachment_path.write_text("产品名称:甲胎蛋白检测试剂盒", encoding="utf-8") + FileAttachment.objects.create( + conversation=conversation, + user=user, + original_name="application.txt", + storage_path=str(attachment_path), + file_size=attachment_path.stat().st_size, + is_active=True, + ) + monkeypatch.setattr( + "review_agent.services.route_message_intent", + lambda conversation, content: SkillRoute( + action="application_form_fill", + workflow_type="application_form_fill", + confidence=0.9, + ), + ) + + def finish_summary(batch, async_run=True): + batch.status = FileSummaryBatch.Status.SUCCESS + batch.save(update_fields=["status"]) + + monkeypatch.setattr("review_agent.services.start_file_summary_workflow", finish_summary) + + frames = list(stream_message(conversation, "为我该方案生成申报模板")) + joined = "".join(frames) + + assert '"workflow_type": "file_summary"' in joined + assert '"workflow_type": "application_form_fill"' in joined + assert "汇总完成后继续自动填表" in joined + assert FileSummaryBatch.objects.filter(conversation=conversation, status=FileSummaryBatch.Status.SUCCESS).exists() + assert ApplicationFormFillBatch.objects.filter(conversation=conversation).exists()