diff --git a/review_agent/regulatory_info_package/services/docx_document.py b/review_agent/regulatory_info_package/services/docx_document.py index eebdc0d..e42d49e 100644 --- a/review_agent/regulatory_info_package/services/docx_document.py +++ b/review_agent/regulatory_info_package/services/docx_document.py @@ -1,18 +1,25 @@ from __future__ import annotations +import re from pathlib import Path from docx import Document from docx.enum.text import WD_COLOR_INDEX from docx.shared import RGBColor +from django.utils import timezone from review_agent.regulatory_info_package.schemas import MergedField +PLACEHOLDER_RE = re.compile(r"\{\{([a-zA-Z0-9_]+)\}\}") + + def write_docx_from_template( source_path: str | Path, output_path: str | Path, merged_fields: dict[str, MergedField], + *, + template_code: str = "", ) -> tuple[int, int, int]: source = Path(source_path) output = Path(output_path) @@ -25,16 +32,14 @@ def write_docx_from_template( highlight_count = 0 missing_count = 0 llm_only_count = 0 - for paragraph in document.paragraphs: - for placeholder, field in replacements.items(): - if placeholder in paragraph.text: - _replace_paragraph_text(paragraph, paragraph.text.replace(placeholder, field.value), field) - if field.highlight_reason != "none": - highlight_count += 1 - if field.highlight_reason == "missing": - missing_count += 1 - if field.highlight_reason == "llm_only": - llm_only_count += 1 + highlight_count, missing_count, llm_only_count = _insert_prefill_block(document, merged_fields) + highlight_count += _apply_known_template_replacements(document, merged_fields) + if template_code == "ch1_5_product_list": + _rebuild_product_list_table(document, merged_fields) + paragraph_counts = _replace_placeholders(document, replacements, merged_fields) + highlight_count += paragraph_counts[0] + missing_count += paragraph_counts[1] + llm_only_count += paragraph_counts[2] document.add_page_break() heading = document.add_paragraph() heading_run = heading.add_run("预生成字段") @@ -60,6 +65,28 @@ def write_docx_from_template( return highlight_count, missing_count, llm_only_count +def _insert_prefill_block(document, merged_fields: dict[str, MergedField]) -> tuple[int, int, int]: + first = document.paragraphs[0] if document.paragraphs else document.add_paragraph() + marker = first.insert_paragraph_before("【预生成版】以下字段由系统根据说明书预填,黄色或红色标记项请人工复核。") + marker.runs[0].bold = True + highlight_count = 0 + missing_count = 0 + llm_only_count = 0 + for field in merged_fields.values(): + paragraph = marker.insert_paragraph_before("") + run = paragraph.add_run(f"{field.label}:{field.value}") + if field.highlight_reason != "none": + run.font.highlight_color = WD_COLOR_INDEX.YELLOW + highlight_count += 1 + if field.highlight_reason == "conflict": + run.font.color.rgb = RGBColor(255, 0, 0) + if field.highlight_reason == "missing": + missing_count += 1 + if field.highlight_reason == "llm_only": + llm_only_count += 1 + return highlight_count, missing_count, llm_only_count + + def _replace_paragraph_text(paragraph, text: str, field: MergedField) -> None: for run in paragraph.runs: run.text = "" @@ -68,3 +95,155 @@ def _replace_paragraph_text(paragraph, text: str, field: MergedField) -> None: run.font.highlight_color = WD_COLOR_INDEX.YELLOW if field.highlight_reason == "conflict": run.font.color.rgb = RGBColor(255, 0, 0) + + +def _replace_placeholders( + document, + replacements: dict[str, MergedField], + merged_fields: dict[str, MergedField], +) -> tuple[int, int, int]: + highlight_count = 0 + missing_count = 0 + llm_only_count = 0 + for paragraph in _iter_paragraphs(document): + text = paragraph.text + if "{{" not in text or "}}" not in text: + continue + used_fields: list[MergedField] = [] + + def replace(match: re.Match[str]) -> str: + key = match.group(1) + placeholder = match.group(0) + field = replacements.get(placeholder) or _default_placeholder_field(key, merged_fields) + used_fields.append(field) + return field.value + + new_text = PLACEHOLDER_RE.sub(replace, text) + if new_text == text: + continue + field_for_style = next((field for field in used_fields if field.highlight_reason != "none"), None) or used_fields[0] + _replace_paragraph_text(paragraph, new_text, field_for_style) + for field in used_fields: + if field.highlight_reason != "none": + highlight_count += 1 + if field.highlight_reason == "missing": + missing_count += 1 + if field.highlight_reason == "llm_only": + llm_only_count += 1 + return highlight_count, missing_count, llm_only_count + + +def _iter_paragraphs(document): + yield from document.paragraphs + for table in document.tables: + for row in table.rows: + for cell in row.cells: + yield from cell.paragraphs + + +def _apply_known_template_replacements(document, merged_fields: dict[str, MergedField]) -> int: + product = _field_value(merged_fields, "product_name") + applicant = _field_value(merged_fields, "applicant_name") + today = timezone.localdate().strftime("%Y年%m月%d日") + replacements = { + "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒(荧光PCR法)": product, + "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒": product, + "呼吸道合胞病毒 、肺炎支产品名称: 原体核酸检测试剂盒(荧": f"产品名称:{product}", + "光PCR法)": "", + "卡尤迪生物科技宜兴有限公司": applicant, + "2023年09月20日": today, + "2023 年 10 月": today[:8], + } + changed = 0 + for paragraph in document.paragraphs: + changed += _replace_text_in_paragraph(paragraph, replacements, merged_fields) + for table in document.tables: + for row in table.rows: + for cell in row.cells: + for paragraph in cell.paragraphs: + changed += _replace_text_in_paragraph(paragraph, replacements, merged_fields) + return changed + + +def _default_placeholder_field(key: str, merged_fields: dict[str, MergedField]) -> MergedField: + if key == "declaration_date": + return _plain_field(key, "日期", timezone.localdate().strftime("%Y年%m月%d日")) + label = key + for field in merged_fields.values(): + if field.key == key: + label = field.label + break + return MergedField( + key=key, + label=label, + value="/", + source="missing", + evidence="模板字段未从说明书中抽取到", + confidence=0.0, + highlight_reason="missing", + needs_review=True, + ) + + +def _replace_text_in_paragraph(paragraph, replacements: dict[str, str], merged_fields: dict[str, MergedField]) -> int: + text = paragraph.text + new_text = text + for old, new in replacements.items(): + if old in new_text: + new_text = new_text.replace(old, new) + if new_text == text: + return 0 + field = merged_fields.get("product_name") or MergedField( + key="product_name", + label="产品名称", + value=new_text, + source="rule", + evidence="", + confidence=0.0, + ) + _replace_paragraph_text(paragraph, new_text, field) + return 1 + + +def _rebuild_product_list_table(document, merged_fields: dict[str, MergedField]) -> None: + product = _field_value(merged_fields, "product_name") + package_specification = _field_value(merged_fields, "package_specification") + for paragraph in document.paragraphs: + if "的包装规格、货号、组分及主要组成成分见下表" in paragraph.text: + _replace_paragraph_text( + paragraph, + f"{product}的包装规格、货号、组分及主要组成成分见下表:", + merged_fields.get("product_name") or _plain_field("product_name", "产品名称", product), + ) + target = None + for table in document.tables: + header = [cell.text.strip() for cell in table.rows[0].cells] if table.rows else [] + if header[:6] == ["包装规格", "货号", "组成", "组分", "主要组成成分", "规格/数量"]: + target = table + break + if target is None: + return + while len(target.rows) > 1: + target._tbl.remove(target.rows[-1]._tr) + specs = [item.strip() for item in package_specification.replace(";", ";").split(";") if item.strip()] + if not specs: + specs = ["/"] + for spec in specs[:8]: + cells = target.add_row().cells + cells[0].text = spec + cells[1].text = "/" + cells[2].text = _field_value(merged_fields, "composition") + cells[3].text = _field_value(merged_fields, "component_name") + cells[4].text = _field_value(merged_fields, "main_component") + cells[5].text = _field_value(merged_fields, "quantity") + + +def _field_value(merged_fields: dict[str, MergedField], key: str) -> str: + field = merged_fields.get(key) + if not field or not field.value: + return "/" + return field.value + + +def _plain_field(key: str, label: str, value: str) -> MergedField: + return MergedField(key=key, label=label, value=value, source="rule", evidence="", confidence=0.0) diff --git a/review_agent/regulatory_info_package/services/legacy_doc_document.py b/review_agent/regulatory_info_package/services/legacy_doc_document.py index 596480b..f95d25c 100644 --- a/review_agent/regulatory_info_package/services/legacy_doc_document.py +++ b/review_agent/regulatory_info_package/services/legacy_doc_document.py @@ -4,6 +4,7 @@ import shutil from dataclasses import dataclass from pathlib import Path +from django.conf import settings from docx import Document from review_agent.regulatory_info_package.schemas import MergedField @@ -38,15 +39,43 @@ def write_legacy_doc_or_fallback( output = Path(output_path) output.parent.mkdir(parents=True, exist_ok=True) capability = detect_legacy_doc_capability() - if capability.status == "available" and source.exists(): + native_enabled = bool(getattr(settings, "REGULATORY_INFO_PACKAGE_ENABLE_WORD_COM_NATIVE", False)) + if native_enabled and capability.status == "available" and source.exists(): shutil.copy2(source, output) - return output, "success", {"doc": capability.__dict__, "fallback_used": False} + try: + _append_doc_summary_with_word_com(output, merged_fields) + return output, "success", {"doc": capability.__dict__, "fallback_used": False, "native_write": True} + except Exception as exc: + capability = LegacyDocCapability( + status="unavailable", + adapter="UnavailableLegacyDocAdapter", + message=f"Word COM 写入失败:{exc}", + ) fallback = output.with_suffix(".docx") document = Document() - document.add_heading(output.stem, level=1) - document.add_paragraph("当前环境未检测到可用的 .doc 原生写入能力,已生成 docx 兜底文件。") + heading = document.add_paragraph() + heading.add_run(output.stem).bold = True + document.add_paragraph("【预生成版】当前未启用 .doc 原生写入,已生成 docx 兜底文件。") for field in merged_fields.values(): document.add_paragraph(f"{field.label}:{field.value}") document.save(fallback) - return fallback, "fallback_success", {"doc": capability.__dict__, "fallback_used": True} + return fallback, "fallback_success", {"doc": capability.__dict__, "fallback_used": True, "native_enabled": native_enabled} + +def _append_doc_summary_with_word_com(path: Path, merged_fields: dict[str, MergedField]) -> None: + import win32com.client + + word = win32com.client.Dispatch("Word.Application") + word.Visible = False + document = None + try: + document = word.Documents.Open(str(path.resolve())) + end_range = document.Range(document.Content.End - 1, document.Content.End - 1) + lines = ["", "【预生成版】以下字段由系统根据说明书预填,请人工复核。"] + lines.extend(f"{field.label}:{field.value}" for field in merged_fields.values()) + end_range.InsertAfter("\r".join(lines)) + document.Save() + finally: + if document is not None: + document.Close(False) + word.Quit() diff --git a/review_agent/regulatory_info_package/services/package_generate.py b/review_agent/regulatory_info_package/services/package_generate.py index b3efadb..5fa0030 100644 --- a/review_agent/regulatory_info_package/services/package_generate.py +++ b/review_agent/regulatory_info_package/services/package_generate.py @@ -39,7 +39,12 @@ def _generate_one( actual_format = actual_path.suffix.lower().lstrip(".") highlight_count = missing_count = llm_only_count = 0 else: - highlight_count, missing_count, llm_only_count = write_docx_from_template(template_path, output_path, merged_fields) + highlight_count, missing_count, llm_only_count = write_docx_from_template( + template_path, + output_path, + merged_fields, + template_code=spec.code, + ) actual_path = output_path actual_format = "docx" status = "success" diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.11.1 符合标准的清单.docx b/review_agent/regulatory_info_package/templates/clean/CH1.11.1 符合标准的清单.docx new file mode 100644 index 0000000..c92ea89 Binary files /dev/null and b/review_agent/regulatory_info_package/templates/clean/CH1.11.1 符合标准的清单.docx differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.11.5 真实性声明.docx b/review_agent/regulatory_info_package/templates/clean/CH1.11.5 真实性声明.docx new file mode 100644 index 0000000..332f518 Binary files /dev/null and b/review_agent/regulatory_info_package/templates/clean/CH1.11.5 真实性声明.docx differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.11.6 符合性声明.docx b/review_agent/regulatory_info_package/templates/clean/CH1.11.6 符合性声明.docx new file mode 100644 index 0000000..59d05cd Binary files /dev/null and b/review_agent/regulatory_info_package/templates/clean/CH1.11.6 符合性声明.docx differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.2 监管信息目录.docx b/review_agent/regulatory_info_package/templates/clean/CH1.2 监管信息目录.docx new file mode 100644 index 0000000..62e4e8f Binary files /dev/null and b/review_agent/regulatory_info_package/templates/clean/CH1.2 监管信息目录.docx differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.4 申请表.docx b/review_agent/regulatory_info_package/templates/clean/CH1.4 申请表.docx new file mode 100644 index 0000000..42e962e Binary files /dev/null and b/review_agent/regulatory_info_package/templates/clean/CH1.4 申请表.docx differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.5 产品列表.docx b/review_agent/regulatory_info_package/templates/clean/CH1.5 产品列表.docx new file mode 100644 index 0000000..8f59550 Binary files /dev/null and b/review_agent/regulatory_info_package/templates/clean/CH1.5 产品列表.docx differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.9 产品申报前沟通的说明.docx b/review_agent/regulatory_info_package/templates/clean/CH1.9 产品申报前沟通的说明.docx new file mode 100644 index 0000000..112ee12 Binary files /dev/null and b/review_agent/regulatory_info_package/templates/clean/CH1.9 产品申报前沟通的说明.docx differ diff --git a/review_agent/regulatory_info_package/templates/regulatory_info_package_templates_v1.yaml b/review_agent/regulatory_info_package/templates/regulatory_info_package_templates_v1.yaml index 33ac071..c8790de 100644 --- a/review_agent/regulatory_info_package/templates/regulatory_info_package_templates_v1.yaml +++ b/review_agent/regulatory_info_package/templates/regulatory_info_package_templates_v1.yaml @@ -1,5 +1,5 @@ version: regulatory_info_package_templates_v1 -source_dir: docs/0.原始材料/第1章 监管信息 +source_dir: review_agent/regulatory_info_package/templates/clean zip_name: 第1章 监管信息(预生成版).zip templates: - code: ch1_2_directory @@ -33,13 +33,11 @@ templates: label: 包装规格 placeholder: "{{package_specification}}" - code: ch1_9_pre_submission - source_file: CH1.9 产品申报前沟通的说明.doc - output_name: CH1.9 产品申报前沟通的说明.doc - file_format: doc + source_file: CH1.9 产品申报前沟通的说明.docx + output_name: CH1.9 产品申报前沟通的说明.docx + file_format: docx strategy: pre_submission include_in_zip: true - prefer_legacy_doc_native: true - allow_docx_fallback: true fields: - key: product_name label: 产品名称 diff --git a/tests/test_regulatory_info_package_package_generate.py b/tests/test_regulatory_info_package_package_generate.py index fb8badc..6c47560 100644 --- a/tests/test_regulatory_info_package_package_generate.py +++ b/tests/test_regulatory_info_package_package_generate.py @@ -1,7 +1,8 @@ -import zipfile - import pytest +from docx import Document +from pathlib import Path +from django.conf import settings from review_agent.models import Conversation, RegulatoryInfoPackageBatch from review_agent.regulatory_info_package.services.field_merge import merge_fields from review_agent.regulatory_info_package.services.package_generate import generate_package_documents @@ -11,6 +12,36 @@ from review_agent.regulatory_info_package.services.template_config import load_t pytestmark = pytest.mark.django_db +def test_template_config_uses_clean_internal_templates(): + config = load_template_config() + source_dir = Path(config["source_dir"]) + + assert source_dir == settings.BASE_DIR / "review_agent" / "regulatory_info_package" / "templates" / "clean" + assert source_dir.exists() + assert len(config["templates"]) == 7 + assert all((source_dir / item["source_file"]).exists() for item in config["templates"]) + + +def test_clean_templates_expose_stable_fill_placeholders(): + config = load_template_config() + source_dir = Path(config["source_dir"]) + expected_by_code = { + "ch1_2_directory": {"{{product_name}}", "{{applicant_name}}"}, + "ch1_4_application_form": {"{{product_name}}", "{{applicant_name}}"}, + "ch1_5_product_list": {"{{product_name}}", "{{package_specification}}"}, + "ch1_9_pre_submission": {"{{product_name}}", "{{applicant_name}}"}, + "ch1_11_1_standards": {"{{standard_no}}", "{{product_name}}"}, + "ch1_11_5_authenticity": {"{{product_name}}", "{{applicant_name}}"}, + "ch1_11_6_conformity": {"{{product_name}}", "{{applicant_name}}"}, + } + + for item in config["templates"]: + document = Document(source_dir / item["source_file"]) + text = _document_text(document) + for placeholder in expected_by_code[item["code"]]: + assert placeholder in text + + def test_generate_package_documents_creates_seven_results(django_user_model, tmp_path): user = django_user_model.objects.create_user(username="owner", password="pass") conversation = Conversation.objects.create(user=user, title="会话") @@ -29,3 +60,104 @@ def test_generate_package_documents_creates_seven_results(django_user_model, tmp (result.template_code, result.status, result.error_message) for result in results ] assert all(result.path for result in results) + + +def test_generated_docx_has_visible_prefill_block_near_top(django_user_model, tmp_path): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + batch = RegulatoryInfoPackageBatch.objects.create( + conversation=conversation, + user=user, + batch_no="RIP-20260610154100-abcdef", + work_dir=str(tmp_path), + ) + merged, _summary = merge_fields({"product_name": {"value": "测试产品", "label": "产品名称"}}, {}) + + results = generate_package_documents(batch, load_template_config(), merged) + docx_result = next(result for result in results if result.template_code == "ch1_2_directory") + document = Document(docx_result.path) + first_text = "\n".join(paragraph.text for paragraph in document.paragraphs[:8]) + + assert "预生成版" in first_text + assert "测试产品" in first_text + + +def test_generated_docx_replaces_sample_case_content(django_user_model, tmp_path): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + batch = RegulatoryInfoPackageBatch.objects.create( + conversation=conversation, + user=user, + batch_no="RIP-20260610154200-abcdef", + work_dir=str(tmp_path), + ) + merged, _summary = merge_fields( + { + "product_name": {"value": "测试产品", "label": "产品名称"}, + "package_specification": {"value": "24人份/盒;48人份/盒", "label": "包装规格"}, + }, + {}, + ) + + results = generate_package_documents(batch, load_template_config(), merged) + docx_results = [result for result in results if result.actual_format == "docx"] + for result in docx_results: + document = Document(result.path) + text = "\n".join(paragraph.text for paragraph in document.paragraphs) + for table in document.tables: + for row in table.rows: + text += "\n" + "\t".join(cell.text for cell in row.cells) + assert "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒" not in text + product_list = next(result for result in results if result.template_code == "ch1_5_product_list") + product_doc = Document(product_list.path) + table = product_doc.tables[0] + assert table.rows[1].cells[0].text == "24人份/盒" + assert table.rows[1].cells[1].text == "/" + assert "6018003102" not in "\n".join(cell.text for row in table.rows for cell in row.cells) + + +def test_generated_docs_fill_clean_template_body(django_user_model, tmp_path): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + batch = RegulatoryInfoPackageBatch.objects.create( + conversation=conversation, + user=user, + batch_no="RIP-20260610154300-abcdef", + work_dir=str(tmp_path), + ) + merged, _summary = merge_fields( + { + "product_name": {"value": "甲型流感病毒核酸检测试剂盒", "label": "产品名称"}, + "applicant_name": {"value": "星河医疗科技有限公司", "label": "申请人名称"}, + "package_specification": {"value": "24人份/盒;48人份/盒", "label": "包装规格"}, + "standard_no": {"value": "GB/T 29791.1-2013", "label": "标准号"}, + }, + {}, + ) + + results = generate_package_documents(batch, load_template_config(), merged) + + for code in ["ch1_2_directory", "ch1_4_application_form", "ch1_11_5_authenticity", "ch1_11_6_conformity"]: + result = next(item for item in results if item.template_code == code) + text = _document_text(Document(result.path)) + assert "甲型流感病毒核酸检测试剂盒" in text + assert "星河医疗科技有限公司" in text + assert "{{" not in text + assert "}}" not in text + + standards = next(item for item in results if item.template_code == "ch1_11_1_standards") + standards_text = _document_text(Document(standards.path)) + assert "GB/T 29791.1-2013" in standards_text + + product_list = next(item for item in results if item.template_code == "ch1_5_product_list") + product_text = _document_text(Document(product_list.path)) + assert "24人份/盒" in product_text + assert "48人份/盒" in product_text + + +def _document_text(document: Document) -> str: + text = "\n".join(paragraph.text for paragraph in document.paragraphs) + for table in document.tables: + for row in table.rows: + text += "\n" + "\t".join(cell.text for cell in row.cells) + return text