def _template_labels_by_key(specs: list[TemplateSpec]) -> dict[Any, str]:
    """Map each field key to the output labels of every template declaring it.

    Labels are de-duplicated, kept in ``specs`` order and joined with "、", so
    a key shared by several templates is traced to all of them instead of only
    the last one seen.
    """
    labels_by_key: dict[Any, list[str]] = {}
    for spec in specs:
        for field_config in spec.fields:
            labels = labels_by_key.setdefault(field_config.get("key"), [])
            if spec.output_label and spec.output_label not in labels:
                labels.append(spec.output_label)
    return {key: "、".join(labels) for key, labels in labels_by_key.items()}


def build_traceability_workbook(
    batch: ApplicationFormFillBatch,
    merged_fields: dict[str, MergedField],
    conflicts: list[dict[str, Any]],
    specs: list[TemplateSpec],
    generation_results: list[dict[str, Any]] | None = None,
    *,
    low_confidence_threshold: float = 0.6,
) -> Workbook:
    """Build the four-sheet traceability workbook for a form-fill batch.

    Sheets, in order: "字段追溯" (one row per merged field), "冲突字段" (one row
    per conflicting value), "低置信度条目" (fields below the confidence
    threshold) and "生成结果" (per-template generation status).

    Args:
        batch: Accepted for interface compatibility; not read by this function.
        merged_fields: Merged field objects keyed by field key.
        conflicts: Conflict dicts; the keys read are ``field_label``,
            ``selected_value``, ``conflict_values`` and ``handling``.
        specs: Template specs; their ``fields`` and ``output_label`` are read.
        generation_results: Optional per-template result dicts. When empty or
            ``None``, one placeholder row is written per spec instead.
        low_confidence_threshold: Fields whose ``confidence`` is strictly below
            this value are listed on the low-confidence sheet.

    Returns:
        The populated, unsaved ``Workbook``.
    """
    workbook = Workbook()

    field_sheet = workbook.active
    field_sheet.title = "字段追溯"
    field_sheet.append(["模板", "字段", "填入值", "来源文件", "证据", "冲突状态"])
    template_names = _template_labels_by_key(specs)
    for key, field in merged_fields.items():
        field_sheet.append(
            [
                template_names.get(key, ""),
                field.label,
                field.value,
                field.source_file,
                field.evidence,
                "冲突" if field.has_conflict else "一致",
            ]
        )

    conflict_sheet = workbook.create_sheet("冲突字段")
    conflict_sheet.append(["字段", "采用值", "冲突值", "冲突来源", "处理方式"])
    for conflict in conflicts:
        # A conflict without alternative values still gets one row, with the
        # value/source columns left blank (the empty dict yields "" for both).
        for value in conflict.get("conflict_values") or [{}]:
            conflict_sheet.append(
                [
                    conflict.get("field_label", ""),
                    conflict.get("selected_value", ""),
                    value.get("value", ""),
                    value.get("source_file", ""),
                    conflict.get("handling", ""),
                ]
            )

    low_confidence_sheet = workbook.create_sheet("低置信度条目")
    low_confidence_sheet.append(["字段", "填入值", "置信度", "来源文件"])
    for field in merged_fields.values():
        if field.confidence < low_confidence_threshold:
            low_confidence_sheet.append([field.label, field.value, field.confidence, field.source_file])

    result_sheet = workbook.create_sheet("生成结果")
    result_sheet.append(["模板", "Word状态", "PDF状态", "错误说明"])
    if generation_results:
        for result in generation_results:
            result_sheet.append(
                [
                    result.get("template_label", ""),
                    result.get("word_status", ""),
                    result.get("pdf_status", "待增强"),
                    result.get("error_message", ""),
                ]
            )
    else:
        # Nothing generated yet: list every template as pending.
        for spec in specs:
            result_sheet.append([spec.output_label, "待生成", "待增强", ""])
    return workbook


def _record_export(
    batch: ApplicationFormFillBatch,
    path: Path,
    *,
    artifact_type: Any,
    file_format: Any,
    name: str,
    export_type: Any,
    conflict_count: int,
) -> ExportedSummaryFile:
    """Create the artifact record and the ExportedSummaryFile row for ``path``."""
    create_artifact_for_file(
        batch,
        path=path,
        artifact_type=artifact_type,
        file_format=file_format,
        name=name,
        metadata={"conflict_count": conflict_count},
        created_by_node="trace_export",
    )
    return ExportedSummaryFile.objects.create(
        batch=batch.source_summary_batch,
        workflow_type=WORKFLOW_TYPE,
        workflow_batch_id=batch.pk,
        export_category="traceability",
        export_type=export_type,
        file_name=path.name,
        storage_path=str(path),
    )


