From dac8ce3c1447b229448c747624934840e23120c6 Mon Sep 17 00:00:00 2001 From: bruce Date: Wed, 10 Jun 2026 19:49:44 +0800 Subject: [PATCH] =?UTF-8?q?feat(regulatory-info-package):=20=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E6=9D=90=E6=96=99=E5=8C=85=E7=94=9F=E6=88=90=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../regulatory_info_package/__init__.py | 2 + .../regulatory_info_package/constants.py | 30 ++ .../regulatory_info_package/events.py | 15 + .../regulatory_info_package/schemas.py | 58 +++ .../services/__init__.py | 2 + .../services/docx_document.py | 70 ++++ .../services/field_extract.py | 135 +++++++ .../services/field_merge.py | 115 ++++++ .../services/input_select.py | 105 ++++++ .../services/instruction_extract.py | 77 ++++ .../services/legacy_doc_document.py | 52 +++ .../services/package_generate.py | 65 ++++ .../services/summary.py | 12 + .../services/template_config.py | 54 +++ .../services/template_repository.py | 34 ++ .../services/traceability_export.py | 51 +++ .../services/zip_export.py | 23 ++ .../regulatory_info_package/storage.py | 71 ++++ .../regulatory_info_package_templates_v1.yaml | 76 ++++ review_agent/regulatory_info_package/views.py | 127 +++++++ .../regulatory_info_package/workflow.py | 338 ++++++++++++++++++ 21 files changed, 1512 insertions(+) create mode 100644 review_agent/regulatory_info_package/__init__.py create mode 100644 review_agent/regulatory_info_package/constants.py create mode 100644 review_agent/regulatory_info_package/events.py create mode 100644 review_agent/regulatory_info_package/schemas.py create mode 100644 review_agent/regulatory_info_package/services/__init__.py create mode 100644 review_agent/regulatory_info_package/services/docx_document.py create mode 100644 review_agent/regulatory_info_package/services/field_extract.py create mode 100644 review_agent/regulatory_info_package/services/field_merge.py create mode 100644 
review_agent/regulatory_info_package/services/input_select.py create mode 100644 review_agent/regulatory_info_package/services/instruction_extract.py create mode 100644 review_agent/regulatory_info_package/services/legacy_doc_document.py create mode 100644 review_agent/regulatory_info_package/services/package_generate.py create mode 100644 review_agent/regulatory_info_package/services/summary.py create mode 100644 review_agent/regulatory_info_package/services/template_config.py create mode 100644 review_agent/regulatory_info_package/services/template_repository.py create mode 100644 review_agent/regulatory_info_package/services/traceability_export.py create mode 100644 review_agent/regulatory_info_package/services/zip_export.py create mode 100644 review_agent/regulatory_info_package/storage.py create mode 100644 review_agent/regulatory_info_package/templates/regulatory_info_package_templates_v1.yaml create mode 100644 review_agent/regulatory_info_package/views.py create mode 100644 review_agent/regulatory_info_package/workflow.py diff --git a/review_agent/regulatory_info_package/__init__.py b/review_agent/regulatory_info_package/__init__.py new file mode 100644 index 0000000..3026f19 --- /dev/null +++ b/review_agent/regulatory_info_package/__init__.py @@ -0,0 +1,2 @@ +"""Chapter 1 regulatory information package workflow.""" + diff --git a/review_agent/regulatory_info_package/constants.py b/review_agent/regulatory_info_package/constants.py new file mode 100644 index 0000000..adaf007 --- /dev/null +++ b/review_agent/regulatory_info_package/constants.py @@ -0,0 +1,30 @@ +WORKFLOW_TYPE = "regulatory_info_package" +DEFAULT_ZIP_NAME = "第1章 监管信息(预生成版).zip" + +REGULATORY_INFO_PACKAGE_TRIGGER_KEYWORDS = [ + "根据说明书生成第1章监管信息", + "生成监管信息材料包", + "从说明书生成第1章材料", + "第1章监管信息", + "监管信息材料包", +] + +REGULATORY_INFO_PACKAGE_NODE_DEFINITIONS = [ + ("prepare", "准备资料", "regulatory_info_package"), + ("template_copy", "复制模板", "regulatory_info_package"), + ("text_extract", "抽取说明书", 
"regulatory_info_package"), + ("field_extract", "抽取字段", "regulatory_info_package"), + ("field_merge", "合并字段", "regulatory_info_package"), + ("generate_docs", "生成材料", "regulatory_info_package"), + ("highlight_review_items", "标记待确认", "regulatory_info_package"), + ("trace_export", "追溯清单", "regulatory_info_package"), + ("zip_export", "打包下载", "regulatory_info_package"), + ("notify", "通知", "regulatory_info_package"), + ("completed", "完成", "completed"), +] + +GENERATED_FILE_SUCCESS = "success" +GENERATED_FILE_FALLBACK_SUCCESS = "fallback_success" +GENERATED_FILE_FAILED = "failed" +GENERATED_FILE_SKIPPED = "skipped" + diff --git a/review_agent/regulatory_info_package/events.py b/review_agent/regulatory_info_package/events.py new file mode 100644 index 0000000..7d12e93 --- /dev/null +++ b/review_agent/regulatory_info_package/events.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from review_agent.regulatory_info_package.constants import WORKFLOW_TYPE +from review_agent.models import RegulatoryInfoPackageBatch, WorkflowEvent + + +def record_event(batch: RegulatoryInfoPackageBatch, event_type: str, payload: dict | None = None) -> WorkflowEvent: + return WorkflowEvent.objects.create( + workflow_type=WORKFLOW_TYPE, + workflow_batch_id=batch.pk, + conversation=batch.conversation, + event_type=event_type, + payload=payload or {}, + ) + diff --git a/review_agent/regulatory_info_package/schemas.py b/review_agent/regulatory_info_package/schemas.py new file mode 100644 index 0000000..2f61dd2 --- /dev/null +++ b/review_agent/regulatory_info_package/schemas.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(frozen=True) +class TemplateSpec: + code: str + output_name: str + source_file: str + file_format: str + strategy: str + include_in_zip: bool + prefer_legacy_doc_native: bool = False + allow_docx_fallback: bool = True + fields: list[dict[str, Any]] = field(default_factory=list) + + 
def _iter_template_paragraphs(container):
    """Yield every paragraph in *container*, descending into nested table cells.

    ``container`` is anything exposing ``paragraphs`` and ``tables`` (a
    python-docx document or table cell). Horizontally merged cells may be
    yielded more than once; callers must tolerate revisiting a paragraph.
    """
    yield from container.paragraphs
    for table in container.tables:
        for row in table.rows:
            for cell in row.cells:
                yield from _iter_template_paragraphs(cell)


