From 9be10ef9905b74937a3a200af2339e7c8c36bc37 Mon Sep 17 00:00:00 2001 From: bruce Date: Sun, 7 Jun 2026 18:40:04 +0800 Subject: [PATCH] =?UTF-8?q?feat(application-form-fill):=20=E4=B8=B2?= =?UTF-8?q?=E8=81=94=E5=A1=AB=E8=A1=A8=E5=B7=A5=E4=BD=9C=E6=B5=81=E4=BA=A7?= =?UTF-8?q?=E7=89=A9=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../services/notifier.py | 45 +++++ .../application_form_fill/services/summary.py | 35 ++++ .../application_form_fill/workflow.py | 179 +++++++++++++++++- ...test_application_form_fill_notification.py | 61 ++++++ tests/test_application_form_fill_workflow.py | 77 ++++++++ 5 files changed, 396 insertions(+), 1 deletion(-) create mode 100644 review_agent/application_form_fill/services/notifier.py create mode 100644 review_agent/application_form_fill/services/summary.py create mode 100644 tests/test_application_form_fill_notification.py diff --git a/review_agent/application_form_fill/services/notifier.py b/review_agent/application_form_fill/services/notifier.py new file mode 100644 index 0000000..c3c2969 --- /dev/null +++ b/review_agent/application_form_fill/services/notifier.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +from django.utils import timezone + +from review_agent.models import ( + ApplicationFormFillBatch, + ApplicationFormFillNotificationRecord, + ExportedSummaryFile, +) + + +def notify_completion( + batch: ApplicationFormFillBatch, + exports: list[ExportedSummaryFile], + *, + fail: bool = False, +) -> ApplicationFormFillNotificationRecord: + export_ids = [export.pk for export in exports] + message_summary = ( + f"自动填表批次 {batch.batch_no} 已完成," + f"模板 {', '.join(batch.selected_templates or []) or '未识别'}," + f"冲突字段 {len(batch.conflict_summary or [])} 个。" + ) + if fail: + return ApplicationFormFillNotificationRecord.objects.create( + batch=batch, + recipient=batch.user, + channel=ApplicationFormFillNotificationRecord.Channel.MOCK, + template_codes=batch.selected_templates, + export_ids=export_ids, + message_summary=message_summary, + send_status=ApplicationFormFillNotificationRecord.SendStatus.FAILED, + retry_count=1, + error_message="mock notification failed", + ) + return ApplicationFormFillNotificationRecord.objects.create( + batch=batch, + recipient=batch.user, + channel=ApplicationFormFillNotificationRecord.Channel.MOCK, + template_codes=batch.selected_templates, + export_ids=export_ids, + message_summary=message_summary, + send_status=ApplicationFormFillNotificationRecord.SendStatus.SUCCESS, + sent_at=timezone.now(), + ) diff --git a/review_agent/application_form_fill/services/summary.py b/review_agent/application_form_fill/services/summary.py new file mode 100644 index 0000000..7501d7b --- /dev/null +++ b/review_agent/application_form_fill/services/summary.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from review_agent.models import ApplicationFormFillBatch, ExportedSummaryFile + + +def build_assistant_summary(batch: ApplicationFormFillBatch, exports: list[ExportedSummaryFile]) -> str: + word_exports = [export for export in exports if export.export_type == ExportedSummaryFile.ExportType.WORD] + trace_exports = [ + export + for export in exports + if export.export_type in {ExportedSummaryFile.ExportType.EXCEL, ExportedSummaryFile.ExportType.JSON} + ] + lines = ["已生成申报模板自动填表文件。", "", "| 文件 | Word | PDF |", "| --- | --- | --- |"] + if word_exports: + for export in word_exports: + lines.append(f"| {export.file_name} | [下载](/api/review-agent/file-summary/exports/{export.pk}/download/) | 待增强 |") + else: + lines.append("| 自动填表结果 | 未生成 | 待增强 |") + + conflicts = batch.conflict_summary or [] + if conflicts: + lines.extend(["", "| 冲突字段 | 采用值 | 冲突来源 | 处理 |", "| --- | --- | --- | --- |"]) + for item in conflicts: + conflict_sources = ";".join( + f"{value.get('source_file', '')}:{value.get('value', '')}" for value in item.get("conflict_values", []) + ) + lines.append( + f"| {item.get('field_label', item.get('field_key', ''))} | {item.get('selected_value', '')} | {conflict_sources or '-'} | {item.get('handling', '')} |" + ) + + if trace_exports: + lines.append("") + for export in trace_exports: + lines.append(f"[下载{export.file_name}](/api/review-agent/file-summary/exports/{export.pk}/download/)") + return "\n".join(lines).strip() diff --git a/review_agent/application_form_fill/workflow.py b/review_agent/application_form_fill/workflow.py index cb29e6e..57699d3 100644 --- a/review_agent/application_form_fill/workflow.py +++ b/review_agent/application_form_fill/workflow.py @@ -10,6 +10,31 @@ from django.utils import timezone from review_agent.application_form_fill.constants import DEFAULT_OUTPUT_TYPES, FORM_FILL_NODE_DEFINITIONS, WORKFLOW_TYPE from review_agent.application_form_fill.events import record_event +from review_agent.application_form_fill.services.field_extract import ( + collect_document_texts, + run_parallel_extract, + save_field_extract_result, +) +from review_agent.application_form_fill.services.field_merge import merge_fields +from review_agent.application_form_fill.services.notifier import notify_completion +from review_agent.application_form_fill.services.summary import build_assistant_summary +from review_agent.application_form_fill.services.template_config import ( + compute_config_hash, + load_template_config, + validate_template_config, +) +from review_agent.application_form_fill.services.template_repository import ( + TemplateUnavailableError, + copy_template_to_batch, +) +from review_agent.application_form_fill.services.template_select import ( + detect_registration_type, + parse_requested_templates, + select_templates, +) +from review_agent.application_form_fill.services.traceability_export import save_traceability_exports +from review_agent.application_form_fill.services.word_fill import create_word_export +from review_agent.application_form_fill.schemas import MergedField, TemplateSpec from review_agent.application_form_fill.storage import build_batch_work_dir from review_agent.models import ApplicationFormFillBatch, Conversation, FileSummaryBatch, Message, WorkflowNodeRun @@ -72,6 +97,16 @@ class FormFillWorkflowExecutor: def __init__(self, batch: ApplicationFormFillBatch): self.batch = batch + self.template_config: dict = {} + self.selected_templates: list[TemplateSpec] = [] + self.template_paths: dict[str, str] = {} + self.document_texts: dict[str, str] = {} + self.extract_payload: dict = {} + self.merged_fields: dict[str, MergedField] = {} + self.conflicts: list[dict] = [] + self.exports = [] + self.generation_results: list[dict] = [] + self.non_blocking_errors: list[str] = [] def run(self) -> None: logger.info("自动填表工作流开始 batch_no=%s batch_id=%s", self.batch.batch_no, self.batch.pk) @@ -94,7 +129,9 @@ class FormFillWorkflowExecutor: record_event(self.batch, "workflow_failed", {"message": str(exc)}) return - self.batch.status = ApplicationFormFillBatch.Status.SUCCESS + self.batch.refresh_from_db() + if self.batch.status != ApplicationFormFillBatch.Status.PARTIAL_SUCCESS: + self.batch.status = ApplicationFormFillBatch.Status.SUCCESS self.batch.finished_at = timezone.now() self.batch.save(update_fields=["status", "finished_at"]) record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk}) @@ -119,6 +156,12 @@ class FormFillWorkflowExecutor: ) if node.node_code == "pdf_convert": + self._append_risk_note( + { + "type": "pdf_pending", + "message": "PDF 转换为后续增强项,本次优先生成 Word。", + } + ) node.status = WorkflowNodeRun.Status.SKIPPED node.progress = 100 node.finished_at = timezone.now() @@ -131,6 +174,8 @@ class FormFillWorkflowExecutor: ) return + self._execute_node(node) + node.status = WorkflowNodeRun.Status.SUCCESS node.progress = 100 node.finished_at = timezone.now() @@ -142,6 +187,138 @@ class FormFillWorkflowExecutor: {"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message}, ) + def _execute_node(self, node: WorkflowNodeRun) -> None: + if node.node_code == "prepare": + if self.batch.source_summary_batch.status != FileSummaryBatch.Status.SUCCESS: + raise ValueError("自动填表需要成功的文件汇总批次。") + return + if node.node_code == "template_select": + self.template_config = load_template_config() + errors = validate_template_config(self.template_config) + if errors: + raise ValueError(";".join(errors)) + requested = parse_requested_templates(self.batch.trigger_message.content if self.batch.trigger_message else "") + registration_type, source = detect_registration_type(batch=self.batch, message=self.batch.trigger_message.content if self.batch.trigger_message else "") + specs, risk_notes = select_templates(self.template_config, requested, registration_type) + if not specs: + raise ValueError("未选择到可用申报模板。") + self.selected_templates = specs + self.batch.requested_templates = requested + self.batch.selected_templates = [spec.code for spec in specs] + self.batch.registration_type = registration_type + self.batch.registration_type_source = source + self.batch.template_config_version = str(self.template_config.get("version") or "") + self.batch.template_config_hash = compute_config_hash() + self.batch.risk_notes = list(self.batch.risk_notes or []) + risk_notes + self.batch.save( + update_fields=[ + "requested_templates", + "selected_templates", + "registration_type", + "registration_type_source", + "template_config_version", + "template_config_hash", + "risk_notes", + ] + ) + return + if node.node_code == "template_copy": + for spec in self.selected_templates: + try: + artifact = copy_template_to_batch(spec, self.batch, self.template_config) + self.template_paths[spec.code] = artifact.storage_path + except TemplateUnavailableError as exc: + self.non_blocking_errors.append(str(exc)) + self._append_risk_note({"type": "template_unavailable", "message": str(exc), "template_code": spec.code}) + if not self.template_paths: + raise ValueError("没有可用的 Word 模板副本。") + return + if node.node_code == "field_extract": + self.document_texts = collect_document_texts(self.batch.source_summary_batch) + self.extract_payload = run_parallel_extract(self.document_texts, self.selected_templates) + save_field_extract_result(self.batch, self.extract_payload) + return + if node.node_code == "conflict_merge": + self.merged_fields, self.conflicts = merge_fields( + self.extract_payload.get("regex_results") or {}, + self.extract_payload.get("llm_results") or {}, + ) + product = self.merged_fields.get("product_name") + if product and product.value: + self.batch.product_name = product.value + self.batch.conflict_summary = self.conflicts + self.batch.save(update_fields=["product_name", "conflict_summary"]) + return + if node.node_code == "word_fill": + for spec in self.selected_templates: + template_path = self.template_paths.get(spec.code) + if not template_path: + self.generation_results.append( + { + "template_code": spec.code, + "template_label": spec.output_label, + "word_status": "failed", + "pdf_status": "待增强", + "error_message": "模板不可用", + } + ) + continue + export = create_word_export(self.batch, spec, template_path, self.merged_fields, self.conflicts) + self.exports.append(export) + self.generation_results.append( + { + "template_code": spec.code, + "template_label": spec.output_label, + "word_status": "success", + "pdf_status": "待增强", + "error_message": "", + } + ) + if not any(item["word_status"] == "success" for item in self.generation_results): + raise ValueError("所有目标 Word 模板均生成失败。") + return + if node.node_code == "trace_export": + self.exports.extend( + save_traceability_exports( + self.batch, + self.merged_fields, + self.conflicts, + self.selected_templates, + self.generation_results, + ) + ) + return + if node.node_code == "output_export": + Message.objects.create( + conversation=self.batch.conversation, + role=Message.Role.ASSISTANT, + content=build_assistant_summary(self.batch, self.exports), + ) + return + if node.node_code == "notify": + notification = notify_completion( + self.batch, + self.exports, + fail=getattr(settings, "APPLICATION_FORM_FILL_MOCK_NOTIFY_FAIL", False), + ) + if notification.send_status == notification.SendStatus.FAILED: + self.non_blocking_errors.append(notification.error_message or "通知失败") + return + if node.node_code == "completed": + self._mark_final_status() + + def _mark_final_status(self) -> None: + failed_word = any(item.get("word_status") == "failed" for item in self.generation_results) + if self.non_blocking_errors or failed_word: + self.batch.status = ApplicationFormFillBatch.Status.PARTIAL_SUCCESS + else: + self.batch.status = ApplicationFormFillBatch.Status.SUCCESS + self.batch.save(update_fields=["status"]) + + def _append_risk_note(self, note: dict) -> None: + self.batch.risk_notes = list(self.batch.risk_notes or []) + [note] + self.batch.save(update_fields=["risk_notes"]) + def start_application_form_fill_workflow(batch: ApplicationFormFillBatch, *, async_run: bool = True) -> None: executor = FormFillWorkflowExecutor(batch) diff --git a/tests/test_application_form_fill_notification.py b/tests/test_application_form_fill_notification.py new file mode 100644 index 0000000..9905689 --- /dev/null +++ b/tests/test_application_form_fill_notification.py @@ -0,0 +1,61 @@ +import pytest + +from review_agent.application_form_fill.services.notifier import notify_completion +from review_agent.models import ( + ApplicationFormFillBatch, + ApplicationFormFillNotificationRecord, + Conversation, + ExportedSummaryFile, + FileSummaryBatch, +) + + +pytestmark = pytest.mark.django_db + + +def test_notify_completion_records_success(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-NOTIFY") + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="AFF-NOTIFY", + selected_templates=["registration_certificate"], + ) + exported = ExportedSummaryFile.objects.create( + batch=summary, + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + export_category="filled_template", + export_type=ExportedSummaryFile.ExportType.WORD, + file_name="filled.docx", + storage_path="filled.docx", + ) + + record = notify_completion(batch, [exported]) + + assert record.send_status == ApplicationFormFillNotificationRecord.SendStatus.SUCCESS + assert record.export_ids == [exported.pk] + assert record.template_codes == ["registration_certificate"] + assert record.sent_at is not None + + +def test_notify_completion_records_failure_without_raising(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-NOTIFY-FAIL") + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="AFF-NOTIFY-FAIL", + selected_templates=["registration_certificate"], + ) + + record = notify_completion(batch, [], fail=True) + + assert record.send_status == ApplicationFormFillNotificationRecord.SendStatus.FAILED + assert record.retry_count == 1 + assert "mock" in record.error_message diff --git a/tests/test_application_form_fill_workflow.py b/tests/test_application_form_fill_workflow.py index 4003534..abfe369 100644 --- a/tests/test_application_form_fill_workflow.py +++ b/tests/test_application_form_fill_workflow.py @@ -22,6 +22,14 @@ from review_agent.skill_router import SkillRoute pytestmark = pytest.mark.django_db +@pytest.fixture(autouse=True) +def stub_aff_llm_extract(monkeypatch): + monkeypatch.setattr( + "review_agent.application_form_fill.services.field_extract.generate_completion", + lambda messages, temperature=0.0: '{"fields": [], "checklist_items": []}', + ) + + def test_find_latest_successful_summary_batch_ignores_failed_batches(django_user_model): user = django_user_model.objects.create_user(username="owner", password="pass") conversation = Conversation.objects.create(user=user, title="会话") @@ -83,9 +91,11 @@ def test_application_form_fill_executor_runs_nodes_and_skips_pdf(settings, tmp_p batch_no="FS-AFF-OK", status=FileSummaryBatch.Status.SUCCESS, ) + trigger = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证") batch = create_application_form_fill_batch( conversation=conversation, user=user, + trigger_message=trigger, source_summary_batch=summary, ) @@ -105,6 +115,73 @@ def test_application_form_fill_executor_runs_nodes_and_skips_pdf(settings, tmp_p ).exists() +def test_application_form_fill_workflow_generates_summary_and_exports(settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + settings.APPLICATION_FORM_FILL_ASYNC = False + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + trigger = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-FULL", + status=FileSummaryBatch.Status.SUCCESS, + ) + source = tmp_path / "ifu.txt" + source.write_text("产品名称:甲胎蛋白检测试剂盒\n预期用途:用于体外检测", encoding="utf-8") + from review_agent.models import FileSummaryItem + + FileSummaryItem.objects.create( + batch=summary, + file_index=1, + file_name="说明书.txt", + file_type="txt", + relative_path="说明书.txt", + storage_path=str(source), + ) + batch = create_application_form_fill_batch( + conversation=conversation, + user=user, + trigger_message=trigger, + source_summary_batch=summary, + ) + + start_application_form_fill_workflow(batch, async_run=False) + + batch.refresh_from_db() + assert batch.status == ApplicationFormFillBatch.Status.SUCCESS + assert batch.product_name == "甲胎蛋白检测试剂盒" + assert batch.selected_templates == ["registration_certificate"] + assert conversation.messages.filter(role=Message.Role.ASSISTANT, content__contains="已生成申报模板自动填表文件").exists() + assert batch.notifications.filter(send_status="success").exists() + + +def test_application_form_fill_status_becomes_partial_when_notification_fails(settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + settings.APPLICATION_FORM_FILL_MOCK_NOTIFY_FAIL = True + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + trigger = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-PARTIAL", + status=FileSummaryBatch.Status.SUCCESS, + ) + batch = create_application_form_fill_batch( + conversation=conversation, + user=user, + trigger_message=trigger, + source_summary_batch=summary, + ) + + start_application_form_fill_workflow(batch, async_run=False) + + batch.refresh_from_db() + assert batch.status == ApplicationFormFillBatch.Status.PARTIAL_SUCCESS + assert batch.notifications.filter(send_status="failed").exists() + + def test_stream_message_prompts_for_upload_when_no_summary_or_attachment(monkeypatch, django_user_model): user = django_user_model.objects.create_user(username="owner", password="pass") conversation = Conversation.objects.create(user=user, title="会话")