def save_traceability_exports(
    batch: ApplicationFormFillBatch,
    merged_fields: dict[str, MergedField],
    conflicts: list[dict[str, Any]],
    specs: list[TemplateSpec],
    generation_results: list[dict[str, Any]] | None = None,
) -> list[ExportedSummaryFile]:
    """Write the traceability workbook and the merged-fields JSON for a batch.

    Both files are written into the batch's "exports" sub-directory. For each
    file an artifact is created via ``create_artifact_for_file`` and an
    ``ExportedSummaryFile`` row is created.

    Returns:
        ``[excel_export, json_export]`` in that order.
    """
    target_dir = ensure_batch_subdir(batch, "exports")
    conflict_count = len(conflicts)

    workbook = build_traceability_workbook(batch, merged_fields, conflicts, specs, generation_results)
    excel_path = target_dir / f"{batch.batch_no}-字段来源追溯清单.xlsx"
    workbook.save(excel_path)
    excel_export = _record_export(
        batch,
        excel_path,
        artifact_type=ApplicationFormFillArtifact.ArtifactType.TRACEABILITY,
        file_format=ApplicationFormFillArtifact.FileFormat.EXCEL,
        name="字段来源追溯清单",
        export_type=ExportedSummaryFile.ExportType.EXCEL,
        conflict_count=conflict_count,
    )

    json_path = target_dir / "merged_fields.json"
    payload = {
        "batch_no": batch.batch_no,
        "merged_fields": {key: asdict(value) for key, value in merged_fields.items()},
        "conflicts": conflicts,
        "generation_results": generation_results or [],
    }
    # ensure_ascii=False keeps the Chinese labels readable in the JSON file.
    json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    json_export = _record_export(
        batch,
        json_path,
        artifact_type=ApplicationFormFillArtifact.ArtifactType.MERGED_FIELDS,
        file_format=ApplicationFormFillArtifact.FileFormat.JSON,
        name="merged_fields",
        export_type=ExportedSummaryFile.ExportType.JSON,
        conflict_count=conflict_count,
    )
    return [excel_export, json_export]
def fill_template(
    template_path: str | Path,
    output_path: str | Path,
    spec: TemplateSpec,
    fields: dict[str, MergedField],
    conflicts: list[dict] | None = None,
) -> Path:
    """Fill a Word template's table rows from merged fields and save a copy.

    Only field configs whose ``target.type`` is ``"table_row"`` are handled.
    A field is highlighted when its key appears in ``conflicts`` (under
    ``field_key``) or when the field itself has ``has_conflict`` set.

    Args:
        template_path: Path of the .docx template to open.
        output_path: Destination path; parent directories are created.
        spec: Template spec whose ``fields`` list drives the fill.
        fields: Merged fields keyed by field key; missing keys are skipped.
        conflicts: Optional conflict dicts.

    Returns:
        ``output_path`` as a ``Path``.
    """
    import logging  # local import: the module's import block lies outside this block

    logger = logging.getLogger(__name__)
    document = Document(str(template_path))
    conflict_keys = {item.get("field_key") for item in conflicts or []}
    for field_config in spec.fields:
        target = field_config.get("target") or {}
        if target.get("type") != "table_row":
            continue
        key = field_config.get("key")
        field = fields.get(key)
        if field is None:
            continue
        row_label = str(target.get("row_label") or field_config.get("label") or "")
        filled = fill_table_row(
            document,
            row_label,
            field.value,
            conflict=key in conflict_keys or field.has_conflict,
        )
        if not filled:
            # Previously dropped silently; surface it so template/spec drift is visible.
            logger.warning(
                "No table row labelled %r found in template %s; field %r left unfilled",
                row_label,
                template_path,
                key,
            )
    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)
    document.save(str(output))
    return output


def fill_table_row(document: Document, row_label: str, value: str, *, conflict: bool = False) -> bool:
    """Write ``value`` into the second cell of the first row labelled ``row_label``.

    Labels are compared after ``_normalize_label`` (whitespace and colons
    removed). Rows with fewer than two cells are ignored. When ``conflict`` is
    true the text is written in red and the cell is shaded yellow.

    Returns:
        ``True`` if a matching row was found and filled, ``False`` otherwise.
    """
    normalized_label = _normalize_label(row_label)
    text = "" if value is None else str(value)  # add_run expects text
    for table in document.tables:
        for row in table.rows:
            # Read once per row; presumably a computed property in python-docx,
            # so repeated access would repeat the work.
            cells = row.cells
            if len(cells) < 2:
                continue
            if _normalize_label(cells[0].text) != normalized_label:
                continue
            target = cells[1]
            target.text = ""
            run = target.paragraphs[0].add_run(text)
            if conflict:
                run.font.color.rgb = RGBColor(0xFF, 0x00, 0x00)
                apply_cell_shading(target, "FFFF00")
            return True
    return False