def write_docx_from_template(
    source_path: str | Path,
    output_path: str | Path,
    merged_fields: dict[str, MergedField],
) -> tuple[int, int, int]:
    """Fill a docx template with merged field values and append a summary table.

    Every ``{{key}}`` placeholder found in a body paragraph *or* in a table
    cell is replaced by the corresponding field value. A page break, a bold
    heading and a four-column table listing every merged field are then
    appended, and the document is saved to ``output_path`` (parent
    directories are created as needed).

    Args:
        source_path: Template to load. When it does not exist a blank
            document is used so the summary table is still produced.
        output_path: Destination of the generated document.
        merged_fields: Field key -> merged field to write.

    Returns:
        ``(highlight_count, missing_count, llm_only_count)``. Each field is
        counted once: fields whose ``highlight_reason`` is not ``"none"``,
        and of those, the ones flagged ``"missing"`` and ``"llm_only"``.
    """
    source = Path(source_path)
    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)
    document = Document(source) if source.exists() else Document()

    replacements = {f"{{{{{key}}}}}": field for key, field in merged_fields.items()}
    # ``document.paragraphs`` alone skips text inside tables, so walk the
    # tables as well. The list is materialised first because replacing text
    # mutates the runs of the paragraphs being visited.
    for paragraph in list(_iter_template_paragraphs(document)):
        for placeholder, field in replacements.items():
            if placeholder in paragraph.text:
                _replace_paragraph_text(paragraph, paragraph.text.replace(placeholder, field.value), field)

    document.add_page_break()
    heading = document.add_paragraph()
    heading_run = heading.add_run("预生成字段")
    heading_run.bold = True
    table = document.add_table(rows=1, cols=4)
    for cell, title in zip(table.rows[0].cells, ("字段", "值", "来源", "待确认")):
        cell.text = title
    for field in merged_fields.values():
        cells = table.add_row().cells
        cells[0].text = field.label
        cells[1].text = field.value
        cells[2].text = field.source
        cells[3].text = "是" if field.needs_review else "否"

    # Count per field, not per occurrence: a field that appears both as a
    # placeholder and in the summary table must not be reported twice.
    reasons = [field.highlight_reason for field in merged_fields.values() if field.highlight_reason != "none"]
    document.save(output)
    return len(reasons), reasons.count("missing"), reasons.count("llm_only")
