fix(application-form-fill): 抽取说明书章节和表格字段

This commit is contained in:
2026-06-07 20:14:53 +08:00
parent 13b543c99d
commit 0ccd69d3f4
4 changed files with 149 additions and 2 deletions

View File

@@ -15,6 +15,15 @@ from review_agent.models import ApplicationFormFillArtifact, ApplicationFormFill
from review_agent.regulatory_review.services.text_extract import extract_text
FIELD_ALIASES = {
"product_name": ["产品名称"],
"package_specification": ["包装规格", "规格"],
"main_components": ["主要组成成分", "主要组成", "组成成分"],
"intended_use": ["预期用途"],
"storage_condition_and_validity": ["产品储存条件及有效期", "储存条件及有效期", "储存条件", "有效期"],
}
def collect_document_texts(summary_batch: FileSummaryBatch) -> dict[str, str]:
texts: dict[str, str] = {}
for item in summary_batch.items.order_by("file_index"):
@@ -36,7 +45,7 @@ def extract_by_rules(texts: dict[str, str], specs: list[TemplateSpec]) -> dict[s
for file_name, text in texts.items():
source_role = detect_source_role(file_name, text)
for field in field_defs:
value, evidence = _extract_label_value(text, field["label"], labels)
value, evidence = _extract_field_value(text, field, labels)
if not value:
continue
fields.append(
@@ -142,7 +151,34 @@ def _field_defs(specs: list[TemplateSpec]) -> list[dict[str, str]]:
return fields
def _extract_field_value(text: str, field: dict[str, str], labels: list[str]) -> tuple[str, str]:
aliases = _field_aliases(field)
for label in aliases:
value, evidence = _extract_colon_label_value(text, label, labels + aliases)
if value:
return value, evidence
value, evidence = _extract_bracket_section_value(text, label)
if value:
return value, evidence
return "", ""
def _field_aliases(field: dict[str, str]) -> list[str]:
aliases = [field["label"]]
aliases.extend(FIELD_ALIASES.get(field["key"], []))
result: list[str] = []
for alias in aliases:
normalized = str(alias or "").strip()
if normalized and normalized not in result:
result.append(normalized)
return result
def _extract_label_value(text: str, label: str, labels: list[str]) -> tuple[str, str]:
return _extract_colon_label_value(text, label, labels)
def _extract_colon_label_value(text: str, label: str, labels: list[str]) -> tuple[str, str]:
escaped_labels = "|".join(re.escape(item) for item in labels if item != label)
stop_pattern = rf"(?=\n\s*(?:{escaped_labels})\s*[:])" if escaped_labels else r"(?=\Z)"
pattern = re.compile(rf"{re.escape(label)}\s*[:]\s*(.+?)(?:{stop_pattern}|\Z)", re.S)
@@ -156,6 +192,30 @@ def _extract_label_value(text: str, label: str, labels: list[str]) -> tuple[str,
return value, evidence
def _extract_bracket_section_value(text: str, label: str) -> tuple[str, str]:
heading_pattern = rf"^\s*[【\[]\s*{re.escape(label)}\s*[】\]]\s*$"
lines = (text or "").splitlines()
for index, line in enumerate(lines):
if not re.match(heading_pattern, line.strip()):
continue
value_parts: list[str] = []
for next_line in lines[index + 1 :]:
normalized = next_line.strip()
if not normalized:
continue
if _looks_like_bracket_heading(normalized):
break
value_parts.append(normalized)
value = "\n".join(value_parts).strip()
if value:
return value, f"{label}\n{value}"[:300]
return "", ""
def _looks_like_bracket_heading(line: str) -> bool:
return bool(re.match(r"^\s*[【\[].{1,40}[】\]]\s*$", line))
def _prompt_text() -> str:
path = Path(__file__).resolve().parents[1] / "prompts" / "field_extract.md"
return path.read_text(encoding="utf-8")

View File

@@ -9,6 +9,10 @@ from pathlib import Path
from django.conf import settings
from docx import Document
from docx.oxml.table import CT_Tbl
from docx.oxml.text.paragraph import CT_P
from docx.table import Table
from docx.text.paragraph import Paragraph
from openpyxl import load_workbook
from pypdf import PdfReader
from pptx import Presentation
@@ -49,7 +53,7 @@ def extract_text_from_path(path: Path) -> str:
if suffix == ".pdf":
return "\n".join(page.extract_text() or "" for page in PdfReader(str(path)).pages)
if suffix == ".docx":
return "\n".join(paragraph.text for paragraph in Document(str(path)).paragraphs)
return _extract_docx_text(path)
if suffix == ".pptx":
presentation = Presentation(str(path))
lines = []
@@ -72,6 +76,31 @@ def extract_text_from_path(path: Path) -> str:
return ""
def _extract_docx_text(path: Path) -> str:
document = Document(str(path))
lines: list[str] = []
for block in _iter_docx_blocks(document):
if isinstance(block, Paragraph):
text = block.text.strip()
if text:
lines.append(text)
elif isinstance(block, Table):
for row in block.rows:
values = [cell.text.strip() for cell in row.cells if cell.text.strip()]
if values:
lines.append("\t".join(values))
return "\n".join(lines)
def _iter_docx_blocks(document):
body = document.element.body
for child in body.iterchildren():
if isinstance(child, CT_P):
yield Paragraph(child, document)
elif isinstance(child, CT_Tbl):
yield Table(child, document)
def _extract_legacy_doc_with_libreoffice(path: Path) -> str:
with tempfile.TemporaryDirectory() as tmp_dir:
target_dir = Path(tmp_dir)