def apply_cell_shading(cell, fill: str) -> None:
    """Set the background fill of a table cell to the hex colour ``fill``.

    Reuses an existing ``w:shd`` element when present; otherwise appends one.
    """
    tc_pr = cell._tc.get_or_add_tcPr()
    shading = tc_pr.find(qn("w:shd"))
    if shading is None:
        shading = OxmlElement("w:shd")
        tc_pr.append(shading)
    if shading.get(qn("w:val")) is None:
        # w:val is a required attribute of w:shd; "clear" means no pattern,
        # so only the fill colour shows.
        shading.set(qn("w:val"), "clear")
    shading.set(qn("w:fill"), fill)


def create_word_export(
    batch: ApplicationFormFillBatch,
    spec: TemplateSpec,
    template_path: str | Path,
    fields: dict[str, MergedField],
    conflicts: list[dict] | None = None,
) -> ExportedSummaryFile:
    """Fill ``template_path`` for ``batch`` and record the resulting .docx.

    The file is written to the batch's "filled" sub-directory as
    ``<batch_no>-<product>-<template label>.docx``. The product part comes
    from ``batch.product_name``, else the ``product_name`` field value, else
    "未识别产品". An artifact and an ``ExportedSummaryFile`` row are created.

    Returns:
        The created ``ExportedSummaryFile``.
    """
    target_dir = ensure_batch_subdir(batch, "filled")
    product_field = fields.get("product_name")
    field_value = product_field.value if product_field is not None else ""
    product_name = _safe_filename(batch.product_name or field_value or "未识别产品")
    output_path = target_dir / f"{batch.batch_no}-{product_name}-{_safe_filename(spec.output_label)}.docx"
    fill_template(template_path, output_path, spec, fields, conflicts)
    create_artifact_for_file(
        batch,
        path=output_path,
        artifact_type=ApplicationFormFillArtifact.ArtifactType.FILLED_TEMPLATE,
        file_format=ApplicationFormFillArtifact.FileFormat.DOCX,
        name=spec.output_label,
        metadata={"template_code": spec.code, "conflict_count": len(conflicts or [])},
        created_by_node="word_fill",
    )
    return ExportedSummaryFile.objects.create(
        batch=batch.source_summary_batch,
        workflow_type=WORKFLOW_TYPE,
        workflow_batch_id=batch.pk,
        export_category="filled_template",
        export_type=ExportedSummaryFile.ExportType.WORD,
        file_name=output_path.name,
        storage_path=str(output_path),
    )


def _normalize_label(value: str) -> str:
    """Return ``value`` with all whitespace and trailing-style colons removed.

    Both the ASCII colon and the full-width colon (U+FF1A) are stripped, so
    labels typed with either punctuation compare equal.
    """
    return re.sub(r"\s+", "", value or "").replace(":", "").replace("\uff1a", "")


def _safe_filename(value: str) -> str:
    """Turn ``value`` into a filename fragment of at most 80 characters.

    Runs of characters that are invalid in file names (``\\ / : * ? " < > |``
    and ASCII control characters) become a single underscore. Falls back to
    "output" when nothing usable remains.
    """
    text = re.sub(r'[\\/:*?"<>|\x00-\x1f]+', "_", (value or "").strip())
    # Strip again after truncating: the cut can land right after a space.
    return text[:80].strip() or "output"
def test_traceability_exports_excel_json_and_records(settings, tmp_path, django_user_model):
    """save_traceability_exports writes the workbook and JSON and records both."""
    settings.MEDIA_ROOT = tmp_path
    user = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=user, title="会话")
    summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-TRACE")
    batch = ApplicationFormFillBatch.objects.create(
        conversation=conversation,
        user=user,
        source_summary_batch=summary,
        batch_no="AFF-TRACE",
        work_dir=str(tmp_path / "aff" / "AFF-TRACE"),
    )
    spec = TemplateSpec(
        code="registration_certificate",
        name="注册证格式",
        source_file="template.docx",
        output_label="注册证格式",
        applies_when={},
        file_format="docx",
        fields=[{"key": "product_name", "label": "产品名称"}],
    )
    merged_fields = {
        "product_name": MergedField(
            "product_name",
            "产品名称",
            "甲胎蛋白检测试剂盒",
            "说明书.txt",
            "产品名称:甲胎蛋白检测试剂盒",
            0.8,
        )
    }
    conflicts = [
        {
            "field_key": "storage_condition",
            "field_label": "储存条件",
            "selected_value": "2-8℃",
            "conflict_values": [{"value": "-20℃", "source_file": "产品技术要求.txt"}],
            "handling": "说明书优先",
        }
    ]

    exports = save_traceability_exports(
        batch,
        merged_fields,
        conflicts,
        [spec],
        [{"template_label": "注册证格式", "word_status": "success", "pdf_status": "待增强"}],
    )

    assert {export.export_type for export in exports} == {
        ExportedSummaryFile.ExportType.EXCEL,
        ExportedSummaryFile.ExportType.JSON,
    }
    excel_export = next(export for export in exports if export.export_type == ExportedSummaryFile.ExportType.EXCEL)
    workbook = load_workbook(excel_export.storage_path)
    assert workbook.sheetnames == ["字段追溯", "冲突字段", "低置信度条目", "生成结果"]
    assert workbook["字段追溯"]["B2"].value == "产品名称"
    assert workbook["冲突字段"]["C2"].value == "-20℃"

    json_export = next(export for export in exports if export.export_type == ExportedSummaryFile.ExportType.JSON)
    # Read inside a context manager so the handle is closed deterministically
    # instead of leaking until garbage collection.
    with open(json_export.storage_path, encoding="utf-8") as handle:
        payload = json.load(handle)
    assert payload["merged_fields"]["product_name"]["value"] == "甲胎蛋白检测试剂盒"
    assert ApplicationFormFillArtifact.objects.filter(
        batch=batch,
        artifact_type=ApplicationFormFillArtifact.ArtifactType.TRACEABILITY,
    ).exists()