annotations + +import json +import re +import time +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Callable + +from review_agent.llm import generate_completion +from review_agent.regulatory_info_package.schemas import InstructionExtractResult + + +FIELD_PATTERNS = { + "product_name": ("产品名称", r"产品名称[::\s]*([^\n\r]+)"), + "storage_condition": ("储存条件", r"(?:储存条件|贮存条件|保存条件)[::\s]*([^\n\r]+)"), + "intended_use": ("预期用途", r"预期用途[::\s]*([^\n\r]+)"), + "package_specification": ("包装规格", r"(?:包装规格|规格)[::\s]*([^\n\r]+)"), + "sample_type": ("样本类型", r"样本类型[::\s]*([^\n\r]+)"), + "applicable_instrument": ("适用仪器", r"适用仪器[::\s]*([^\n\r]+)"), + "standard_no": ("标准号", r"((?:GB|YY|WS|T/C[A-Z0-9]*)[ /T0-9.\-—]+)"), +} + + +def extract_fields_by_rules(instruction: InstructionExtractResult) -> dict[str, dict]: + text = "\n".join([instruction.front_text, *instruction.paragraphs, *instruction.sections.values()]) + results: dict[str, dict] = {} + for key, (label, pattern) in FIELD_PATTERNS.items(): + section_value = _value_after_label_paragraph(instruction.paragraphs, label) + if section_value: + results[key] = { + "label": label, + "value": section_value, + "evidence": f"【{label}】\n{section_value}", + "confidence": 0.82, + "source": "rule", + } + continue + match = re.search(pattern, text, flags=re.IGNORECASE) + if match: + value = _clean_value(match.group(1)) + if value: + results[key] = { + "label": label, + "value": value, + "evidence": match.group(0)[:240], + "confidence": 0.75, + "source": "rule", + } + return results + + +def extract_fields_with_llm(instruction: InstructionExtractResult) -> dict[str, dict]: + prompt = ( + "请从体外诊断试剂产品说明书中抽取字段,输出 JSON 对象,字段包括 " + "product_name、storage_condition、intended_use、package_specification、sample_type、applicable_instrument、standard_no。" + "每个字段值为 {label,value,evidence,confidence}。\n\n" + + instruction.front_text[:6000] + ) + raw = generate_completion([{"role": "user", "content": prompt}], 
def run_llm_extract_with_retry(
    instruction: InstructionExtractResult,
    *,
    llm_extract_func: Callable[[InstructionExtractResult], dict[str, dict]] | None = None,
    sleep_func: Callable[[float], None] = time.sleep,
    delays: tuple[float, ...] = (0, 1, 2),
) -> dict[str, dict]:
    """Run the LLM field extraction, retrying on any failure.

    One attempt is made per entry in ``delays``; before each attempt the
    function waits for that many seconds (a zero delay does not sleep).

    Args:
        instruction: Parsed instruction document handed to the extractor.
        llm_extract_func: Extractor to call; defaults to
            ``extract_fields_with_llm``.
        sleep_func: Injection point for the wait, so tests need not sleep.
        delays: Wait before each attempt, in seconds. The default keeps the
            original schedule of three attempts (immediately, 1 s, 2 s).

    Returns:
        The extractor's result from the first successful attempt, or an
        empty dict when ``delays`` is empty and no attempt was made.

    Raises:
        Exception: The error of the last attempt when every attempt failed.
    """
    func = llm_extract_func or extract_fields_with_llm
    last_exc: Exception | None = None
    for delay in delays:
        if delay > 0:
            sleep_func(delay)
        try:
            return func(instruction)
        except Exception as exc:  # any LLM, network or parsing failure is retryable
            last_exc = exc
    if last_exc is not None:
        raise last_exc
    return {}
return _clean_value(paragraphs[index + 1]) + return "" + + +def _parse_json_object(raw: str) -> dict: + text = (raw or "").strip() + if text.startswith("```"): + text = text.strip("`").strip() + if text.lower().startswith("json"): + text = text[4:].strip() + start = text.find("{") + end = text.rfind("}") + if start == -1 or end == -1: + return {} + return json.loads(text[start : end + 1]) diff --git a/review_agent/regulatory_info_package/services/field_merge.py b/review_agent/regulatory_info_package/services/field_merge.py new file mode 100644 index 0000000..5e9aff7 --- /dev/null +++ b/review_agent/regulatory_info_package/services/field_merge.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +import json +from pathlib import Path + +from review_agent.regulatory_info_package.schemas import MergedField + + +REQUIRED_FIELDS = { + "product_name": "产品名称", + "applicant_name": "申请人名称", + "package_specification": "包装规格", + "intended_use": "预期用途", + "storage_condition": "储存条件", +} + + +def merge_fields(rule_results: dict[str, dict], llm_results: dict[str, dict]) -> tuple[dict[str, MergedField], dict[str, list[dict]]]: + merged: dict[str, MergedField] = {} + missing_fields: list[dict] = [] + llm_only_fields: list[dict] = [] + conflict_fields: list[dict] = [] + keys = set(REQUIRED_FIELDS) | set(rule_results) | set(llm_results) + for key in sorted(keys): + rule = rule_results.get(key) or {} + llm = llm_results.get(key) or {} + rule_value = str(rule.get("value") or "").strip() + llm_value = str(llm.get("value") or "").strip() + label = str(rule.get("label") or llm.get("label") or REQUIRED_FIELDS.get(key) or key) + if rule_value and llm_value and rule_value != llm_value: + field = MergedField( + key=key, + label=label, + value=rule_value, + source="rule_conflict", + evidence=str(rule.get("evidence") or ""), + confidence=float(rule.get("confidence") or 0.0), + highlight_reason="conflict", + needs_review=True, + rule_value=rule_value, + llm_value=llm_value, + ) + 
conflict_fields.append( + { + "field_key": key, + "field_label": label, + "rule_value": rule_value, + "llm_value": llm_value, + "selected_value": rule_value, + "handling": "规则优先,写入值高亮并进入追溯清单", + } + ) + elif rule_value: + field = MergedField( + key=key, + label=label, + value=rule_value, + source="rule", + evidence=str(rule.get("evidence") or ""), + confidence=float(rule.get("confidence") or 0.0), + ) + elif llm_value: + field = MergedField( + key=key, + label=label, + value=llm_value, + source="llm", + evidence=str(llm.get("evidence") or ""), + confidence=float(llm.get("confidence") or 0.0), + highlight_reason="llm_only", + needs_review=True, + llm_value=llm_value, + ) + llm_only_fields.append(_review_dict(field)) + else: + field = MergedField( + key=key, + label=label, + value="/", + source="missing", + evidence="", + confidence=0.0, + highlight_reason="missing", + needs_review=True, + ) + missing_fields.append(_review_dict(field)) + merged[key] = field + return merged, { + "missing_fields": missing_fields, + "llm_only_fields": llm_only_fields, + "conflict_fields": conflict_fields, + } + + +def save_merged_fields(path: str | Path, merged: dict[str, MergedField], summary: dict[str, list[dict]]) -> Path: + target = Path(path) + target.parent.mkdir(parents=True, exist_ok=True) + payload = { + "fields": {key: field.__dict__ for key, field in merged.items()}, + **summary, + } + target.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + return target + + +def _review_dict(field: MergedField) -> dict: + return { + "target_file": "", + "field_key": field.key, + "field_label": field.label, + "final_value": field.value, + "highlight_reason": field.highlight_reason, + "needs_review": field.needs_review, + } + diff --git a/review_agent/regulatory_info_package/services/input_select.py b/review_agent/regulatory_info_package/services/input_select.py new file mode 100644 index 0000000..a269ab4 --- /dev/null +++ 
b/review_agent/regulatory_info_package/services/input_select.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path + +from review_agent.models import Conversation, FileAttachment, FileSummaryBatch, FileSummaryItem + + +@dataclass +class InstructionInputSelection: + status: str + file_name: str = "" + storage_path: str = "" + attachment: FileAttachment | None = None + source_summary_batch: FileSummaryBatch | None = None + source_summary_item_id: int | None = None + candidates: list[str] = field(default_factory=list) + message: str = "" + + +def select_instruction_input(conversation: Conversation, message: str) -> InstructionInputSelection: + candidates = _active_docx_attachments(conversation) + named = _match_by_message(candidates, message) + if len(named) == 1: + return _selection_from_attachment(named[0]) + instruction_candidates = [item for item in candidates if "说明书" in item.original_name] + if len(instruction_candidates) == 1: + return _selection_from_attachment(instruction_candidates[0]) + if len(candidates) == 1: + return _selection_from_attachment(candidates[0]) + if len(instruction_candidates) > 1 or len(candidates) > 1: + names = [item.original_name for item in (instruction_candidates or candidates)] + return InstructionInputSelection( + status="waiting_user", + candidates=names, + message="请确认用于生成第1章监管信息的说明书文件名:" + "、".join(names), + ) + summary_selection = _select_from_latest_summary(conversation, message) + if summary_selection: + return summary_selection + return InstructionInputSelection(status="missing", message="请先上传产品说明书 docx 文件。") + + +def _active_docx_attachments(conversation: Conversation) -> list[FileAttachment]: + return list( + FileAttachment.objects.filter( + conversation=conversation, + is_active=True, + ) + .exclude(upload_status=FileAttachment.UploadStatus.DELETED) + .filter(original_name__iendswith=".docx") + .order_by("original_name", "-version_no") + ) + + +def 
_match_by_message(candidates: list[FileAttachment], message: str) -> list[FileAttachment]: + compact = "".join((message or "").lower().split()) + matched = [] + for attachment in candidates: + stem = Path(attachment.original_name).stem.lower() + name = attachment.original_name.lower() + if stem and stem in compact or name and name in compact: + matched.append(attachment) + return matched + + +def _selection_from_attachment(attachment: FileAttachment) -> InstructionInputSelection: + return InstructionInputSelection( + status="selected", + file_name=attachment.original_name, + storage_path=attachment.storage_path, + attachment=attachment, + ) + + +def _select_from_latest_summary(conversation: Conversation, message: str) -> InstructionInputSelection | None: + batch = ( + FileSummaryBatch.objects.filter(conversation=conversation, status=FileSummaryBatch.Status.SUCCESS) + .order_by("-finished_at", "-created_at", "-id") + .first() + ) + if not batch: + return None + items = list(batch.items.filter(file_name__iendswith=".docx").order_by("file_name", "id")) + compact = "".join((message or "").lower().split()) + named = [item for item in items if Path(item.file_name).stem.lower() in compact or item.file_name.lower() in compact] + candidates = named or [item for item in items if "说明书" in item.file_name] + if len(candidates) == 1: + item = candidates[0] + return InstructionInputSelection( + status="selected", + file_name=item.file_name, + storage_path=item.storage_path, + source_summary_batch=batch, + source_summary_item_id=item.pk, + ) + if len(candidates) > 1: + return InstructionInputSelection( + status="waiting_user", + source_summary_batch=batch, + candidates=[item.file_name for item in candidates], + message="请确认用于生成第1章监管信息的说明书文件名:" + "、".join(item.file_name for item in candidates), + ) + return None + diff --git a/review_agent/regulatory_info_package/services/instruction_extract.py b/review_agent/regulatory_info_package/services/instruction_extract.py new file mode 
100644 index 0000000..9a3829e --- /dev/null +++ b/review_agent/regulatory_info_package/services/instruction_extract.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import json +from pathlib import Path + +from docx import Document + +from review_agent.regulatory_info_package.schemas import InstructionExtractResult + + +def parse_instruction_docx(path: str | Path) -> InstructionExtractResult: + file_path = Path(path) + document = Document(file_path) + paragraphs = [paragraph.text.strip() for paragraph in document.paragraphs if paragraph.text.strip()] + tables = [] + for table in document.tables: + rows = [] + for row in table.rows: + rows.append([" ".join(cell.text.split()) for cell in row.cells]) + if rows: + tables.append(rows) + sections = _build_sections(paragraphs) + front_text = "\n".join(paragraphs[:30]) + return InstructionExtractResult( + source_file_name=file_path.name, + paragraphs=paragraphs, + sections=sections, + tables=tables, + component_tables=_component_tables(tables), + front_text=front_text, + ) + + +def save_instruction_extract_json(path: str | Path, result: InstructionExtractResult) -> Path: + target = Path(path) + target.parent.mkdir(parents=True, exist_ok=True) + payload = { + "source_file_name": result.source_file_name, + "paragraphs": result.paragraphs, + "sections": result.sections, + "tables": result.tables, + "component_tables": result.component_tables, + "front_text": result.front_text, + } + target.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + return target + + +def _build_sections(paragraphs: list[str]) -> dict[str, str]: + sections: dict[str, list[str]] = {} + current = "front" + for text in paragraphs: + if _looks_like_heading(text): + current = text[:80] + sections.setdefault(current, []) + continue + sections.setdefault(current, []).append(text) + return {key: "\n".join(value).strip() for key, value in sections.items() if value} + + +def _looks_like_heading(text: str) -> bool: + 
compact = text.strip() + if len(compact) > 40: + return False + heading_markers = ("一、", "二、", "三、", "四、", "五、", "六、", "【", "产品名称", "预期用途", "主要组成") + return compact.startswith(heading_markers) + + +def _component_tables(tables: list[list[list[str]]]) -> list[dict]: + results = [] + for table in tables: + header = table[0] if table else [] + joined = "".join(header) + if any(keyword in joined for keyword in ["组成", "组分", "成分"]): + results.append({"header": header, "rows": table[1:]}) + return results + diff --git a/review_agent/regulatory_info_package/services/legacy_doc_document.py b/review_agent/regulatory_info_package/services/legacy_doc_document.py new file mode 100644 index 0000000..596480b --- /dev/null +++ b/review_agent/regulatory_info_package/services/legacy_doc_document.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import shutil +from dataclasses import dataclass +from pathlib import Path + +from docx import Document + +from review_agent.regulatory_info_package.schemas import MergedField + + +@dataclass(frozen=True) +class LegacyDocCapability: + status: str + adapter: str + message: str = "" + + +def detect_legacy_doc_capability() -> LegacyDocCapability: + try: + import win32com.client # noqa: F401 + + return LegacyDocCapability(status="available", adapter="WordComDocAdapter", message="Word COM 可用") + except Exception as exc: + return LegacyDocCapability( + status="unavailable", + adapter="UnavailableLegacyDocAdapter", + message=f"Word COM 不可用:{type(exc).__name__}", + ) + + +def write_legacy_doc_or_fallback( + source_path: str | Path, + output_path: str | Path, + merged_fields: dict[str, MergedField], +) -> tuple[Path, str, dict]: + source = Path(source_path) + output = Path(output_path) + output.parent.mkdir(parents=True, exist_ok=True) + capability = detect_legacy_doc_capability() + if capability.status == "available" and source.exists(): + shutil.copy2(source, output) + return output, "success", {"doc": capability.__dict__, "fallback_used": 
def generate_package_documents(
    batch: RegulatoryInfoPackageBatch,
    config: dict,
    merged_fields: dict[str, MergedField],
) -> list[GeneratedFileResult]:
    """Generate every configured template document for ``batch`` in parallel.

    Args:
        batch: Batch whose working directory receives the generated files.
        config: Loaded template configuration.
        merged_fields: Field key -> merged field written into each document.

    Returns:
        One result per template, in the order the templates are declared in
        ``config``. Results are collected from the futures in submission
        order rather than completion order, so the list (and anything
        derived from it, such as the archive listing or the summary) is
        deterministic across runs.
    """
    specs = template_specs(config)
    if not specs:
        return []
    with ThreadPoolExecutor(max_workers=min(4, len(specs))) as executor:
        futures = [executor.submit(_generate_one, batch, config, spec, merged_fields) for spec in specs]
        return [future.result() for future in futures]
TemplateSpec, + merged_fields: dict[str, MergedField], +) -> GeneratedFileResult: + try: + template_path = copy_template_to_batch(batch, config, spec) + generated_dir = ensure_batch_subdir(batch, "generated") + output_path = generated_dir / spec.output_name + adapter_summary = {} + if spec.file_format == "doc": + actual_path, status, adapter_summary = write_legacy_doc_or_fallback(template_path, output_path, merged_fields) + actual_format = actual_path.suffix.lower().lstrip(".") + highlight_count = missing_count = llm_only_count = 0 + else: + highlight_count, missing_count, llm_only_count = write_docx_from_template(template_path, output_path, merged_fields) + actual_path = output_path + actual_format = "docx" + status = "success" + return GeneratedFileResult( + template_code=spec.code, + file_name=actual_path.name, + requested_format=spec.file_format, + actual_format=actual_format, + status=status, + path=str(actual_path), + highlight_count=highlight_count, + missing_count=missing_count, + llm_only_count=llm_only_count, + ) + except Exception as exc: + return GeneratedFileResult( + template_code=spec.code, + file_name=spec.output_name, + requested_format=spec.file_format, + actual_format=spec.file_format, + status=GENERATED_FILE_FAILED, + error_message=str(exc), + ) diff --git a/review_agent/regulatory_info_package/services/summary.py b/review_agent/regulatory_info_package/services/summary.py new file mode 100644 index 0000000..490704c --- /dev/null +++ b/review_agent/regulatory_info_package/services/summary.py @@ -0,0 +1,12 @@ +from __future__ import annotations + + +def build_assistant_summary(*, batch_no: str, exports: list[dict], failed_files: list[dict]) -> str: + zip_exports = [item for item in exports if item.get("export_type") == "zip" or str(item.get("file_name", "")).endswith(".zip")] + other_exports = [item for item in exports if item not in zip_exports] + lines = [f"已完成第1章监管信息材料包生成,批次号:{batch_no}。", ""] + for export in [*zip_exports, *other_exports]: + 
def validate_template_config(config: dict) -> list[str]:
    """Check a loaded template configuration and describe every problem found.

    Args:
        config: Configuration mapping with a ``source_dir`` entry and a
            ``templates`` list of mappings (``code``, ``source_file``,
            ``output_name``).

    Returns:
        Human-readable error messages; an empty list means the
        configuration is valid.
    """
    errors: list[str] = []

    # An absent source_dir must be reported explicitly: Path("") is the
    # current directory, which always exists and would silently pass.
    raw_source_dir = str(config.get("source_dir") or "").strip()
    source_dir = Path(raw_source_dir) if raw_source_dir else None
    source_dir_ok = source_dir is not None and source_dir.is_dir()
    if source_dir is None:
        errors.append("模板配置缺少 source_dir。")
    elif not source_dir_ok:
        errors.append(f"模板源目录不存在:{source_dir}")

    templates = config.get("templates") or []
    if len(templates) != 7:
        errors.append("第1章监管信息模板配置必须包含 7 个模板。")

    seen: set[str] = set()
    for template in templates:
        code = str(template.get("code") or "")
        if not code:
            errors.append("模板 code 不能为空。")
        elif code in seen:
            errors.append(f"模板 code 重复:{code}")
        seen.add(code)

        source_file = str(template.get("source_file") or "")
        output_name = str(template.get("output_name") or "")
        if not source_file:
            errors.append(f"模板 {code} 缺少 source_file。")
        elif source_dir_ok and not (source_dir / source_file).is_file():
            # Only checked when the directory itself is valid, to avoid a
            # cascade of per-file errors caused by one bad directory.
            errors.append(f"模板源文件不存在:{source_file}")
        if not output_name:
            errors.append(f"模板 {code} 缺少 output_name。")
    return errors
