78 lines
2.6 KiB
Python
78 lines
2.6 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from docx import Document
|
|
|
|
from review_agent.regulatory_info_package.schemas import InstructionExtractResult
|
|
|
|
|
|
def parse_instruction_docx(path: str | Path) -> InstructionExtractResult:
    """Read a .docx file and collect its paragraphs, tables and sections.

    Args:
        path: Location of the Word document to open.

    Returns:
        An ``InstructionExtractResult`` populated with:

        * ``source_file_name`` - the final component of *path*;
        * ``paragraphs`` - stripped text of every paragraph that is not blank;
        * ``sections`` - the output of ``_build_sections(paragraphs)``;
        * ``tables`` - each table that has at least one row, as a list of
          rows of cell strings with runs of whitespace collapsed to a
          single space;
        * ``component_tables`` - the output of ``_component_tables(tables)``;
        * ``front_text`` - the first 30 paragraphs joined with newlines.
    """
    docx_path = Path(path)
    doc = Document(docx_path)

    # Strip once per paragraph and keep only the ones that still have text.
    paragraphs = []
    for para in doc.paragraphs:
        stripped = para.text.strip()
        if stripped:
            paragraphs.append(stripped)

    tables = []
    for tbl in doc.tables:
        # split()/join collapses newlines, tabs and repeated spaces in a cell.
        grid = [
            [" ".join(cell.text.split()) for cell in row.cells]
            for row in tbl.rows
        ]
        if grid:
            tables.append(grid)

    return InstructionExtractResult(
        source_file_name=docx_path.name,
        paragraphs=paragraphs,
        sections=_build_sections(paragraphs),
        tables=tables,
        component_tables=_component_tables(tables),
        front_text="\n".join(paragraphs[:30]),
    )
|
|
|
|
|
|
def save_instruction_extract_json(path: str | Path, result: InstructionExtractResult) -> Path:
    """Serialise an extraction result to a UTF-8 JSON file.

    Parent directories of *path* are created when missing. The JSON object
    contains the six fields listed below, in that order, pretty-printed
    with a two-space indent and with non-ASCII characters written verbatim.

    Args:
        path: Destination file.
        result: Object exposing ``source_file_name``, ``paragraphs``,
            ``sections``, ``tables``, ``component_tables`` and
            ``front_text`` attributes.

    Returns:
        The destination as a ``Path``.
    """
    out_path = Path(path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    exported_fields = (
        "source_file_name",
        "paragraphs",
        "sections",
        "tables",
        "component_tables",
        "front_text",
    )
    payload = {name: getattr(result, name) for name in exported_fields}

    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    out_path.write_text(serialized, encoding="utf-8")
    return out_path
|
|
|
|
|
|
def _build_sections(paragraphs: list[str]) -> dict[str, str]:
    """Group paragraph texts under the most recently seen heading.

    Paragraphs that appear before any heading are filed under ``"front"``.
    A paragraph accepted by ``_looks_like_heading`` starts a new section
    keyed by its first 80 characters and is not itself added to any body.
    Sections that end up without body text are left out of the result.

    Args:
        paragraphs: Paragraph strings in document order.

    Returns:
        Mapping of section key to its body paragraphs joined by newlines
        and stripped.
    """
    grouped: dict[str, list[str]] = {}
    heading = "front"

    for line in paragraphs:
        if not _looks_like_heading(line):
            grouped.setdefault(heading, []).append(line)
            continue
        heading = line[:80]
        # Register the heading right away so key order follows first
        # appearance even if its body arrives later.
        if heading not in grouped:
            grouped[heading] = []

    merged: dict[str, str] = {}
    for title, body_lines in grouped.items():
        if body_lines:
            merged[title] = "\n".join(body_lines).strip()
    return merged
|
|
|
|
|
|
def _looks_like_heading(text: str) -> bool:
    """Return True when *text* looks like a section heading.

    After stripping surrounding whitespace, a heading is at most 40
    characters long and starts with one of:

    * a Chinese ordinal marker: one or more of the numerals 一 to 十
      followed by the enumeration comma, e.g. "一、", "七、", "十二、";
    * an opening bracket "【";
    * one of the fixed titles "产品名称", "预期用途", "主要组成".

    Ordinals are not limited to one through six, so a document with seven
    or more numbered sections is split at every numbered heading.

    Args:
        text: Paragraph text to classify.

    Returns:
        True if the text matches the heading heuristics, else False.
    """
    compact = text.strip()
    if len(compact) > 40:
        return False

    # Drop any leading run of Chinese numerals; an ordinal heading is a
    # non-empty run immediately followed by the enumeration comma.
    after_numerals = compact.lstrip("一二三四五六七八九十")
    if len(after_numerals) < len(compact) and after_numerals.startswith("、"):
        return True

    return compact.startswith(("【", "产品名称", "预期用途", "主要组成"))
|
|
|
|
|
|
def _component_tables(tables: list[list[list[str]]]) -> list[dict]:
    """Select the tables whose first row mentions a composition keyword.

    The cells of a table's first row are concatenated without a separator
    and searched for "组成", "组分" or "成分". Tables with no rows never
    match.

    Args:
        tables: Tables as lists of rows, each row a list of cell strings.

    Returns:
        One ``{"header": first_row, "rows": remaining_rows}`` dict per
        matching table, in input order.
    """
    keywords = ("组成", "组分", "成分")
    matches = []
    for rows in tables:
        if not rows:
            continue
        header_row = rows[0]
        header_text = "".join(header_row)
        if not any(word in header_text for word in keywords):
            continue
        matches.append({"header": header_row, "rows": rows[1:]})
    return matches
|
|
|