diff --git a/review_agent/regulatory_info_package/services/docx_document.py b/review_agent/regulatory_info_package/services/docx_document.py index e42d49e..c7b5629 100644 --- a/review_agent/regulatory_info_package/services/docx_document.py +++ b/review_agent/regulatory_info_package/services/docx_document.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import re from pathlib import Path @@ -20,6 +21,7 @@ def write_docx_from_template( merged_fields: dict[str, MergedField], *, template_code: str = "", + directory_page_numbers: dict[str, str] | None = None, ) -> tuple[int, int, int]: source = Path(source_path) output = Path(output_path) @@ -32,61 +34,19 @@ def write_docx_from_template( highlight_count = 0 missing_count = 0 llm_only_count = 0 - highlight_count, missing_count, llm_only_count = _insert_prefill_block(document, merged_fields) - highlight_count += _apply_known_template_replacements(document, merged_fields) + highlight_count += _apply_known_template_replacements(document, merged_fields, template_code=template_code) if template_code == "ch1_5_product_list": _rebuild_product_list_table(document, merged_fields) + if template_code == "ch1_2_directory": + _apply_directory_page_numbers(document, directory_page_numbers or {}) paragraph_counts = _replace_placeholders(document, replacements, merged_fields) highlight_count += paragraph_counts[0] missing_count += paragraph_counts[1] llm_only_count += paragraph_counts[2] - document.add_page_break() - heading = document.add_paragraph() - heading_run = heading.add_run("预生成字段") - heading_run.bold = True - table = document.add_table(rows=1, cols=4) - table.rows[0].cells[0].text = "字段" - table.rows[0].cells[1].text = "值" - table.rows[0].cells[2].text = "来源" - table.rows[0].cells[3].text = "待确认" - for field in merged_fields.values(): - cells = table.add_row().cells - cells[0].text = field.label - cells[1].text = field.value - cells[2].text = field.source - cells[3].text = "是" if field.needs_review else "否" - if field.highlight_reason != "none": - highlight_count += 1 - if field.highlight_reason == "missing": - missing_count += 1 - if field.highlight_reason == "llm_only": - llm_only_count += 1 document.save(output) return highlight_count, missing_count, llm_only_count -def _insert_prefill_block(document, merged_fields: dict[str, MergedField]) -> tuple[int, int, int]: - first = document.paragraphs[0] if document.paragraphs else document.add_paragraph() - marker = first.insert_paragraph_before("【预生成版】以下字段由系统根据说明书预填,黄色或红色标记项请人工复核。") - marker.runs[0].bold = True - highlight_count = 0 - missing_count = 0 - llm_only_count = 0 - for field in merged_fields.values(): - paragraph = marker.insert_paragraph_before("") - run = paragraph.add_run(f"{field.label}:{field.value}") - if field.highlight_reason != "none": - run.font.highlight_color = WD_COLOR_INDEX.YELLOW - highlight_count += 1 - if field.highlight_reason == "conflict": - run.font.color.rgb = RGBColor(255, 0, 0) - if field.highlight_reason == "missing": - missing_count += 1 - if field.highlight_reason == "llm_only": - llm_only_count += 1 - return highlight_count, missing_count, llm_only_count - - def _replace_paragraph_text(paragraph, text: str, field: MergedField) -> None: for run in paragraph.runs: run.text = "" @@ -97,6 +57,20 @@ def _replace_paragraph_text(paragraph, text: str, field: MergedField) -> None: run.font.color.rgb = RGBColor(255, 0, 0) +def _apply_directory_page_numbers(document, page_numbers: dict[str, str]) -> None: + for table in document.tables: + if not table.rows: + continue + header = [cell.text.strip() for cell in table.rows[0].cells] + if len(header) < 5 or header[0] != "RPS目录" or header[4] != "页码": + continue + for row in table.rows[1:]: + code = row.cells[0].text.strip() + if code in page_numbers: + row.cells[4].text = page_numbers[code] + return + + def _replace_placeholders( document, replacements: dict[str, MergedField], @@ -141,19 +115,26 @@ def _iter_paragraphs(document): yield from cell.paragraphs -def _apply_known_template_replacements(document, merged_fields: dict[str, MergedField]) -> int: +def _apply_known_template_replacements(document, merged_fields: dict[str, MergedField], *, template_code: str = "") -> int: product = _field_value(merged_fields, "product_name") applicant = _field_value(merged_fields, "applicant_name") today = timezone.localdate().strftime("%Y年%m月%d日") replacements = { + "xxxx年xx月xx日": today, + "XXXX年XX月XX日": today, + "xxxx 年 xx 月 xx 日": today, + "XXXX 年 XX 月 XX 日": today, + "2023年09月20日": today, + "2023 年 10 月": today[:8], + } + if not template_code.startswith("ch1_11"): + replacements.update({ "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒(荧光PCR法)": product, "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒": product, "呼吸道合胞病毒 、肺炎支产品名称: 原体核酸检测试剂盒(荧": f"产品名称:{product}", "光PCR法)": "", "卡尤迪生物科技宜兴有限公司": applicant, - "2023年09月20日": today, - "2023 年 10 月": today[:8], - } + }) changed = 0 for paragraph in document.paragraphs: changed += _replace_text_in_paragraph(paragraph, replacements, merged_fields) @@ -208,6 +189,8 @@ def _replace_text_in_paragraph(paragraph, replacements: dict[str, str], merged_f def _rebuild_product_list_table(document, merged_fields: dict[str, MergedField]) -> None: product = _field_value(merged_fields, "product_name") package_specification = _field_value(merged_fields, "package_specification") + component_table = _component_table_payload(merged_fields) + component_notes = _field_value(merged_fields, "component_notes") for paragraph in document.paragraphs: if "的包装规格、货号、组分及主要组成成分见下表" in paragraph.text: _replace_paragraph_text( @@ -215,27 +198,38 @@ def _rebuild_product_list_table(document, merged_fields: dict[str, MergedField]) f"{product}的包装规格、货号、组分及主要组成成分见下表:", merged_fields.get("product_name") or _plain_field("product_name", "产品名称", product), ) + if "规格A和规格B的区别" in paragraph.text and component_notes != "/": + _replace_paragraph_text( + paragraph, + component_notes, + merged_fields.get("component_notes") or _plain_field("component_notes", "主要组成成分备注", component_notes), + ) target = None for table in document.tables: header = [cell.text.strip() for cell in table.rows[0].cells] if table.rows else [] if header[:6] == ["包装规格", "货号", "组成", "组分", "主要组成成分", "规格/数量"]: target = table break - if target is None: - return - while len(target.rows) > 1: - target._tbl.remove(target.rows[-1]._tr) - specs = [item.strip() for item in package_specification.replace(";", ";").split(";") if item.strip()] - if not specs: - specs = ["/"] - for spec in specs[:8]: - cells = target.add_row().cells - cells[0].text = spec - cells[1].text = "/" - cells[2].text = _field_value(merged_fields, "composition") - cells[3].text = _field_value(merged_fields, "component_name") - cells[4].text = _field_value(merged_fields, "main_component") - cells[5].text = _field_value(merged_fields, "quantity") + specs = _component_specs(component_table) or [ + (spec, None) for spec in [item.strip() for item in package_specification.replace(";", ";").split(";") if item.strip()] + ] + if target is not None: + _clear_table_body(target) + if component_table: + _fill_product_component_table(target, component_table, specs) + else: + if not specs: + specs = [("/", None)] + for spec, _index in specs[:8]: + cells = target.add_row().cells + cells[0].text = spec + cells[1].text = "/" + cells[2].text = _field_value(merged_fields, "composition") + cells[3].text = _field_value(merged_fields, "component_name") + cells[4].text = _field_value(merged_fields, "main_component") + cells[5].text = _field_value(merged_fields, "quantity") + if component_table: + _rebuild_component_comparison_table(document, component_table, specs) def _field_value(merged_fields: dict[str, MergedField], key: str) -> str: @@ -247,3 +241,82 @@ def _field_value(merged_fields: dict[str, MergedField], key: str) -> str: def _plain_field(key: str, label: str, value: str) -> MergedField: return MergedField(key=key, label=label, value=value, source="rule", evidence="", confidence=0.0) + + +def _component_table_payload(merged_fields: dict[str, MergedField]) -> dict: + field = merged_fields.get("component_table") + if not field or not field.value or field.value == "/": + return {} + try: + payload = json.loads(field.value) + except json.JSONDecodeError: + return {} + if not isinstance(payload, dict): + return {} + rows = payload.get("rows") or [] + header = payload.get("header") or [] + if not isinstance(header, list) or not isinstance(rows, list): + return {} + return {"header": header, "rows": rows} + + +def _component_specs(component_table: dict) -> list[tuple[str, int]]: + header = component_table.get("header") or [] + specs: list[tuple[str, int]] = [] + for index, value in enumerate(header[2:], start=2): + label = str(value or "").strip() + if not label: + continue + label = label.replace("规格(", "").replace("规格(", "").rstrip("))") + specs.append((label, index)) + return specs + + +def _clear_table_body(table) -> None: + while len(table.rows) > 1: + table._tbl.remove(table.rows[-1]._tr) + + +def _fill_product_component_table(table, component_table: dict, specs: list[tuple[str, int]]) -> None: + rows = component_table.get("rows") or [] + for spec_label, spec_index in specs: + for row in rows: + cells = table.add_row().cells + cells[0].text = spec_label + cells[1].text = "/" + cells[2].text = "/" + cells[3].text = _row_value(row, 0) + cells[4].text = _row_value(row, 1) + cells[5].text = _row_value(row, spec_index or 0) + + +def _rebuild_component_comparison_table(document, component_table: dict, specs: list[tuple[str, int]]) -> None: + target = None + for table in document.tables: + header = [cell.text.strip() for cell in table.rows[0].cells] if table.rows else [] + if header and header[0] == "组分名称": + target = table + break + if target is None: + return + _clear_table_body(target) + header_cells = target.rows[0].cells + labels = ["组分名称", *[spec for spec, _index in specs[: len(header_cells) - 1]]] + while len(labels) < len(header_cells): + labels.append("备注") + for index, label in enumerate(labels[: len(header_cells)]): + header_cells[index].text = label + for row in component_table.get("rows") or []: + cells = target.add_row().cells + cells[0].text = _row_value(row, 0) + for cell_index, (_spec_label, spec_index) in enumerate(specs[: len(cells) - 1], start=1): + cells[cell_index].text = _row_value(row, spec_index) + for cell_index in range(len(specs[: len(cells) - 1]) + 1, len(cells)): + cells[cell_index].text = "/" + + +def _row_value(row, index: int) -> str: + if not isinstance(row, list) or index >= len(row): + return "/" + value = str(row[index] or "").strip() + return value or "/" diff --git a/review_agent/regulatory_info_package/services/field_extract.py b/review_agent/regulatory_info_package/services/field_extract.py index 4f0eb65..d2342d3 100644 --- a/review_agent/regulatory_info_package/services/field_extract.py +++ b/review_agent/regulatory_info_package/services/field_extract.py @@ -13,6 +13,11 @@ from review_agent.regulatory_info_package.schemas import InstructionExtractResul FIELD_PATTERNS = { "product_name": ("产品名称", r"产品名称[::\s]*([^\n\r]+)"), + "applicant_name": ("申请人名称", r"(?:申请人名称|注册人/售后服务单位名称|注册人名称|售后服务单位名称|生产企业名称)[::\s]*([^\n\r]+)"), + "manufacturer_name": ("生产企业名称", r"生产企业名称[::\s]*([^\n\r]+)"), + "applicant_address": ("申请人住所", r"(?:申请人住所|注册人住所|生产企业住所)[::\s]*([^\n\r]+)"), + "applicant_contact": ("申请人联系方式", r"(?:联系方式|联系电话|电话)[::\s]*([^\n\r]+)"), + "production_address": ("生产地址", r"生产地址[::\s]*([^\n\r]+)"), "storage_condition": ("储存条件", r"(?:储存条件|贮存条件|保存条件)[::\s]*([^\n\r]+)"), "intended_use": ("预期用途", r"预期用途[::\s]*([^\n\r]+)"), "package_specification": ("包装规格", r"(?:包装规格|规格)[::\s]*([^\n\r]+)"), @@ -47,6 +52,24 @@ def extract_fields_by_rules(instruction: InstructionExtractResult) -> dict[str, "confidence": 0.75, "source": "rule", } + component_table = _best_component_table(instruction.component_tables) + if component_table: + results["component_table"] = { + "label": "主要组成成分", + "value": json.dumps(component_table, ensure_ascii=False), + "evidence": "说明书【主要组成成分】表格", + "confidence": 0.86, + "source": "rule", + } + component_notes = _component_notes(instruction.sections) + if component_notes: + results["component_notes"] = { + "label": "主要组成成分备注", + "value": component_notes, + "evidence": "说明书【主要组成成分】段落", + "confidence": 0.8, + "source": "rule", + } return results @@ -133,3 +156,16 @@ def _parse_json_object(raw: str) -> dict: if start == -1 or end == -1: return {} return json.loads(text[start : end + 1]) + + +def _best_component_table(component_tables: list[dict]) -> dict: + if not component_tables: + return {} + return max(component_tables, key=lambda table: len(table.get("rows") or [])) + + +def _component_notes(sections: dict[str, str]) -> str: + for key, value in sections.items(): + if "主要组成" in key: + return value.strip() + return "" diff --git a/review_agent/regulatory_info_package/services/package_generate.py b/review_agent/regulatory_info_package/services/package_generate.py index 5fa0030..6b11ccc 100644 --- a/review_agent/regulatory_info_package/services/package_generate.py +++ b/review_agent/regulatory_info_package/services/package_generate.py @@ -1,7 +1,10 @@ from __future__ import annotations +import subprocess from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path +from zipfile import ZipFile +from xml.etree import ElementTree from review_agent.models import RegulatoryInfoPackageBatch from review_agent.regulatory_info_package.constants import GENERATED_FILE_FAILED @@ -18,9 +21,16 @@ def generate_package_documents( merged_fields: dict[str, MergedField], ) -> list[GeneratedFileResult]: specs = template_specs(config) - with ThreadPoolExecutor(max_workers=min(4, len(specs) or 1)) as executor: - futures = [executor.submit(_generate_one, batch, config, spec, merged_fields) for spec in specs] - return [future.result() for future in as_completed(futures)] + directory_specs = [spec for spec in specs if spec.code == "ch1_2_directory"] + content_specs = [spec for spec in specs if spec.code != "ch1_2_directory"] + results: list[GeneratedFileResult] = [] + with ThreadPoolExecutor(max_workers=min(4, len(content_specs) or 1)) as executor: + futures = [executor.submit(_generate_one, batch, config, spec, merged_fields) for spec in content_specs] + results.extend(future.result() for future in as_completed(futures)) + page_numbers = _directory_page_numbers(results) + for spec in directory_specs: + results.append(_generate_one(batch, config, spec, merged_fields, directory_page_numbers=page_numbers)) + return results def _generate_one( @@ -28,6 +38,8 @@ def _generate_one( config: dict, spec: TemplateSpec, merged_fields: dict[str, MergedField], + *, + directory_page_numbers: dict[str, str] | None = None, ) -> GeneratedFileResult: try: template_path = copy_template_to_batch(batch, config, spec) @@ -44,6 +56,7 @@ def _generate_one( output_path, merged_fields, template_code=spec.code, + directory_page_numbers=directory_page_numbers, ) actual_path = output_path actual_format = "docx" @@ -68,3 +81,106 @@ def _generate_one( status=GENERATED_FILE_FAILED, error_message=str(exc), ) + + +def _directory_page_numbers(results: list[GeneratedFileResult]) -> dict[str, str]: + page_numbers = {"CH1.2": "1"} + for result in results: + if result.status not in {"success", "fallback_success"} or not result.path: + continue + code = _directory_code_from_file_name(result.file_name) + if not code: + continue + page_numbers[code] = str(count_document_pages(result.path)) + return page_numbers + + +def _directory_code_from_file_name(file_name: str) -> str: + stem = Path(file_name).stem.strip() + return stem.split()[0] if stem.startswith("CH") else "" + + +def count_document_pages(path: str | Path) -> int: + file_path = Path(path) + if not file_path.exists(): + return 1 + pages = _count_pages_from_docx_properties(file_path) + if pages: + return pages + pages = _count_pages_with_pywin32(file_path) + if pages: + return pages + pages = _count_pages_with_powershell_word(file_path) + if pages: + return pages + return 1 + + +def _count_pages_from_docx_properties(file_path: Path) -> int: + if file_path.suffix.lower() != ".docx": + return 0 + try: + with ZipFile(file_path) as archive: + root = ElementTree.fromstring(archive.read("docProps/app.xml")) + namespace = {"ep": "http://schemas.openxmlformats.org/officeDocument/2006/extended-properties"} + pages = root.find("ep:Pages", namespace) + return max(int((pages.text or "").strip()), 1) if pages is not None else 0 + except Exception: + return 0 + + +def _count_pages_with_pywin32(file_path: Path) -> int: + try: + import win32com.client + + word = win32com.client.DispatchEx("Word.Application") + word.Visible = False + document = None + try: + document = word.Documents.Open(str(file_path.resolve()), ReadOnly=True) + document.Repaginate() + return max(int(document.ComputeStatistics(2)), 1) + finally: + if document is not None: + document.Close(False) + word.Quit() + except Exception: + return 0 + + +def _count_pages_with_powershell_word(file_path: Path) -> int: + script = r""" +param([string]$Path) +$word = $null +$doc = $null +try { + $word = New-Object -ComObject Word.Application + $word.Visible = $false + $doc = $word.Documents.Open($Path, $false, $true) + $doc.Repaginate() + [Console]::Out.Write($doc.ComputeStatistics(2)) + exit 0 +} catch { + [Console]::Error.Write($_.Exception.Message) + exit 1 +} finally { + if ($doc -ne $null) { $doc.Close($false) | Out-Null } + if ($word -ne $null) { $word.Quit() | Out-Null } +} +""" + try: + completed = subprocess.run( + ["powershell.exe", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", script, str(file_path.resolve())], + capture_output=True, + check=False, + text=True, + timeout=8, + ) + except Exception: + return 0 + if completed.returncode != 0: + return 0 + try: + return max(int(completed.stdout.strip()), 1) + except ValueError: + return 0 diff --git a/review_agent/regulatory_info_package/services/template_config.py b/review_agent/regulatory_info_package/services/template_config.py index e700859..42475f9 100644 --- a/review_agent/regulatory_info_package/services/template_config.py +++ b/review_agent/regulatory_info_package/services/template_config.py @@ -32,8 +32,8 @@ def validate_template_config(config: dict) -> list[str]: if not source_dir.exists(): errors.append(f"模板源目录不存在:{source_dir}") templates = config.get("templates") or [] - if len(templates) != 7: - errors.append("第1章监管信息模板配置必须包含 7 个模板。") + if len(templates) != 6: + errors.append("第1章监管信息模板配置必须包含 6 个模板。") seen: set[str] = set() for template in templates: code = str(template.get("code") or "") @@ -51,4 +51,3 @@ def validate_template_config(config: dict) -> list[str]: if not output_name: errors.append(f"模板 {code} 缺少 output_name。") return errors - diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.11.1 符合标准的清单.docx b/review_agent/regulatory_info_package/templates/clean/CH1.11.1 符合标准的清单.docx index c92ea89..dc874a5 100644 Binary files a/review_agent/regulatory_info_package/templates/clean/CH1.11.1 符合标准的清单.docx and b/review_agent/regulatory_info_package/templates/clean/CH1.11.1 符合标准的清单.docx differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.11.5 真实性声明.docx b/review_agent/regulatory_info_package/templates/clean/CH1.11.5 真实性声明.docx index 332f518..4fac204 100644 Binary files a/review_agent/regulatory_info_package/templates/clean/CH1.11.5 真实性声明.docx and b/review_agent/regulatory_info_package/templates/clean/CH1.11.5 真实性声明.docx differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.11.6 符合性声明.docx b/review_agent/regulatory_info_package/templates/clean/CH1.11.6 符合性声明.docx index 59d05cd..2b29f3f 100644 Binary files a/review_agent/regulatory_info_package/templates/clean/CH1.11.6 符合性声明.docx and b/review_agent/regulatory_info_package/templates/clean/CH1.11.6 符合性声明.docx differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.2 监管信息目录 - 页码版.docx b/review_agent/regulatory_info_package/templates/clean/CH1.2 监管信息目录 - 页码版.docx new file mode 100644 index 0000000..4e8c239 Binary files /dev/null and b/review_agent/regulatory_info_package/templates/clean/CH1.2 监管信息目录 - 页码版.docx differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.2 监管信息目录.docx b/review_agent/regulatory_info_package/templates/clean/CH1.2 监管信息目录.docx deleted file mode 100644 index 62e4e8f..0000000 Binary files a/review_agent/regulatory_info_package/templates/clean/CH1.2 监管信息目录.docx and /dev/null differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.4 申请表 - 复选框调整版.docx b/review_agent/regulatory_info_package/templates/clean/CH1.4 申请表 - 复选框调整版.docx new file mode 100644 index 0000000..565a9b0 Binary files /dev/null and b/review_agent/regulatory_info_package/templates/clean/CH1.4 申请表 - 复选框调整版.docx differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.4 申请表.docx b/review_agent/regulatory_info_package/templates/clean/CH1.4 申请表.docx deleted file mode 100644 index 42e962e..0000000 Binary files a/review_agent/regulatory_info_package/templates/clean/CH1.4 申请表.docx and /dev/null differ diff --git a/review_agent/regulatory_info_package/templates/clean/CH1.5 产品列表.docx b/review_agent/regulatory_info_package/templates/clean/CH1.5 产品列表.docx index 8f59550..7b08002 100644 Binary files a/review_agent/regulatory_info_package/templates/clean/CH1.5 产品列表.docx and b/review_agent/regulatory_info_package/templates/clean/CH1.5 产品列表.docx differ diff --git a/review_agent/regulatory_info_package/templates/regulatory_info_package_templates_v1.yaml b/review_agent/regulatory_info_package/templates/regulatory_info_package_templates_v1.yaml index c8790de..275a1a2 100644 --- a/review_agent/regulatory_info_package/templates/regulatory_info_package_templates_v1.yaml +++ b/review_agent/regulatory_info_package/templates/regulatory_info_package_templates_v1.yaml @@ -3,14 +3,14 @@ source_dir: review_agent/regulatory_info_package/templates/clean zip_name: 第1章 监管信息(预生成版).zip templates: - code: ch1_2_directory - source_file: CH1.2 监管信息目录.docx + source_file: CH1.2 监管信息目录 - 页码版.docx output_name: CH1.2 监管信息目录.docx file_format: docx strategy: directory include_in_zip: true fields: [] - code: ch1_4_application_form - source_file: CH1.4 申请表.docx + source_file: CH1.4 申请表 - 复选框调整版.docx output_name: CH1.4 申请表.docx file_format: docx strategy: application_form @@ -32,16 +32,6 @@ templates: - key: package_specification label: 包装规格 placeholder: "{{package_specification}}" - - code: ch1_9_pre_submission - source_file: CH1.9 产品申报前沟通的说明.docx - output_name: CH1.9 产品申报前沟通的说明.docx - file_format: docx - strategy: pre_submission - include_in_zip: true - fields: - - key: product_name - label: 产品名称 - placeholder: "{{product_name}}" - code: ch1_11_1_standards source_file: CH1.11.1 符合标准的清单.docx output_name: CH1.11.1 符合标准的清单.docx