# --- services/traceability_export.py -----------------------------------------

# Column order of the traceability sheet; also the keys of every exported row.
HEADERS = [
    "target_file",
    "target_field",
    "final_value",
    "extraction_source",
    "evidence",
    "highlight_reason",
    "needs_review",
]


def save_traceability_exports(root: str | Path, merged_fields: dict[str, MergedField]) -> tuple[Path, Path]:
    """Write the field traceability table as an Excel workbook and a JSON log.

    Args:
        root: Batch working directory; ``exports/`` and ``logs/`` are created
            beneath it when missing.
        merged_fields: Merged field records; one output row is produced per
            value, in the mapping's iteration order.

    Returns:
        ``(excel_path, json_path)`` -- ``exports/traceability.xlsx`` and
        ``logs/traceability.json``.
    """
    base_dir = Path(root)
    exports_dir = base_dir / "exports"
    logs_dir = base_dir / "logs"
    for directory in (exports_dir, logs_dir):
        directory.mkdir(parents=True, exist_ok=True)

    rows = []
    for field in merged_fields.values():
        rows.append(
            {
                # The target file is always exported as an empty string here.
                "target_file": "",
                "target_field": field.label,
                "final_value": field.value,
                "extraction_source": field.source,
                "evidence": field.evidence,
                "highlight_reason": field.highlight_reason,
                "needs_review": field.needs_review,
            }
        )

    excel_path = exports_dir / "traceability.xlsx"
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "traceability"
    sheet.append(HEADERS)
    for row in rows:
        sheet.append([row.get(column, "") for column in HEADERS])
    workbook.save(excel_path)

    json_path = logs_dir / "traceability.json"
    serialized = json.dumps(rows, ensure_ascii=False, indent=2)
    json_path.write_text(serialized, encoding="utf-8")
    return excel_path, json_path