mode 100644 index 0000000..264b716 --- /dev/null +++ b/tests/test_application_form_fill_word_fill.py @@ -0,0 +1,121 @@ +import zipfile + +import pytest +from docx import Document + +from review_agent.application_form_fill.schemas import MergedField, TemplateSpec +from review_agent.application_form_fill.services.word_fill import create_word_export, fill_template +from review_agent.models import ( + ApplicationFormFillArtifact, + ApplicationFormFillBatch, + Conversation, + ExportedSummaryFile, + FileSummaryBatch, +) + + +pytestmark = pytest.mark.django_db + + +def _spec(): + return TemplateSpec( + code="registration_certificate", + name="注册证格式", + source_file="template.docx", + output_label="注册证格式", + applies_when={"registration_type": ["首次注册"]}, + file_format="docx", + fields=[ + {"key": "product_name", "label": "产品名称", "target": {"type": "table_row", "row_label": "产品名称"}}, + {"key": "intended_use", "label": "预期用途", "target": {"type": "table_row", "row_label": "预期用途"}}, + ], + ) + + +def _template(path): + document = Document() + table = document.add_table(rows=2, cols=2) + table.rows[0].cells[0].text = "产品名称" + table.rows[1].cells[0].text = "预期用途" + document.save(path) + + +def test_word_fill_writes_table_rows(tmp_path): + template_path = tmp_path / "template.docx" + output_path = tmp_path / "filled.docx" + _template(template_path) + + fill_template( + template_path, + output_path, + _spec(), + { + "product_name": MergedField("product_name", "产品名称", "甲胎蛋白检测试剂盒", "说明书.txt", "证据", 0.8), + "intended_use": MergedField("intended_use", "预期用途", "用于体外检测", "说明书.txt", "证据", 0.8), + }, + ) + + document = Document(output_path) + assert document.tables[0].rows[0].cells[1].text == "甲胎蛋白检测试剂盒" + assert document.tables[0].rows[1].cells[1].text == "用于体外检测" + + +def test_word_fill_highlights_conflict_in_docx_xml(tmp_path): + template_path = tmp_path / "template.docx" + output_path = tmp_path / "filled.docx" + _template(template_path) + + fill_template( + template_path, + 
output_path, + _spec(), + { + "product_name": MergedField( + "product_name", + "产品名称", + "甲胎蛋白检测试剂盒", + "说明书.txt", + "证据", + 0.8, + has_conflict=True, + ) + }, + conflicts=[{"field_key": "product_name"}], + ) + + with zipfile.ZipFile(output_path) as package: + document_xml = package.read("word/document.xml").decode("utf-8") + assert 'w:fill="FFFF00"' in document_xml + assert 'w:color w:val="FF0000"' in document_xml + + +def test_create_word_export_records_artifact_and_export(settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-WORD") + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="AFF-WORD", + product_name="甲胎蛋白检测试剂盒", + work_dir=str(tmp_path / "aff" / "AFF-WORD"), + ) + template_path = tmp_path / "template.docx" + _template(template_path) + + exported = create_word_export( + batch, + _spec(), + template_path, + {"product_name": MergedField("product_name", "产品名称", "甲胎蛋白检测试剂盒", "说明书.txt", "证据", 0.8)}, + ) + + assert exported.export_type == ExportedSummaryFile.ExportType.WORD + assert exported.workflow_type == "application_form_fill" + assert exported.workflow_batch_id == batch.pk + assert ApplicationFormFillArtifact.objects.filter( + batch=batch, + artifact_type=ApplicationFormFillArtifact.ArtifactType.FILLED_TEMPLATE, + ).exists()