diff --git a/review_agent/application_form_fill/services/field_extract.py b/review_agent/application_form_fill/services/field_extract.py
index 4c72f10..6f40833 100644
--- a/review_agent/application_form_fill/services/field_extract.py
+++ b/review_agent/application_form_fill/services/field_extract.py
@@ -15,6 +15,15 @@ from review_agent.models import ApplicationFormFillArtifact, ApplicationFormFill
 from review_agent.regulatory_review.services.text_extract import extract_text
 
 
+FIELD_ALIASES = {
+    "product_name": ["产品名称"],
+    "package_specification": ["包装规格", "规格"],
+    "main_components": ["主要组成成分", "主要组成", "组成成分"],
+    "intended_use": ["预期用途"],
+    "storage_condition_and_validity": ["产品储存条件及有效期", "储存条件及有效期", "储存条件", "有效期"],
+}
+
+
 def collect_document_texts(summary_batch: FileSummaryBatch) -> dict[str, str]:
     texts: dict[str, str] = {}
     for item in summary_batch.items.order_by("file_index"):
@@ -36,7 +45,7 @@ def extract_by_rules(texts: dict[str, str], specs: list[TemplateSpec]) -> dict[s
     for file_name, text in texts.items():
         source_role = detect_source_role(file_name, text)
         for field in field_defs:
-            value, evidence = _extract_label_value(text, field["label"], labels)
+            value, evidence = _extract_field_value(text, field, labels)
             if not value:
                 continue
             fields.append(
@@ -142,7 +151,34 @@ def _field_defs(specs: list[TemplateSpec]) -> list[dict[str, str]]:
     return fields
 
 
+def _extract_field_value(text: str, field: dict[str, str], labels: list[str]) -> tuple[str, str]:
+    aliases = _field_aliases(field)
+    for label in aliases:
+        value, evidence = _extract_colon_label_value(text, label, labels + aliases)
+        if value:
+            return value, evidence
+        value, evidence = _extract_bracket_section_value(text, label)
+        if value:
+            return value, evidence
+    return "", ""
+
+
+def _field_aliases(field: dict[str, str]) -> list[str]:
+    aliases = [field["label"]]
+    aliases.extend(FIELD_ALIASES.get(field["key"], []))
+    result: list[str] = []
+    for alias in aliases:
+        normalized = str(alias or "").strip()
+        if normalized and normalized not in result:
+            result.append(normalized)
+    return result
+
+
 def _extract_label_value(text: str, label: str, labels: list[str]) -> tuple[str, str]:
+    return _extract_colon_label_value(text, label, labels)
+
+
+def _extract_colon_label_value(text: str, label: str, labels: list[str]) -> tuple[str, str]:
     escaped_labels = "|".join(re.escape(item) for item in labels if item != label)
     stop_pattern = rf"(?=\n\s*(?:{escaped_labels})\s*[::])" if escaped_labels else r"(?=\Z)"
     pattern = re.compile(rf"{re.escape(label)}\s*[::]\s*(.+?)(?:{stop_pattern}|\Z)", re.S)
@@ -156,6 +192,30 @@ def _extract_label_value(text: str, label: str, labels: list[str]) -> tuple[str,
     return value, evidence
 
 
+def _extract_bracket_section_value(text: str, label: str) -> tuple[str, str]:
+    heading_pattern = rf"^\s*[【\[]\s*{re.escape(label)}\s*[】\]]\s*$"
+    lines = (text or "").splitlines()
+    for index, line in enumerate(lines):
+        if not re.match(heading_pattern, line.strip()):
+            continue
+        value_parts: list[str] = []
+        for next_line in lines[index + 1 :]:
+            normalized = next_line.strip()
+            if not normalized:
+                continue
+            if _looks_like_bracket_heading(normalized):
+                break
+            value_parts.append(normalized)
+        value = "\n".join(value_parts).strip()
+        if value:
+            return value, f"【{label}】\n{value}"[:300]
+    return "", ""
+
+
+def _looks_like_bracket_heading(line: str) -> bool:
+    return bool(re.match(r"^\s*[【\[].{1,40}[】\]]\s*$", line))
+
+
 def _prompt_text() -> str:
     path = Path(__file__).resolve().parents[1] / "prompts" / "field_extract.md"
     return path.read_text(encoding="utf-8")
diff --git a/review_agent/regulatory_review/services/rag_index.py b/review_agent/regulatory_review/services/rag_index.py
index b6a9d5a..c806e08 100644
--- a/review_agent/regulatory_review/services/rag_index.py
+++ b/review_agent/regulatory_review/services/rag_index.py
@@ -9,6 +9,10 @@ from pathlib import Path
 
 from django.conf import settings
 from docx import Document
+from docx.oxml.table import CT_Tbl
+from docx.oxml.text.paragraph import CT_P
+from docx.table import Table
+from docx.text.paragraph import Paragraph
 from openpyxl import load_workbook
 from pypdf import PdfReader
 from pptx import Presentation
@@ -49,7 +53,7 @@ def extract_text_from_path(path: Path) -> str:
     if suffix == ".pdf":
         return "\n".join(page.extract_text() or "" for page in PdfReader(str(path)).pages)
     if suffix == ".docx":
-        return "\n".join(paragraph.text for paragraph in Document(str(path)).paragraphs)
+        return _extract_docx_text(path)
     if suffix == ".pptx":
         presentation = Presentation(str(path))
         lines = []
@@ -72,6 +76,31 @@ def extract_text_from_path(path: Path) -> str:
     return ""
 
 
+def _extract_docx_text(path: Path) -> str:
+    document = Document(str(path))
+    lines: list[str] = []
+    for block in _iter_docx_blocks(document):
+        if isinstance(block, Paragraph):
+            text = block.text.strip()
+            if text:
+                lines.append(text)
+        elif isinstance(block, Table):
+            for row in block.rows:
+                values = [cell.text.strip() for cell in row.cells if cell.text.strip()]
+                if values:
+                    lines.append("\t".join(values))
+    return "\n".join(lines)
+
+
+def _iter_docx_blocks(document):
+    body = document.element.body
+    for child in body.iterchildren():
+        if isinstance(child, CT_P):
+            yield Paragraph(child, document)
+        elif isinstance(child, CT_Tbl):
+            yield Table(child, document)
+
+
 def _extract_legacy_doc_with_libreoffice(path: Path) -> str:
     with tempfile.TemporaryDirectory() as tmp_dir:
         target_dir = Path(tmp_dir)
diff --git a/tests/test_application_form_fill_field_extract.py b/tests/test_application_form_fill_field_extract.py
index 08c7b44..2ceea79 100644
--- a/tests/test_application_form_fill_field_extract.py
+++ b/tests/test_application_form_fill_field_extract.py
@@ -48,6 +48,42 @@ def test_rule_extracts_registration_certificate_fields():
     assert values["package_specification"]["extractor"] == "rule"
 
 
+def test_rule_extracts_bracket_sections_from_instructions():
+    texts = {
+        "目标产品说明书.docx": "\n".join(
+            [
+                "【产品名称】",
+                "新型冠状病毒2019-nCoV核酸检测试剂盒(荧光PCR法)",
+                "【包装规格】",
+                "规格A:24人份/盒、48人份/盒、96人份/盒。",
+                "规格B:24人份/盒、48人份/盒、96人份/盒。",
+                "【预期用途】",
+                "本试剂盒用于体外定性检测咽拭子、痰液样本中新型冠状病毒(2019-nCoV)ORF1ab和N基因。",
+                "【检测原理】",
+                "本段不应进入预期用途。",
+                "【主要组成成分】",
+                "表1 规格A大包装试剂盒组成成分",
+                "组分\t规格\t数量",
+                "PCR反应液\t24人份/盒\t1管",
+                "【储存条件及有效期】",
+                "-20±5℃的避光条件,有效期12个月。",
+                "反复冻融次数不得超过4次。",
+                "【样本要求】",
+                "适用样本类型:咽拭子、痰液。",
+            ]
+        )
+    }
+
+    result = extract_by_rules(texts, _registration_specs())
+
+    values = {field["key"]: field["value"] for field in result["fields"]}
+    assert values["product_name"] == "新型冠状病毒2019-nCoV核酸检测试剂盒(荧光PCR法)"
+    assert "规格A" in values["package_specification"]
+    assert "检测原理" not in values["intended_use"]
+    assert "PCR反应液" in values["main_components"]
+    assert "-20±5℃" in values["storage_condition_and_validity"]
+
+
 def test_llm_extract_parses_structured_json(monkeypatch):
     monkeypatch.setattr(
         "review_agent.application_form_fill.services.field_extract.generate_completion",
diff --git a/tests/test_regulatory_text_extract.py b/tests/test_regulatory_text_extract.py
index 4979bf6..a9effe0 100644
--- a/tests/test_regulatory_text_extract.py
+++ b/tests/test_regulatory_text_extract.py
@@ -37,3 +37,25 @@ def test_extract_text_reports_unsupported_file(tmp_path):
 
     assert result.status == "unsupported"
     assert result.text == ""
+
+
+def test_extract_text_from_docx_preserves_table_text(tmp_path):
+    from docx import Document
+
+    path = tmp_path / "说明书.docx"
+    document = Document()
+    document.add_paragraph("【主要组成成分】")
+    table = document.add_table(rows=2, cols=2)
+    table.rows[0].cells[0].text = "组分"
+    table.rows[0].cells[1].text = "数量"
+    table.rows[1].cells[0].text = "PCR反应液"
+    table.rows[1].cells[1].text = "1管"
+    document.add_paragraph("【储存条件及有效期】")
+    document.add_paragraph("-20±5℃保存,有效期12个月。")
+    document.save(path)
+
+    result = extract_text(path)
+
+    assert result.status == "success"
+    assert "组分\t数量" in result.text
+    assert result.text.index("PCR反应液") < result.text.index("【储存条件及有效期】")