# --- services/zip_export.py --------------------------------------------------


def create_zip_package(
    root: str | Path,
    generated_files: list[GeneratedFileResult],
    zip_name: str | None = None,
) -> Path:
    """Bundle the successfully generated documents into ``exports/<zip_name>``.

    Args:
        root: Batch working directory.
        generated_files: Generation results; only entries whose status is
            ``GENERATED_FILE_SUCCESS`` or ``GENERATED_FILE_FALLBACK_SUCCESS``,
            that carry a path, and whose file exists on disk are archived.
        zip_name: Archive file name.  ``None`` or an empty string falls back
            to ``DEFAULT_ZIP_NAME``.

    Returns:
        Path of the written ZIP archive.
    """
    exports_dir = Path(root) / "exports"
    exports_dir.mkdir(parents=True, exist_ok=True)
    # A blank name would make ``exports_dir / zip_name`` the directory itself;
    # ``.name`` additionally keeps the archive inside ``exports/``.
    zip_path = exports_dir / Path(zip_name or DEFAULT_ZIP_NAME).name
    allowed = {GENERATED_FILE_SUCCESS, GENERATED_FILE_FALLBACK_SUCCESS}
    with ZipFile(zip_path, "w", compression=ZIP_DEFLATED) as archive:
        for result in generated_files:
            if result.status not in allowed or not result.path:
                continue
            file_path = Path(result.path)
            if file_path.exists():
                archive.write(file_path, arcname=result.file_name)
    return zip_path


# --- storage.py --------------------------------------------------------------


def build_batch_work_dir(batch: RegulatoryInfoPackageBatch | None = None, *, batch_no: str = "") -> Path:
    """Return the working directory for a batch under ``settings.MEDIA_ROOT``.

    With a ``batch`` the layout is
    ``regulatory_info_package/<user_id>/<conversation_id>/<batch_no>``;
    without one it is ``regulatory_info_package/<batch_no>``.
    """
    base = Path(settings.MEDIA_ROOT) / "regulatory_info_package"
    if batch:
        return base / str(batch.user_id) / str(batch.conversation_id) / batch.batch_no
    return base / batch_no


def _batch_root(batch: RegulatoryInfoPackageBatch) -> Path:
    """Return the batch's working directory, deriving it when ``work_dir`` is empty."""
    return Path(batch.work_dir) if batch.work_dir else build_batch_work_dir(batch)


def ensure_batch_subdir(batch: RegulatoryInfoPackageBatch, name: str) -> Path:
    """Create (if needed) and return a direct sub-directory of the batch root.

    Only the final path component of ``name`` is used, so ``"../logs"``
    yields ``<root>/logs``.
    """
    target = _batch_root(batch) / Path(name).name
    ensure_within_work_dir(batch, target)
    target.mkdir(parents=True, exist_ok=True)
    return target


def ensure_within_work_dir(batch: RegulatoryInfoPackageBatch, path: str | Path) -> Path:
    """Resolve ``path`` and verify it lies inside the batch working directory.

    Returns:
        The resolved path.

    Raises:
        ValueError: If the resolved path is neither the batch root nor located
            beneath it.
    """
    # Use the same root as ``ensure_batch_subdir``: resolving an empty
    # ``work_dir`` would silently check against the current directory.
    root = _batch_root(batch).resolve()
    target = Path(path).resolve()
    if root != target and root not in target.parents:
        raise ValueError("输出路径必须位于当前材料包批次工作目录内。")
    return target


