from __future__ import annotations

import json
from pathlib import Path

from docx import Document

from review_agent.regulatory_info_package.schemas import InstructionExtractResult


def parse_instruction_docx(path: str | Path) -> InstructionExtractResult:
    """Read a .docx instruction file and collect its text, sections and tables.

    Args:
        path: Location of the .docx file to open.

    Returns:
        An ``InstructionExtractResult`` populated with the file name, the
        non-empty stripped paragraphs, the heading-keyed sections, all
        non-empty tables, the tables recognised as component tables, and the
        first 30 paragraphs joined by newlines as ``front_text``.
    """
    file_path = Path(path)
    document = Document(file_path)

    # Strip every paragraph once, then drop the ones that end up empty.
    stripped_texts = (item.text.strip() for item in document.paragraphs)
    paragraphs = [text for text in stripped_texts if text]

    tables = []
    for table in document.tables:
        # split() + join collapses every whitespace run (including newlines
        # inside a cell) into a single space.
        grid = [
            [" ".join(cell.text.split()) for cell in row.cells]
            for row in table.rows
        ]
        if grid:
            tables.append(grid)

    return InstructionExtractResult(
        source_file_name=file_path.name,
        paragraphs=paragraphs,
        sections=_build_sections(paragraphs),
        tables=tables,
        component_tables=_component_tables(tables),
        front_text="\n".join(paragraphs[:30]),
    )


def save_instruction_extract_json(path: str | Path, result: InstructionExtractResult) -> Path:
    """Write an extraction result to *path* as pretty-printed UTF-8 JSON.

    Missing parent directories are created first.

    Args:
        path: Destination file for the JSON document.
        result: Extraction result whose fields are serialised.

    Returns:
        The destination as a ``Path``.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)

    # The JSON keys mirror the attribute names on the result, in this order.
    exported_fields = (
        "source_file_name",
        "paragraphs",
        "sections",
        "tables",
        "component_tables",
        "front_text",
    )
    payload = {name: getattr(result, name) for name in exported_fields}

    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    target.write_text(serialized, encoding="utf-8")
    return target


def _build_sections(paragraphs: list[str]) -> dict[str, str]:
    """Group paragraphs under the most recent heading-like paragraph.

    Text that appears before the first heading is collected under the key
    ``"front"``. A heading's own text (cut to 80 characters) becomes the
    section key and is not part of the section body. Sections that end up
    with no body text are omitted from the result.
    """
    grouped: dict[str, list[str]] = {"front": []}
    bucket = grouped["front"]
    for line in paragraphs:
        if _looks_like_heading(line):
            # A repeated heading keeps appending to its existing bucket.
            bucket = grouped.setdefault(line[:80], [])
        else:
            bucket.append(line)

    return {
        title: "\n".join(lines).strip()
        for title, lines in grouped.items()
        if lines
    }


def _looks_like_heading(text: str) -> bool:
    """Return True when *text* is short and starts with a known heading marker.

    Anything longer than 40 characters (after stripping) is never a heading.
    """
    # Chinese ordinal markers ("一、" .. "六、"), an opening bracket, and the
    # field labels "product name", "intended use" and "main components".
    heading_markers = ("一、", "二、", "三、", "四、", "五、", "六、", "【", "产品名称", "预期用途", "主要组成")
    candidate = text.strip()
    return len(candidate) <= 40 and candidate.startswith(heading_markers)


def _component_tables(tables: list[list[list[str]]]) -> list[dict]:
    """Select the tables whose header row mentions a composition keyword.

    The first row of each table is treated as its header. A table qualifies
    when the concatenated header cells contain any of the keywords; it is
    returned as ``{"header": <first row>, "rows": <remaining rows>}``.
    """
    # Keywords roughly meaning "composition", "component" and "ingredient".
    keywords = ("组成", "组分", "成分")
    matches = []
    for table in tables:
        if not table:
            # No header row, so nothing to match against.
            continue
        header = table[0]
        header_text = "".join(header)
        if any(word in header_text for word in keywords):
            matches.append({"header": header, "rows": table[1:]})
    return matches