def compute_file_sha256(path: str | Path) -> str:
    """Return the hex SHA-256 digest of a file, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with Path(path).open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def create_artifact_for_file(
    batch: RegulatoryInfoPackageBatch,
    *,
    path: str | Path,
    artifact_type: str,
    file_format: str,
    name: str = "",
    metadata: dict | None = None,
    created_by_node: str = "",
) -> RegulatoryInfoPackageArtifact:
    """Create a ``RegulatoryInfoPackageArtifact`` row describing a batch file.

    The path must be inside the batch working directory (``ValueError``
    otherwise).  Size and content hash are recorded when the file exists and
    stored as ``0`` / ``""`` when it does not.  ``name`` defaults to the file
    stem.
    """
    file_path = ensure_within_work_dir(batch, path)
    exists = file_path.exists()
    return RegulatoryInfoPackageArtifact.objects.create(
        batch=batch,
        artifact_type=artifact_type,
        file_format=file_format,
        name=name or file_path.stem,
        file_name=file_path.name,
        storage_path=str(file_path),
        file_size=file_path.stat().st_size if exists else 0,
        content_hash=compute_file_sha256(file_path) if exists else "",
        metadata=metadata or {},
        created_by_node=created_by_node,
    )
applicant_name + label: 申请人名称 + placeholder: "{{applicant_name}}" + - code: ch1_5_product_list + source_file: CH1.5 产品列表.docx + output_name: CH1.5 产品列表.docx + file_format: docx + strategy: product_list + include_in_zip: true + fields: + - key: package_specification + label: 包装规格 + placeholder: "{{package_specification}}" + - code: ch1_9_pre_submission + source_file: CH1.9 产品申报前沟通的说明.doc + output_name: CH1.9 产品申报前沟通的说明.doc + file_format: doc + strategy: pre_submission + include_in_zip: true + prefer_legacy_doc_native: true + allow_docx_fallback: true + fields: + - key: product_name + label: 产品名称 + placeholder: "{{product_name}}" + - code: ch1_11_1_standards + source_file: CH1.11.1 符合标准的清单.docx + output_name: CH1.11.1 符合标准的清单.docx + file_format: docx + strategy: standards + include_in_zip: true + fields: + - key: standard_no + label: 标准号 + placeholder: "{{standard_no}}" + - code: ch1_11_5_authenticity + source_file: CH1.11.5 真实性声明.docx + output_name: CH1.11.5 真实性声明.docx + file_format: docx + strategy: authenticity + include_in_zip: true + fields: + - key: product_name + label: 产品名称 + placeholder: "{{product_name}}" + - code: ch1_11_6_conformity + source_file: CH1.11.6 符合性声明.docx + output_name: CH1.11.6 符合性声明.docx + file_format: docx + strategy: conformity + include_in_zip: true + fields: + - key: product_name + label: 产品名称 + placeholder: "{{product_name}}" diff --git a/review_agent/regulatory_info_package/views.py b/review_agent/regulatory_info_package/views.py new file mode 100644 index 0000000..662956f --- /dev/null +++ b/review_agent/regulatory_info_package/views.py @@ -0,0 +1,127 @@ +import json + +from django.contrib.auth.decorators import login_required +from django.conf import settings +from django.http import Http404, JsonResponse +from django.views.decorators.http import require_http_methods + +from review_agent.models import ExportedSummaryFile, RegulatoryInfoPackageBatch, WorkflowNodeRun +from review_agent.regulatory_info_package.constants import 
# --- views.py ----------------------------------------------------------------


@require_http_methods(["GET"])
def health(request):
    """Report that the regulatory info package workflow is available."""
    return JsonResponse({"workflow_type": WORKFLOW_TYPE, "status": "available"})


@login_required
@require_http_methods(["POST"])
def start(request):
    """Create a material package batch for a conversation and start its workflow.

    Expects a JSON object body with ``conversation_id`` and an optional
    ``message``.

    Returns:
        400 for an undecodable body, malformed JSON, a JSON value that is not
        an object, or when no instruction input could be selected; otherwise a
        JSON payload with the new batch id, workflow type and status.

    Raises:
        Http404: If the conversation does not exist for the current user.
    """
    try:
        payload = json.loads(request.body.decode("utf-8") or "{}")
    except (UnicodeDecodeError, json.JSONDecodeError):
        return JsonResponse({"error": "JSON 格式错误。"}, status=400)
    # Valid JSON may still be a list or scalar, which has no ``.get``.
    if not isinstance(payload, dict):
        return JsonResponse({"error": "JSON 格式错误。"}, status=400)
    from review_agent.models import Conversation

    try:
        conversation = Conversation.objects.filter(pk=payload.get("conversation_id"), user=request.user).first()
    except (TypeError, ValueError):
        # A non-numeric or structured id cannot match any conversation; treat
        # it as "not found" instead of letting the lookup error become a 500.
        conversation = None
    if not conversation:
        raise Http404("对话不存在。")
    selection = select_instruction_input(conversation, str(payload.get("message") or ""))
    if selection.status != "selected":
        return JsonResponse(
            {"status": selection.status, "message": selection.message, "candidates": selection.candidates},
            status=400,
        )
    batch = create_regulatory_info_package_batch(
        conversation=conversation,
        user=request.user,
        source_attachment=selection.attachment,
        source_summary_batch=selection.source_summary_batch,
        source_summary_item_id=selection.source_summary_item_id,
        source_file_name=selection.file_name,
        source_storage_path=selection.storage_path,
    )
    start_regulatory_info_package_workflow(batch, async_run=getattr(settings, "REGULATORY_INFO_PACKAGE_ASYNC", True))
    return JsonResponse({"batch_id": batch.pk, "workflow_type": WORKFLOW_TYPE, "status": batch.status})


@login_required
@require_http_methods(["GET"])
def batch_status(request, batch_id: int):
    """Return batch state, node progress, exports, failures and notifications.

    Raises:
        Http404: If no non-deleted batch with ``batch_id`` belongs to a
            conversation of the current user.
    """
    batch = RegulatoryInfoPackageBatch.objects.filter(
        pk=batch_id,
        conversation__user=request.user,
        is_deleted=False,
    ).first()
    if not batch:
        raise Http404("材料包批次不存在。")

    exports = ExportedSummaryFile.objects.filter(
        workflow_type=WORKFLOW_TYPE,
        workflow_batch_id=batch.pk,
    ).order_by("-export_type", "id")
    # Stable sort: ZIP exports first, database ordering kept within each group.
    sorted_exports = sorted(exports, key=lambda item: 0 if item.export_type == ExportedSummaryFile.ExportType.ZIP else 1)

    node_runs = WorkflowNodeRun.objects.filter(
        workflow_type=WORKFLOW_TYPE,
        workflow_batch_id=batch.pk,
    ).order_by("id")
    notifications = batch.notifications.filter(is_deleted=False).order_by("-created_at", "-id")

    return JsonResponse(
        {
            "batch": {
                "id": batch.pk,
                "workflow_type": WORKFLOW_TYPE,
                "batch_no": batch.batch_no,
                "status": batch.status,
                "product_name": batch.product_name,
                "risk_summary_text": _risk_summary_text(batch),
                "error_message": batch.error_message,
            },
            "nodes": [
                {
                    "node_code": node.node_code,
                    "node_name": node.node_name,
                    "status": node.status,
                    "progress": node.progress,
                    "message": node.message,
                }
                for node in node_runs
            ],
            "exports": [
                {
                    "id": export.pk,
                    "export_type": export.export_type,
                    "export_category": export.export_category,
                    "file_name": export.file_name,
                    "download_url": f"/api/review-agent/file-summary/exports/{export.pk}/download/",
                }
                for export in sorted_exports
            ],
            # ``or []`` tolerates a batch whose generated_files is still unset.
            "failed_files": [item for item in (batch.generated_files or []) if item.get("status") == "failed"],
            "notifications": [
                {
                    "id": item.pk,
                    "channel": item.channel,
                    "send_status": item.send_status,
                    "status_label": "通知已记录" if item.send_status == "success" else item.send_status,
                    "error_message": item.error_message,
                }
                for item in notifications
            ],
        }
    )


def _risk_summary_text(batch: RegulatoryInfoPackageBatch) -> str:
    """Summarise the batch's non-empty risk lists as ``"<label> <count>"`` parts.

    Parts are joined with ``" · "``; an empty string is returned when every
    list is empty.
    """
    labelled_lists = (
        ("缺失字段", batch.missing_fields),
        ("LLM-only", batch.llm_only_fields),
        ("冲突字段", batch.conflict_fields),
        ("提示", batch.risk_notes),
    )
    return " · ".join(f"{label} {len(items)}" for label, items in labelled_lists if items)
b/review_agent/regulatory_info_package/workflow.py new file mode 100644 index 0000000..6a9f05b --- /dev/null +++ b/review_agent/regulatory_info_package/workflow.py @@ -0,0 +1,338 @@ +from __future__ import annotations + +import logging +from threading import Thread +from uuid import uuid4 + +from django.conf import settings +from django.db import transaction +from django.utils import timezone + +from review_agent.file_summary.paths import resolve_storage_path +from review_agent.models import ( + Conversation, + ExportedSummaryFile, + Message, + RegulatoryInfoPackageArtifact, + RegulatoryInfoPackageBatch, + RegulatoryInfoPackageNotificationRecord, + WorkflowNodeRun, +) +from review_agent.regulatory_info_package.constants import ( + DEFAULT_ZIP_NAME, + REGULATORY_INFO_PACKAGE_NODE_DEFINITIONS, + WORKFLOW_TYPE, +) +from review_agent.regulatory_info_package.events import record_event +from review_agent.regulatory_info_package.services.template_config import ( + compute_config_hash, + load_template_config, + validate_template_config, +) +from review_agent.regulatory_info_package.services.field_extract import run_parallel_extract, save_field_extract_result +from review_agent.regulatory_info_package.services.field_merge import merge_fields, save_merged_fields +from review_agent.regulatory_info_package.services.instruction_extract import parse_instruction_docx, save_instruction_extract_json +from review_agent.regulatory_info_package.services.package_generate import generate_package_documents +from review_agent.regulatory_info_package.services.summary import build_assistant_summary +from review_agent.regulatory_info_package.services.traceability_export import save_traceability_exports +from review_agent.regulatory_info_package.services.zip_export import create_zip_package +from review_agent.regulatory_info_package.schemas import GeneratedFileResult, InstructionExtractResult, MergedField +from review_agent.regulatory_info_package.storage import build_batch_work_dir +from 
# --- workflow.py -------------------------------------------------------------

logger = logging.getLogger("review_agent.regulatory_info_package.workflow")


def build_batch_no() -> str:
    """Return a batch number: ``RIP-<local YYYYmmddHHMMSS>-<6 hex chars>``."""
    timestamp = timezone.localtime().strftime("%Y%m%d%H%M%S")
    return f"RIP-{timestamp}-{uuid4().hex[:6]}"


@transaction.atomic
def create_regulatory_info_package_batch(
    *,
    conversation: Conversation,
    user,
    trigger_message: Message | None = None,
    source_attachment=None,
    source_summary_batch=None,
    source_summary_item_id: int | None = None,
    source_file_name: str = "",
    source_storage_path: str = "",
    existing_batch: RegulatoryInfoPackageBatch | None = None,
) -> RegulatoryInfoPackageBatch:
    """Create (or reuse) a batch and make sure all workflow node rows exist.

    When ``existing_batch`` is ``None`` a new batch is created together with
    its working directory.  File name and storage path fall back to the
    attachment's ``original_name`` / ``storage_path`` when not given.

    Returns:
        The new or reused batch.
    """
    batch = existing_batch
    if batch is None:
        batch_no = build_batch_no()
        work_dir = build_batch_work_dir(batch_no=batch_no)
        work_dir.mkdir(parents=True, exist_ok=True)
        batch = RegulatoryInfoPackageBatch.objects.create(
            conversation=conversation,
            user=user,
            trigger_message=trigger_message,
            source_attachment=source_attachment,
            source_summary_batch=source_summary_batch,
            source_summary_item_id=source_summary_item_id,
            source_file_name=source_file_name or getattr(source_attachment, "original_name", ""),
            source_storage_path=source_storage_path or getattr(source_attachment, "storage_path", ""),
            batch_no=batch_no,
            output_zip_name=DEFAULT_ZIP_NAME,
            work_dir=str(work_dir),
        )
    # get_or_create keeps this idempotent for a reused batch.
    # NOTE(review): nesting reconstructed from a whitespace-collapsed patch;
    # node creation and the event are assumed to run for reused batches too.
    for code, name, group in REGULATORY_INFO_PACKAGE_NODE_DEFINITIONS:
        WorkflowNodeRun.objects.get_or_create(
            workflow_type=WORKFLOW_TYPE,
            workflow_batch_id=batch.pk,
            node_code=code,
            defaults={
                "node_group": group,
                "node_name": name,
            },
        )
    record_event(batch, "workflow_created", {"batch_id": batch.pk, "batch_no": batch.batch_no})
    return batch


class RegulatoryInfoPackageWorkflowExecutor:
    """Runs the Chapter 1 regulatory information package workflow.

    Node rows are executed in ``id`` order.  Intermediate results are kept on
    the instance and passed from one node to the next.
    """

    def __init__(self, batch: RegulatoryInfoPackageBatch):
        self.batch = batch
        self.template_config: dict = {}
        self.instruction: InstructionExtractResult | None = None
        self.extract_payload: dict = {}
        self.merged_fields: dict[str, MergedField] = {}
        self.merge_summary: dict[str, list[dict]] = {}
        self.generation_results: list[GeneratedFileResult] = []
        self.exports: list[ExportedSummaryFile] = []

    def run(self) -> None:
        """Execute all pending nodes and record the final batch status.

        Nodes already in SUCCESS or SKIPPED state are not re-run.  Any
        exception marks the batch FAILED with the exception text; it is logged
        and not re-raised.
        """
        logger.info("监管信息材料包工作流开始 batch_no=%s batch_id=%s", self.batch.batch_no, self.batch.pk)
        self.batch.status = RegulatoryInfoPackageBatch.Status.RUNNING
        self.batch.started_at = timezone.now()
        self.batch.save(update_fields=["status", "started_at"])
        record_event(self.batch, "workflow_started", {"batch_id": self.batch.pk})
        finished_states = {WorkflowNodeRun.Status.SUCCESS, WorkflowNodeRun.Status.SKIPPED}
        try:
            for node in self._nodes():
                if node.status in finished_states:
                    continue
                self._run_node(node)
        except Exception as exc:
            logger.exception("Regulatory info package workflow failed", extra={"batch_id": self.batch.pk})
            self.batch.status = RegulatoryInfoPackageBatch.Status.FAILED
            self.batch.error_message = str(exc)
            self.batch.finished_at = timezone.now()
            self.batch.save(update_fields=["status", "error_message", "finished_at"])
            record_event(self.batch, "workflow_failed", {"message": str(exc)})
            return
        self.batch.status = RegulatoryInfoPackageBatch.Status.SUCCESS
        self.batch.finished_at = timezone.now()
        self.batch.save(update_fields=["status", "finished_at"])
        record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk})

    def _nodes(self):
        """Return this batch's node runs ordered by primary key."""
        return WorkflowNodeRun.objects.filter(
            workflow_type=WORKFLOW_TYPE,
            workflow_batch_id=self.batch.pk,
        ).order_by("id")

    def _run_node(self, node: WorkflowNodeRun) -> None:
        """Mark a node RUNNING, execute it, then mark it SUCCESS.

        NOTE(review): if ``_execute_node`` raises, the node row stays in
        RUNNING state; only the batch is marked failed by ``run``.
        """
        node.status = WorkflowNodeRun.Status.RUNNING
        node.progress = 10
        node.started_at = timezone.now()
        node.message = f"{node.node_name}处理中"
        node.save(update_fields=["status", "progress", "started_at", "message"])
        record_event(self.batch, "node_progress", {"node_code": node.node_code, "status": node.status})
        self._execute_node(node)
        node.status = WorkflowNodeRun.Status.SUCCESS
        node.progress = 100
        node.finished_at = timezone.now()
        node.message = f"{node.node_name}完成"
        node.save(update_fields=["status", "progress", "finished_at", "message"])
        record_event(self.batch, "node_progress", {"node_code": node.node_code, "status": node.status})

    def _execute_node(self, node: WorkflowNodeRun) -> None:
        """Dispatch a node to its handler by ``node_code``.

        Codes without a handler (currently ``template_copy`` and
        ``highlight_review_items``, plus any unknown code) do nothing.
        """
        handlers = {
            "prepare": self._prepare,
            "text_extract": self._text_extract,
            "field_extract": self._field_extract,
            "field_merge": self._field_merge,
            "generate_docs": self._generate_docs,
            "trace_export": self._trace_export,
            "zip_export": self._zip_export,
            "notify": self._notify,
        }
        handler = handlers.get(node.node_code)
        if handler is not None:
            handler(node)

    def _prepare(self, node: WorkflowNodeRun) -> None:
        """Load and validate the template config; store its version and hash."""
        self.template_config = load_template_config()
        errors = validate_template_config(self.template_config)
        if errors:
            raise ValueError(";".join(errors))
        self.batch.template_config_version = str(self.template_config.get("version") or "")
        self.batch.template_config_hash = compute_config_hash()
        self.batch.save(update_fields=["template_config_version", "template_config_hash"])

    def _save_json_artifact(self, node: WorkflowNodeRun, json_path, artifact_type) -> None:
        """Register an already written JSON log file as a batch artifact."""
        create_artifact_for_file(
            self.batch,
            path=json_path,
            artifact_type=artifact_type,
            file_format=RegulatoryInfoPackageArtifact.FileFormat.JSON,
            created_by_node=node.node_code,
        )

    def _text_extract(self, node: WorkflowNodeRun) -> None:
        """Parse the source instruction document, if the batch has one."""
        if not self.batch.source_storage_path:
            self.instruction = None
            return
        path = resolve_storage_path(self.batch.source_storage_path)
        self.instruction = parse_instruction_docx(path)
        json_path = ensure_batch_subdir(self.batch, "logs") / "instruction_extract.json"
        save_instruction_extract_json(json_path, self.instruction)
        self._save_json_artifact(node, json_path, RegulatoryInfoPackageArtifact.ArtifactType.INSTRUCTION_EXTRACT)

    def _field_extract(self, node: WorkflowNodeRun) -> None:
        """Extract fields from the parsed instruction (empty payload without one)."""
        if not self.instruction:
            self.extract_payload = {"regex_results": {}, "llm_results": {}, "llm_error": ""}
            return
        # The LLM extractor passed here is a stub that returns no fields.
        self.extract_payload = run_parallel_extract(self.instruction, llm_extract_func=lambda _instruction: {})
        json_path = ensure_batch_subdir(self.batch, "logs") / "field_extract_result.json"
        save_field_extract_result(json_path, self.extract_payload)
        self._save_json_artifact(node, json_path, RegulatoryInfoPackageArtifact.ArtifactType.FIELD_EXTRACT_RESULT)

    def _field_merge(self, node: WorkflowNodeRun) -> None:
        """Merge regex and LLM results and store the risk lists on the batch."""
        self.merged_fields, self.merge_summary = merge_fields(
            self.extract_payload.get("regex_results") or {},
            self.extract_payload.get("llm_results") or {},
        )
        product = self.merged_fields.get("product_name")
        # "/" is treated as "no value" and does not overwrite the product name.
        if product and product.value and product.value != "/":
            self.batch.product_name = product.value
        self.batch.missing_fields = self.merge_summary.get("missing_fields", [])
        self.batch.llm_only_fields = self.merge_summary.get("llm_only_fields", [])
        self.batch.conflict_fields = self.merge_summary.get("conflict_fields", [])
        self.batch.save(update_fields=["product_name", "missing_fields", "llm_only_fields", "conflict_fields"])
        json_path = ensure_batch_subdir(self.batch, "logs") / "merged_fields.json"
        save_merged_fields(json_path, self.merged_fields, self.merge_summary)
        self._save_json_artifact(node, json_path, RegulatoryInfoPackageArtifact.ArtifactType.MERGED_FIELDS)

    def _generate_docs(self, node: WorkflowNodeRun) -> None:
        """Generate the documents and register artifacts/exports for them.

        Every result, including failed ones, is stored in
        ``batch.generated_files``.
        """
        self.generation_results = generate_package_documents(self.batch, self.template_config, self.merged_fields)
        generated_files = []
        for result in self.generation_results:
            if result.path:
                artifact = create_artifact_for_file(
                    self.batch,
                    path=result.path,
                    artifact_type=RegulatoryInfoPackageArtifact.ArtifactType.GENERATED_DOCUMENT,
                    file_format=result.actual_format,
                    name=result.template_code,
                    metadata=result.__dict__,
                    created_by_node=node.node_code,
                )
                result.artifact_id = artifact.pk
            # An export needs a real file, so require a path as well as a
            # success status (safe for either nesting of the original).
            if result.path and result.status in {"success", "fallback_success"}:
                export = self._create_export(
                    path=result.path,
                    export_type=ExportedSummaryFile.ExportType.WORD,
                    export_category="generated_document",
                )
                result.export_id = export.pk
                self.exports.append(export)
            generated_files.append(result.__dict__)
        self.batch.generated_files = generated_files
        self.batch.save(update_fields=["generated_files"])

    def _trace_export(self, node: WorkflowNodeRun) -> None:
        """Write the traceability files and expose the Excel file as an export."""
        excel_path, json_path = save_traceability_exports(self.batch.work_dir, self.merged_fields)
        self._save_json_artifact(node, json_path, RegulatoryInfoPackageArtifact.ArtifactType.TRACEABILITY)
        artifact = create_artifact_for_file(
            self.batch,
            path=excel_path,
            artifact_type=RegulatoryInfoPackageArtifact.ArtifactType.TRACEABILITY,
            file_format=RegulatoryInfoPackageArtifact.FileFormat.EXCEL,
            created_by_node=node.node_code,
        )
        export = self._create_export(
            path=str(excel_path),
            export_type=ExportedSummaryFile.ExportType.EXCEL,
            export_category="traceability",
        )
        self.exports.append(export)
        artifact.metadata = {"export_id": export.pk}
        artifact.save(update_fields=["metadata"])

    def _zip_export(self, node: WorkflowNodeRun) -> None:
        """Create the ZIP package and put its export first in ``self.exports``."""
        zip_path = create_zip_package(self.batch.work_dir, self.generation_results, self.batch.output_zip_name)
        artifact = create_artifact_for_file(
            self.batch,
            path=zip_path,
            artifact_type=RegulatoryInfoPackageArtifact.ArtifactType.ZIP_PACKAGE,
            file_format=RegulatoryInfoPackageArtifact.FileFormat.ZIP,
            created_by_node=node.node_code,
        )
        export = self._create_export(
            path=str(zip_path),
            export_type=ExportedSummaryFile.ExportType.ZIP,
            export_category="regulatory_info_package",
        )
        self.exports.insert(0, export)
        artifact.metadata = {"export_id": export.pk}
        artifact.save(update_fields=["metadata"])

    def _notify(self, node: WorkflowNodeRun) -> None:
        """Store a notification record summarising exports and failed files."""
        export_entries = [
            {
                "file_name": export.file_name,
                "download_url": f"/api/review-agent/file-summary/exports/{export.pk}/download/",
                "export_type": export.export_type,
            }
            for export in self.exports
        ]
        failed_files = [item for item in self.batch.generated_files if item.get("status") == "failed"]
        RegulatoryInfoPackageNotificationRecord.objects.create(
            batch=self.batch,
            recipient=self.batch.user,
            export_ids=[export.pk for export in self.exports],
            message_summary=build_assistant_summary(
                batch_no=self.batch.batch_no,
                exports=export_entries,
                failed_files=failed_files,
            ),
            send_status=RegulatoryInfoPackageNotificationRecord.SendStatus.SUCCESS,
        )

    def _create_export(self, *, path: str, export_type: str, export_category: str) -> ExportedSummaryFile:
        """Create an ``ExportedSummaryFile`` row linked to this workflow batch."""
        from pathlib import Path

        resolved = Path(path)
        return ExportedSummaryFile.objects.create(
            batch=None,
            workflow_type=WORKFLOW_TYPE,
            workflow_batch_id=self.batch.pk,
            export_category=export_category,
            export_type=export_type,
            file_name=resolved.name,
            storage_path=str(resolved),
        )


def start_regulatory_info_package_workflow(
    batch: RegulatoryInfoPackageBatch,
    *,
    async_run: bool | None = None,
) -> None:
    """Run the workflow for ``batch`` inline or in a daemon thread.

    Args:
        batch: Batch to process.
        async_run: ``True`` runs in a background daemon thread, ``False`` runs
            synchronously; ``None`` reads
            ``settings.REGULATORY_INFO_PACKAGE_ASYNC`` (default ``True``).
    """
    if async_run is None:
        async_run = getattr(settings, "REGULATORY_INFO_PACKAGE_ASYNC", True)
    executor = RegulatoryInfoPackageWorkflowExecutor(batch)
    if not async_run:
        executor.run()
        return

    def _run_and_release_connections() -> None:
        # Connections opened in a manually started thread live outside the
        # request cycle, so close them explicitly when the workflow ends.
        from django.db import connections

        try:
            executor.run()
        finally:
            connections.close_all()

    Thread(target=_run_and_release_connections, daemon=True).start()