def record_event(
    batch: ApplicationFormFillBatch,
    event_type: str,
    payload: dict | None = None,
) -> WorkflowEvent:
    """Create and return one ``WorkflowEvent`` row for *batch*.

    Args:
        batch: Batch whose primary key and conversation are stored on the event.
        event_type: Value stored in the event's ``event_type`` column.
        payload: Optional event data; a falsy value is stored as ``{}``.

    Returns:
        The ``WorkflowEvent`` created through the model manager.
    """
    event_columns = {
        "workflow_type": WORKFLOW_TYPE,
        "workflow_batch_id": batch.pk,
        "conversation": batch.conversation,
        "event_type": event_type,
        "payload": payload or {},
    }
    return WorkflowEvent.objects.create(**event_columns)
@dataclass
class FormFillContext:
    """Non-frozen dataclass bundling a form-fill batch with intermediate results.

    Only the three batch references have no default; every other attribute is
    created per instance as an empty ``dict``/``list`` through
    ``default_factory``.
    """

    # The form-fill batch this context is built around.
    batch: ApplicationFormFillBatch
    # The file-summary batch associated with the run.
    source_summary_batch: FileSummaryBatch
    # Optional regulatory-review batch; the annotation allows ``None``.
    source_regulatory_batch: RegulatoryReviewBatch | None
    # presumably the loaded template configuration mapping — confirm with the caller.
    template_config: dict[str, Any] = field(default_factory=dict)
    # Template specs chosen for this run.
    selected_templates: list[TemplateSpec] = field(default_factory=list)
    # presumably file name -> extracted text — confirm with the caller.
    document_texts: dict[str, str] = field(default_factory=dict)
    # presumably the rule-based extraction payload — confirm with the caller.
    regex_results: dict[str, Any] = field(default_factory=dict)
    # presumably the LLM extraction payload — confirm with the caller.
    llm_results: dict[str, Any] = field(default_factory=dict)
    # Merged fields; presumably keyed by field key — confirm with the caller.
    merged_fields: dict[str, MergedField] = field(default_factory=dict)
    # NOTE(review): typed as a dict here, while TemplateSpec.checklist_items is
    # a list — confirm which shape the pipeline really stores.
    checklist_items: dict[str, Any] = field(default_factory=dict)
    # Conflict records, one dict per entry.
    conflicts: list[dict[str, Any]] = field(default_factory=list)
    # Export rows collected for the run.
    exports: list[ExportedSummaryFile] = field(default_factory=list)
def extract_by_rules(texts: dict[str, str], specs: list[TemplateSpec]) -> dict[str, Any]:
    """Extract template fields from document texts with label-based regex rules.

    Every document is scanned once per template field; each non-empty match
    becomes one ``ExtractedField`` (as a plain dict) tagged ``extractor="rule"``.
    Matches from a document whose role is "说明书" get confidence 0.75, all
    others 0.65.

    Args:
        texts: Mapping of source file name to its extracted text.
        specs: Templates whose field definitions drive the extraction.

    Returns:
        ``{"fields": [...], "checklist_items": []}``.
    """
    definitions = _field_defs(specs)
    known_labels = [definition["label"] for definition in definitions if definition.get("label")]

    def _matches_in(file_name: str, text: str):
        # Role and confidence depend only on the document, so compute them once.
        role = detect_source_role(file_name, text)
        score = 0.75 if role == "说明书" else 0.65
        for definition in definitions:
            value, evidence = _extract_label_value(text, definition["label"], known_labels)
            if value:
                yield vars(
                    ExtractedField(
                        key=definition["key"],
                        label=definition["label"],
                        value=value,
                        source_file=file_name,
                        source_role=role,
                        evidence=evidence,
                        extractor="rule",
                        confidence=score,
                    )
                )

    rows: list[dict[str, Any]] = [
        row for file_name, text in texts.items() for row in _matches_in(file_name, text)
    ]
    return {"fields": rows, "checklist_items": []}
def run_parallel_extract(texts: dict[str, str], specs: list[TemplateSpec]) -> dict[str, Any]:
    """Run the rule-based and LLM extractors concurrently and bundle the results.

    Args:
        texts: Mapping of source file name to its extracted text.
        specs: Templates selected for the batch.

    Returns:
        A dict with ``regex_results``, ``llm_results``, ``selected_templates``
        (template codes) and ``source_evidence`` (file name plus character
        count for every input document).
    """
    with ThreadPoolExecutor(max_workers=2) as pool:
        pending = {
            "regex_results": pool.submit(extract_by_rules, texts, specs),
            "llm_results": pool.submit(extract_by_llm, texts, specs),
        }
    # Leaving the ``with`` block waits for both workers, so result() never blocks here.
    report: dict[str, Any] = {name: future.result() for name, future in pending.items()}
    report["selected_templates"] = [spec.code for spec in specs]
    report["source_evidence"] = [
        {"source_file": name, "char_count": len(text)} for name, text in texts.items()
    ]
    return report
"注册检验报告" + if "性能研究" in target: + return "性能研究资料" + if "申请表" in target: + return "申请表" + return "其他注册资料" + + +def _field_defs(specs: list[TemplateSpec]) -> list[dict[str, str]]: + fields: list[dict[str, str]] = [] + for spec in specs: + for field in spec.fields: + key = str(field.get("key") or "") + label = str(field.get("label") or "") + if key and label: + fields.append({"key": key, "label": label}) + return fields + + +def _extract_label_value(text: str, label: str, labels: list[str]) -> tuple[str, str]: + escaped_labels = "|".join(re.escape(item) for item in labels if item != label) + stop_pattern = rf"(?=\n\s*(?:{escaped_labels})\s*[::])" if escaped_labels else r"(?=\Z)" + pattern = re.compile(rf"{re.escape(label)}\s*[::]\s*(.+?)(?:{stop_pattern}|\Z)", re.S) + match = pattern.search(text or "") + if not match: + return "", "" + raw = match.group(1).strip() + value = re.sub(r"\n{2,}.*\Z", "", raw, flags=re.S).strip() + value = "\n".join(line.strip() for line in value.splitlines() if line.strip()) + evidence = f"{label}:{value}"[:300] + return value, evidence + + +def _prompt_text() -> str: + path = Path(__file__).resolve().parents[1] / "prompts" / "field_extract.md" + return path.read_text(encoding="utf-8") + + +def _build_llm_user_prompt(texts: dict[str, str], specs: list[TemplateSpec]) -> str: + fields = [{"key": field["key"], "label": field["label"]} for field in _field_defs(specs)] + documents = [{"source_file": name, "text": text[:4000]} for name, text in texts.items()] + return json.dumps({"fields": fields, "documents": documents}, ensure_ascii=False) + + +def _parse_json_object(raw: str) -> dict[str, Any]: + text = (raw or "").strip() + if text.startswith("```"): + text = text.strip("`").strip() + if text.lower().startswith("json"): + text = text[4:].strip() + start = text.find("{") + end = text.rfind("}") + if start == -1 or end == -1 or end < start: + raise json.JSONDecodeError("未找到 JSON 对象", text, 0) + return json.loads(text[start : end + 1]) + + +def 
def merge_fields(regex_results: dict[str, Any], llm_results: dict[str, Any]) -> tuple[dict[str, MergedField], list[dict]]:
    """Merge rule-based and LLM candidates into one value per field key.

    Candidates with an empty key or a blank value are ignored. For each key the
    candidate with the best source rank wins; ties are broken by higher
    confidence, then by input order (rule results come first). A field is
    flagged as conflicting when its candidates carry more than one distinct
    normalized value.

    Args:
        regex_results: Payload whose ``fields`` list holds rule-based candidates.
        llm_results: Payload whose ``fields`` list holds LLM candidates.

    Returns:
        ``(merged, conflicts)``: merged fields keyed by field key, and one
        conflict record per conflicting field.
    """
    grouped: dict[str, list[dict[str, Any]]] = {}
    for item in [*(regex_results.get("fields") or []), *(llm_results.get("fields") or [])]:
        key = str(item.get("key") or "")
        if not key or not str(item.get("value") or "").strip():
            continue
        grouped.setdefault(key, []).append(item)

    def _priority(item: dict[str, Any]) -> tuple[int, float]:
        return (
            rank_source(str(item.get("source_role") or ""), str(item.get("source_file") or "")),
            -float(item.get("confidence") or 0),
        )

    merged: dict[str, MergedField] = {}
    conflicts: list[dict] = []
    for key, candidates in grouped.items():
        # min() returns the first minimal element, i.e. the same candidate a
        # stable sort would place first.
        selected = min(candidates, key=_priority)
        selected_role = str(selected.get("source_role") or "")
        selected_file = str(selected.get("source_file") or "")
        selected_normalized = normalize_field_value(str(selected.get("value") or ""))

        has_conflict = len(_distinct_values(candidates)) > 1
        conflict_values = [
            {
                "value": item.get("value"),
                "source_file": item.get("source_file", ""),
                "source_role": item.get("source_role", ""),
                "evidence": item.get("evidence", ""),
            }
            for item in candidates
            if normalize_field_value(str(item.get("value") or "")) != selected_normalized
        ]
        merged_field = MergedField(
            key=key,
            label=str(selected.get("label") or key),
            value=str(selected.get("value") or ""),
            source_file=selected_file,
            evidence=str(selected.get("evidence") or ""),
            confidence=float(selected.get("confidence") or 0),
            has_conflict=has_conflict,
            conflict_values=conflict_values,
        )
        merged[key] = merged_field
        if has_conflict:
            # Rank with the role/file pair that won the selection. The file
            # name alone misses documents whose role was detected from their
            # content rather than their name.
            won_by_manual = rank_source(selected_role, selected_file) == 1
            conflicts.append(
                {
                    "field_key": key,
                    "field_label": merged_field.label,
                    "selected_value": merged_field.value,
                    "selected_source": merged_field.source_file,
                    "conflict_values": conflict_values,
                    "handling": "说明书优先,模板内黄底红字高亮" if won_by_manual else "按来源优先级采用最高优先级字段",
                }
            )
    return merged, conflicts
def _markdown_cell(value: object) -> str:
    """Render *value* so it cannot break out of a Markdown table cell."""
    text = str(value).replace("|", "\\|")
    # A raw line break ends the table row; <br> keeps multi-line values readable.
    return "<br>".join(text.splitlines())


def build_assistant_summary(batch: ApplicationFormFillBatch, exports: list[ExportedSummaryFile]) -> str:
    """Build the Markdown message that summarizes a finished form-fill batch.

    The message contains a table of generated Word files (or a placeholder row
    when there are none), an optional table of conflicting fields taken from
    ``batch.conflict_summary``, and download links for Excel/JSON exports.
    Cell contents are escaped so values containing ``|`` or line breaks (which
    extracted field values may contain) do not corrupt the tables.

    Args:
        batch: Batch whose ``conflict_summary`` is rendered.
        exports: Export rows produced for the batch.

    Returns:
        The Markdown text, stripped of leading and trailing whitespace.
    """
    word_exports = [export for export in exports if export.export_type == ExportedSummaryFile.ExportType.WORD]
    trace_exports = [
        export
        for export in exports
        if export.export_type in {ExportedSummaryFile.ExportType.EXCEL, ExportedSummaryFile.ExportType.JSON}
    ]
    lines = ["已生成申报模板自动填表文件。", "", "| 文件 | Word | PDF |", "| --- | --- | --- |"]
    if word_exports:
        for export in word_exports:
            lines.append(
                f"| {_markdown_cell(export.file_name)} | [下载](/api/review-agent/file-summary/exports/{export.pk}/download/) | 待增强 |"
            )
    else:
        lines.append("| 自动填表结果 | 未生成 | 待增强 |")

    conflicts = batch.conflict_summary or []
    if conflicts:
        lines.extend(["", "| 冲突字段 | 采用值 | 冲突来源 | 处理 |", "| --- | --- | --- | --- |"])
        for item in conflicts:
            conflict_sources = ";".join(
                f"{value.get('source_file', '')}:{value.get('value', '')}" for value in item.get("conflict_values", [])
            )
            cells = (
                item.get("field_label", item.get("field_key", "")),
                item.get("selected_value", ""),
                conflict_sources or "-",
                item.get("handling", ""),
            )
            lines.append("| " + " | ".join(_markdown_cell(cell) for cell in cells) + " |")

    if trace_exports:
        lines.append("")
        for export in trace_exports:
            lines.append(f"[下载{export.file_name}](/api/review-agent/file-summary/exports/{export.pk}/download/)")
    return "\n".join(lines).strip()
def template_specs(config: dict[str, Any]) -> list[dict[str, Any]]:
    """Return a new list holding the raw template entries of *config*.

    A missing or falsy ``templates`` value yields an empty list.
    """
    raw_templates = config.get("templates")
    if not raw_templates:
        return []
    return list(raw_templates)
def copy_template_to_batch(
    spec: TemplateSpec,
    batch: ApplicationFormFillBatch,
    config: dict[str, Any],
) -> ApplicationFormFillArtifact:
    """Copy the source template of *spec* into the batch and register it.

    The target is ``<templates subdir>/<spec.code>.source<suffix>``. Because
    ``spec.code`` comes from configuration, the target is checked to lie inside
    ``batch.work_dir`` before anything is written, so a rejected path never
    leaves a stray file behind.

    Args:
        spec: Template to copy.
        batch: Batch that owns the working directory.
        config: Template configuration used to locate the source file.

    Returns:
        The artifact row created for the copied template.

    Raises:
        TemplateUnavailableError: Propagated from ``resolve_source_template``.
        ValueError: If the target path is outside the batch work directory.
    """
    source = resolve_source_template(spec, config)
    target_dir = ensure_batch_subdir(batch, "templates")
    target = target_dir / f"{spec.code}.source{source.suffix.lower()}"
    # Validate first: Path.resolve() works for paths that do not exist yet.
    _ensure_under(target, Path(batch.work_dir))
    shutil.copy2(source, target)
    return create_artifact_for_file(
        batch,
        path=target,
        artifact_type=ApplicationFormFillArtifact.ArtifactType.TEMPLATE_COPY,
        file_format=source.suffix.lower().lstrip(".") or spec.file_format,
        name=spec.name,
        metadata={"template_code": spec.code, "source_file": spec.source_file},
        created_by_node="template_copy",
    )
def select_templates(
    config: dict[str, Any],
    requested_templates: list[str],
    registration_type: str,
) -> tuple[list[TemplateSpec], list[dict[str, str]]]:
    """Choose the templates to fill and collect risk notes about the choice.

    Explicitly requested templates win (de-duplicated, order kept). Otherwise
    "变更注册"/"备案" selects the change-registration template and any other
    registration type selects the registration-certificate template, each
    together with the essential-principles checklist.

    Args:
        config: Template configuration holding the raw ``templates`` entries.
        requested_templates: Template codes named by the user; may be empty.
        registration_type: Detected registration type.

    Returns:
        ``(specs, risk_notes)``. Codes missing from the configuration are
        skipped with an ``unknown_template`` note. Requested templates that do
        not apply to the registration type are kept, with a
        ``template_registration_mismatch`` note.
    """
    raw_by_code: dict[Any, dict[str, Any]] = {}
    for entry in config.get("templates") or []:
        raw_by_code[entry.get("code")] = entry

    explicit = bool(requested_templates)
    if explicit:
        codes = _dedupe(requested_templates)
    else:
        if registration_type in {"变更注册", "备案"}:
            primary = TEMPLATE_CHANGE_REGISTRATION
        else:
            primary = TEMPLATE_REGISTRATION_CERTIFICATE
        codes = [primary, TEMPLATE_ESSENTIAL_PRINCIPLES]

    chosen: list[TemplateSpec] = []
    notes: list[dict[str, str]] = []
    for code in codes:
        raw = raw_by_code.get(code)
        if not raw:
            notes.append({"type": "unknown_template", "message": f"模板不存在:{code}"})
            continue
        spec = _to_template_spec(raw)
        # Only an explicit request can contradict the detected registration type.
        if explicit and not _template_applies(spec, registration_type):
            notes.append(
                {
                    "type": "template_registration_mismatch",
                    "message": f"用户指定模板 {spec.name} 与注册类型 {registration_type or 'unknown'} 可能不匹配,仍按指定生成。",
                }
            )
        chosen.append(spec)
    return chosen, notes
def _template_applies(spec: TemplateSpec, registration_type: str) -> bool:
    """Return whether *spec* is meant for *registration_type*.

    ``spec.applies_when["registration_type"]`` lists the allowed types; a
    missing or empty value means the template applies to everything. A single
    string is treated as a one-element list, so the check is an exact match
    rather than a substring test.
    """
    allowed = spec.applies_when.get("registration_type") or []
    if isinstance(allowed, str):
        # A scalar in the YAML would otherwise make ``in`` a substring search.
        allowed = [allowed]
    if not allowed:
        return True
    return registration_type in allowed
def build_traceability_workbook(
    batch: ApplicationFormFillBatch,
    merged_fields: dict[str, MergedField],
    conflicts: list[dict[str, Any]],
    specs: list[TemplateSpec],
    generation_results: list[dict[str, Any]] | None = None,
) -> Workbook:
    """Build the in-memory traceability workbook for a batch.

    The workbook has four sheets: field traceability, conflicting fields,
    low-confidence entries (confidence below 0.6) and generation results.
    Without generation results, one placeholder row per template is written.

    Args:
        batch: Accepted for interface symmetry; not read by this function.
        merged_fields: Merged fields keyed by field key.
        conflicts: Conflict records as produced by the merge step.
        specs: Templates selected for the batch.
        generation_results: Optional per-template generation outcomes.

    Returns:
        The populated, unsaved ``Workbook``.
    """
    book = Workbook()

    # Sheet 1: one row per merged field.
    trace_sheet = book.active
    trace_sheet.title = "字段追溯"
    trace_sheet.append(["模板", "字段", "填入值", "来源文件", "证据", "冲突状态"])
    label_by_key: dict[Any, str] = {}
    for spec in specs:
        for field_config in spec.fields:
            # Later templates overwrite earlier ones that share a field key.
            label_by_key[field_config.get("key")] = spec.output_label
    for key, merged in merged_fields.items():
        status = "冲突" if merged.has_conflict else "一致"
        trace_sheet.append(
            [label_by_key.get(key, ""), merged.label, merged.value, merged.source_file, merged.evidence, status]
        )

    # Sheet 2: one row per rejected value, or one blank-valued row per conflict.
    conflict_sheet = book.create_sheet("冲突字段")
    conflict_sheet.append(["字段", "采用值", "冲突值", "冲突来源", "处理方式"])
    for conflict in conflicts:
        field_label = conflict.get("field_label", "")
        selected_value = conflict.get("selected_value", "")
        handling = conflict.get("handling", "")
        rejected = conflict.get("conflict_values") or []
        if rejected:
            for candidate in rejected:
                conflict_sheet.append(
                    [
                        field_label,
                        selected_value,
                        candidate.get("value", ""),
                        candidate.get("source_file", ""),
                        handling,
                    ]
                )
        else:
            conflict_sheet.append([field_label, selected_value, "", "", handling])

    # Sheet 3: merged fields below the confidence threshold.
    low_confidence_sheet = book.create_sheet("低置信度条目")
    low_confidence_sheet.append(["字段", "填入值", "置信度", "来源文件"])
    doubtful = (merged for merged in merged_fields.values() if merged.confidence < 0.6)
    for merged in doubtful:
        low_confidence_sheet.append([merged.label, merged.value, merged.confidence, merged.source_file])

    # Sheet 4: real generation outcomes, or placeholders when none exist yet.
    result_sheet = book.create_sheet("生成结果")
    result_sheet.append(["模板", "Word状态", "PDF状态", "错误说明"])
    if generation_results:
        for outcome in generation_results:
            result_sheet.append(
                [
                    outcome.get("template_label", ""),
                    outcome.get("word_status", ""),
                    outcome.get("pdf_status", "待增强"),
                    outcome.get("error_message", ""),
                ]
            )
    else:
        for spec in specs:
            result_sheet.append([spec.output_label, "待生成", "待增强", ""])
    return book
+ export_type=ExportedSummaryFile.ExportType.EXCEL, + file_name=excel_path.name, + storage_path=str(excel_path), + ) + + json_path = target_dir / "merged_fields.json" + payload = { + "batch_no": batch.batch_no, + "merged_fields": {key: asdict(value) for key, value in merged_fields.items()}, + "conflicts": conflicts, + "generation_results": generation_results or [], + } + json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + create_artifact_for_file( + batch, + path=json_path, + artifact_type=ApplicationFormFillArtifact.ArtifactType.MERGED_FIELDS, + file_format=ApplicationFormFillArtifact.FileFormat.JSON, + name="merged_fields", + metadata={"conflict_count": len(conflicts)}, + created_by_node="trace_export", + ) + json_export = ExportedSummaryFile.objects.create( + batch=batch.source_summary_batch, + workflow_type=WORKFLOW_TYPE, + workflow_batch_id=batch.pk, + export_category="traceability", + export_type=ExportedSummaryFile.ExportType.JSON, + file_name=json_path.name, + storage_path=str(json_path), + ) + return [excel_export, json_export] diff --git a/review_agent/application_form_fill/services/word_fill.py b/review_agent/application_form_fill/services/word_fill.py new file mode 100644 index 0000000..801f56a --- /dev/null +++ b/review_agent/application_form_fill/services/word_fill.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import re +from pathlib import Path + +from docx import Document +from docx.oxml import OxmlElement +from docx.oxml.ns import qn +from docx.shared import RGBColor + +from review_agent.application_form_fill.constants import WORKFLOW_TYPE +from review_agent.application_form_fill.schemas import MergedField, TemplateSpec +from review_agent.application_form_fill.storage import create_artifact_for_file, ensure_batch_subdir +from review_agent.models import ApplicationFormFillArtifact, ApplicationFormFillBatch, ExportedSummaryFile + + +def fill_template( + template_path: str | Path, + output_path: 
def apply_cell_shading(cell, fill: str) -> None:
    """Set the background fill colour of a table cell.

    Args:
        cell: A python-docx table cell; its underlying ``w:tc`` element is edited in place.
        fill: Hex colour written to the ``w:fill`` attribute, e.g. ``"FFFF00"``.

    An existing ``w:shd`` element is reused and only its fill is overwritten.
    When none exists, a new one is created with ``w:val="clear"`` and
    ``w:color="auto"`` and inserted at its schema position inside ``w:tcPr``.
    """
    tc_pr = cell._tc.get_or_add_tcPr()
    shading = tc_pr.find(qn("w:shd"))
    if shading is None:
        shading = OxmlElement("w:shd")
        shading.set(qn("w:color"), "auto")
        # w:tcPr children are an ordered sequence; w:shd must precede these
        # elements, so insert before the first one present (or append if none).
        tc_pr.insert_element_before(
            shading,
            "w:noWrap",
            "w:tcMar",
            "w:textDirection",
            "w:tcFitText",
            "w:vAlign",
            "w:hideMark",
            "w:headers",
            "w:cellIns",
            "w:cellDel",
            "w:cellMerge",
            "w:tcPrChange",
        )
    if shading.get(qn("w:val")) is None:
        # w:val is a required attribute of w:shd; "clear" means a plain fill with no pattern.
        shading.set(qn("w:val"), "clear")
    shading.set(qn("w:fill"), fill)
def _normalize_label(value: str) -> str:
    """Return *value* reduced to a form suitable for comparing row labels.

    All whitespace (``\\s``, which for ``str`` patterns includes Unicode
    whitespace) is removed, together with both the ASCII colon and the
    full-width colon (U+FF1A). ``None`` or an empty value yields ``""``.
    """
    # The full-width colon is written as an escape so the pattern survives
    # editors and tools that normalise punctuation.
    return re.sub(r"[\s:\uFF1A]+", "", value or "")
def ensure_batch_subdir(batch: ApplicationFormFillBatch, name: str) -> Path:
    """Create (if needed) and return the sub-directory *name* of the batch work dir.

    The root is ``batch.work_dir`` when set, otherwise the path computed by
    ``build_batch_work_dir(batch)``. Only the final path component of *name*
    is used, so the result is always a direct child of the root.

    Raises:
        ValueError: If the final component of *name* is empty, ``"."`` or
            ``".."``, which would resolve to the root or escape it.
    """
    root = Path(batch.work_dir) if batch.work_dir else build_batch_work_dir(batch)
    leaf = Path(name).name
    # Path("..").name is "..", and Path("").name / Path(".").name are "",
    # so taking .name alone does not keep the target inside the root.
    if leaf in {"", ".", ".."}:
        raise ValueError(f"invalid batch sub-directory name: {name!r}")
    target = root / leaf
    target.mkdir(parents=True, exist_ok=True)
    return target
table_row + row_label: 生产地址 + source_roles: + - 申请表 + - 质量管理体系文件 + - key: product_name + label: 产品名称 + target: + type: table_row + row_label: 产品名称 + source_roles: + - 说明书 + - 产品技术要求 + - 注册检验报告 + - key: package_specification + label: 包装规格 + target: + type: table_row + row_label: 包装规格 + source_roles: + - 说明书 + - 产品技术要求 + - key: main_components + label: 主要组成成分 + target: + type: table_row + row_label: 主要组成成分 + source_roles: + - 说明书 + - 产品技术要求 + - key: intended_use + label: 预期用途 + target: + type: table_row + row_label: 预期用途 + source_roles: + - 说明书 + - 临床评价资料 + - 产品技术要求 + - key: storage_condition_and_validity + label: 产品储存条件及有效期 + target: + type: table_row + row_label: 产品储存条件及有效期 + source_roles: + - 说明书 + - 产品技术要求 + - 稳定性研究资料 + - key: attachments + label: 附件 + target: + type: table_row + row_label: 附件 + source_roles: + - 注册申报资料 + - 说明书 + - code: change_registration + name: 中华人民共和国医疗器械变更注册(备案)文件(体外诊断试剂)(格式) + source_file: 中华人民共和国医疗器械变更注册(备案)文件(体外诊断试剂)(格式).doc + output_label: 变更注册备案文件 + applies_when: + registration_type: + - 变更注册 + - 备案 + file_format: doc + fields: [] + - code: essential_principles + name: 体外诊断试剂安全和性能基本原则清单 + source_file: 体外诊断试剂安全和性能基本原则清单.doc + output_label: 安全和性能基本原则清单 + applies_when: + registration_type: + - 首次注册 + - 变更注册 + - 备案 + - unknown + file_format: doc + fields: [] + checklist_items: [] diff --git a/review_agent/application_form_fill/views.py b/review_agent/application_form_fill/views.py new file mode 100644 index 0000000..fb147b4 --- /dev/null +++ b/review_agent/application_form_fill/views.py @@ -0,0 +1,127 @@ +import json + +from django.contrib.auth.decorators import login_required +from django.conf import settings +from django.http import Http404, JsonResponse +from django.views.decorators.http import require_http_methods + +from review_agent.application_form_fill.workflow import ( + create_application_form_fill_batch, + find_latest_successful_summary_batch, + start_application_form_fill_workflow, +) +from review_agent.models import 
@login_required
@require_http_methods(["POST"])
def start(request):
    """Create an application-form-fill batch and start its workflow.

    The request body is a JSON object. Keys read here: ``conversation_id``,
    ``file_summary_batch_id`` (optional), ``template_codes`` (optional) and
    ``output_types`` (optional).

    Responses:
        400 if the body is not valid UTF-8 JSON or is not a JSON object.
        404 if the conversation does not belong to the requesting user.
        400 if no successful file-summary batch is available.
        200 with the new batch id, status and selected templates otherwise.
    """
    try:
        payload = json.loads(request.body.decode("utf-8") or "{}")
    except (UnicodeDecodeError, json.JSONDecodeError):
        return JsonResponse({"error": "JSON 格式错误。"}, status=400)
    # Valid JSON may still be a list/number/string; every lookup below needs a dict.
    if not isinstance(payload, dict):
        return JsonResponse({"error": "JSON 格式错误。"}, status=400)

    conversation = Conversation.objects.filter(pk=payload.get("conversation_id"), user=request.user).first()
    if not conversation:
        raise Http404("对话不存在。")

    # Prefer the explicitly requested summary batch; fall back to the latest successful one.
    summary_batch = None
    if payload.get("file_summary_batch_id"):
        summary_batch = FileSummaryBatch.objects.filter(
            pk=payload.get("file_summary_batch_id"),
            conversation=conversation,
            user=request.user,
            status=FileSummaryBatch.Status.SUCCESS,
        ).first()
    if summary_batch is None:
        summary_batch = find_latest_successful_summary_batch(conversation)
    if summary_batch is None:
        return JsonResponse({"error": "请先上传资料并完成文件汇总。"}, status=400)

    batch = create_application_form_fill_batch(
        conversation=conversation,
        user=request.user,
        source_summary_batch=summary_batch,
        requested_templates=payload.get("template_codes") or [],
        output_types=payload.get("output_types") or None,
    )
    # Runs in a background thread unless APPLICATION_FORM_FILL_ASYNC is set to a falsy value.
    start_application_form_fill_workflow(batch, async_run=getattr(settings, "APPLICATION_FORM_FILL_ASYNC", True))
    return JsonResponse(
        {
            "batch_id": batch.pk,
            "workflow_type": "application_form_fill",
            "status": batch.status,
            "selected_templates": batch.selected_templates,
        }
    )
def _risk_summary_text(batch: ApplicationFormFillBatch) -> str:
    """Build the one-line risk summary shown with a batch.

    Up to three segments are produced, in this order and only when the
    corresponding batch attribute is truthy: the selected template codes,
    the number of conflict entries, and the number of risk notes. Segments
    are joined with ``" · "``; an empty string is returned when none apply.
    """
    templates = batch.selected_templates
    conflict_entries = batch.conflict_summary
    notes = batch.risk_notes
    segments = (
        (templates, lambda: "模板 " + "、".join(templates)),
        (conflict_entries, lambda: f"冲突字段 {len(conflict_entries)}"),
        (notes, lambda: f"提示 {len(notes)}"),
    )
    return " · ".join(render() for present, render in segments if present)
def build_batch_no() -> str:
    """Return a new batch number of the form ``AFF-<YYYYmmddHHMMSS>-<6 hex chars>``.

    The timestamp comes from ``timezone.localtime()`` and the suffix is the
    first six hex digits of a fresh ``uuid4``.
    """
    stamp = timezone.localtime().strftime("%Y%m%d%H%M%S")
    suffix = uuid4().hex[:6]
    return "-".join(("AFF", stamp, suffix))
class FormFillWorkflowExecutor:
    """Run the application-form auto-fill workflow for one batch, node by node.

    The executor walks the batch's ``WorkflowNodeRun`` rows in id order, keeps
    intermediate results (selected templates, template copies, extracted and
    merged fields, exports) on the instance, and records progress on the node
    rows, the batch row and the workflow event log.
    """

    def __init__(self, batch: ApplicationFormFillBatch):
        self.batch = batch
        self.template_config: dict = {}
        self.selected_templates: list[TemplateSpec] = []
        # template code -> storage path of the copied template
        self.template_paths: dict[str, str] = {}
        self.document_texts: dict[str, str] = {}
        self.extract_payload: dict = {}
        self.merged_fields: dict[str, MergedField] = {}
        self.conflicts: list[dict] = []
        self.exports = []
        # one dict per selected template, filled in by the word_fill node
        self.generation_results: list[dict] = []
        # errors that downgrade the batch to PARTIAL_SUCCESS instead of failing it
        self.non_blocking_errors: list[str] = []

    def run(self) -> None:
        """Execute every node that is not already SUCCESS or SKIPPED.

        Any exception raised by a node marks the batch FAILED, stores the
        message on the batch and stops the run; the exception is logged and
        not re-raised.
        """
        logger.info("自动填表工作流开始 batch_no=%s batch_id=%s", self.batch.batch_no, self.batch.pk)
        self.batch.status = ApplicationFormFillBatch.Status.RUNNING
        self.batch.started_at = timezone.now()
        self.batch.save(update_fields=["status", "started_at"])
        record_event(self.batch, "workflow_started", {"batch_id": self.batch.pk})

        try:
            for node in self._nodes():
                if node.status in {WorkflowNodeRun.Status.SUCCESS, WorkflowNodeRun.Status.SKIPPED}:
                    continue
                self._run_node(node)
        except Exception as exc:
            logger.exception("Application form fill workflow failed", extra={"batch_id": self.batch.pk})
            self.batch.status = ApplicationFormFillBatch.Status.FAILED
            self.batch.error_message = str(exc)
            self.batch.finished_at = timezone.now()
            self.batch.save(update_fields=["status", "error_message", "finished_at"])
            record_event(self.batch, "workflow_failed", {"message": str(exc)})
            return

        # The "completed" node may already have stored PARTIAL_SUCCESS; keep it.
        self.batch.refresh_from_db()
        if self.batch.status != ApplicationFormFillBatch.Status.PARTIAL_SUCCESS:
            self.batch.status = ApplicationFormFillBatch.Status.SUCCESS
        self.batch.finished_at = timezone.now()
        self.batch.save(update_fields=["status", "finished_at"])
        record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk})
        logger.info("自动填表工作流完成 batch_no=%s", self.batch.batch_no)

    def _nodes(self):
        """Return this batch's node rows ordered by id."""
        return WorkflowNodeRun.objects.filter(
            workflow_type=WORKFLOW_TYPE,
            workflow_batch_id=self.batch.pk,
        ).order_by("id")

    def _emit_progress(self, node: WorkflowNodeRun) -> None:
        """Record a ``node_progress`` event carrying the node's current state."""
        record_event(
            self.batch,
            "node_progress",
            {"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message},
        )

    def _run_node(self, node: WorkflowNodeRun) -> None:
        """Mark *node* RUNNING, execute it, then mark it SUCCESS (or SKIPPED for pdf_convert)."""
        node.status = WorkflowNodeRun.Status.RUNNING
        node.progress = 10
        node.started_at = timezone.now()
        node.message = f"{node.node_name}处理中"
        node.save(update_fields=["status", "progress", "started_at", "message"])
        self._emit_progress(node)

        if node.node_code == "pdf_convert":
            # PDF conversion is not executed here: a risk note is stored and the node is skipped.
            self._append_risk_note(
                {
                    "type": "pdf_pending",
                    "message": "PDF 转换为后续增强项,本次优先生成 Word。",
                }
            )
            node.status = WorkflowNodeRun.Status.SKIPPED
            node.progress = 100
            node.finished_at = timezone.now()
            node.message = "PDF 转换为后续增强项,本次跳过"
            node.save(update_fields=["status", "progress", "finished_at", "message"])
            self._emit_progress(node)
            return

        # NOTE(review): if this raises, the node row is left in RUNNING with
        # progress 10; only the batch is marked FAILED (in run()).
        self._execute_node(node)

        node.status = WorkflowNodeRun.Status.SUCCESS
        node.progress = 100
        node.finished_at = timezone.now()
        node.message = f"{node.node_name}完成"
        node.save(update_fields=["status", "progress", "finished_at", "message"])
        self._emit_progress(node)

    def _execute_node(self, node: WorkflowNodeRun) -> None:
        """Dispatch *node* to its handler; node codes without a handler are a no-op."""
        handlers = {
            "prepare": self._node_prepare,
            "template_select": self._node_template_select,
            "template_copy": self._node_template_copy,
            "field_extract": self._node_field_extract,
            "conflict_merge": self._node_conflict_merge,
            "word_fill": self._node_word_fill,
            "trace_export": self._node_trace_export,
            "output_export": self._node_output_export,
            "notify": self._node_notify,
            "completed": self._mark_final_status,
        }
        handler = handlers.get(node.node_code)
        if handler is not None:
            handler()

    def _node_prepare(self) -> None:
        """Require the source file-summary batch to be in SUCCESS state."""
        if self.batch.source_summary_batch.status != FileSummaryBatch.Status.SUCCESS:
            raise ValueError("自动填表需要成功的文件汇总批次。")

    def _node_template_select(self) -> None:
        """Load and validate the template config, then choose the templates to fill."""
        self.template_config = load_template_config()
        errors = validate_template_config(self.template_config)
        if errors:
            raise ValueError(";".join(errors))

        message_text = self.batch.trigger_message.content if self.batch.trigger_message else ""
        # Codes stored when the batch was created come first; codes parsed from the
        # trigger message are appended without duplicates, so neither source is lost.
        # Assumes parse_requested_templates returns an iterable of template codes.
        requested = list(self.batch.requested_templates or [])
        for code in parse_requested_templates(message_text) or []:
            if code not in requested:
                requested.append(code)

        registration_type, source = detect_registration_type(batch=self.batch, message=message_text)
        specs, risk_notes = select_templates(self.template_config, requested, registration_type)
        if not specs:
            raise ValueError("未选择到可用申报模板。")

        self.selected_templates = specs
        self.batch.requested_templates = requested
        self.batch.selected_templates = [spec.code for spec in specs]
        self.batch.registration_type = registration_type
        self.batch.registration_type_source = source
        self.batch.template_config_version = str(self.template_config.get("version") or "")
        self.batch.template_config_hash = compute_config_hash()
        self.batch.risk_notes = list(self.batch.risk_notes or []) + risk_notes
        self.batch.save(
            update_fields=[
                "requested_templates",
                "selected_templates",
                "registration_type",
                "registration_type_source",
                "template_config_version",
                "template_config_hash",
                "risk_notes",
            ]
        )

    def _node_template_copy(self) -> None:
        """Copy each selected template into the batch; fail only if none could be copied."""
        for spec in self.selected_templates:
            try:
                artifact = copy_template_to_batch(spec, self.batch, self.template_config)
                self.template_paths[spec.code] = artifact.storage_path
            except TemplateUnavailableError as exc:
                # One missing template is non-blocking: remember it and continue.
                self.non_blocking_errors.append(str(exc))
                self._append_risk_note({"type": "template_unavailable", "message": str(exc), "template_code": spec.code})
        if not self.template_paths:
            raise ValueError("没有可用的 Word 模板副本。")

    def _node_field_extract(self) -> None:
        """Collect document texts from the summary batch, run extraction and persist the result."""
        self.document_texts = collect_document_texts(self.batch.source_summary_batch)
        self.extract_payload = run_parallel_extract(self.document_texts, self.selected_templates)
        save_field_extract_result(self.batch, self.extract_payload)

    def _node_conflict_merge(self) -> None:
        """Merge regex and LLM extraction results and store conflicts on the batch."""
        self.merged_fields, self.conflicts = merge_fields(
            self.extract_payload.get("regex_results") or {},
            self.extract_payload.get("llm_results") or {},
        )
        product = self.merged_fields.get("product_name")
        if product and product.value:
            self.batch.product_name = product.value
        self.batch.conflict_summary = self.conflicts
        self.batch.save(update_fields=["product_name", "conflict_summary"])

    def _node_word_fill(self) -> None:
        """Fill a Word document per selected template; fail if not a single one succeeded."""
        for spec in self.selected_templates:
            template_path = self.template_paths.get(spec.code)
            if not template_path:
                # No copy was made for this template in the template_copy node.
                self.generation_results.append(
                    {
                        "template_code": spec.code,
                        "template_label": spec.output_label,
                        "word_status": "failed",
                        "pdf_status": "待增强",
                        "error_message": "模板不可用",
                    }
                )
                continue
            export = create_word_export(self.batch, spec, template_path, self.merged_fields, self.conflicts)
            self.exports.append(export)
            self.generation_results.append(
                {
                    "template_code": spec.code,
                    "template_label": spec.output_label,
                    "word_status": "success",
                    "pdf_status": "待增强",
                    "error_message": "",
                }
            )
        if not any(item["word_status"] == "success" for item in self.generation_results):
            raise ValueError("所有目标 Word 模板均生成失败。")

    def _node_trace_export(self) -> None:
        """Write the traceability exports and add them to the export list."""
        self.exports.extend(
            save_traceability_exports(
                self.batch,
                self.merged_fields,
                self.conflicts,
                self.selected_templates,
                self.generation_results,
            )
        )

    def _node_output_export(self) -> None:
        """Post the assistant summary message into the batch's conversation."""
        Message.objects.create(
            conversation=self.batch.conversation,
            role=Message.Role.ASSISTANT,
            content=build_assistant_summary(self.batch, self.exports),
        )

    def _node_notify(self) -> None:
        """Send the completion notification; a failed send is a non-blocking error."""
        notification = notify_completion(
            self.batch,
            self.exports,
            fail=getattr(settings, "APPLICATION_FORM_FILL_MOCK_NOTIFY_FAIL", False),
        )
        if notification.send_status == notification.SendStatus.FAILED:
            self.non_blocking_errors.append(notification.error_message or "通知失败")

    def _mark_final_status(self) -> None:
        """Store PARTIAL_SUCCESS if anything non-blocking failed, otherwise SUCCESS."""
        failed_word = any(item.get("word_status") == "failed" for item in self.generation_results)
        if self.non_blocking_errors or failed_word:
            self.batch.status = ApplicationFormFillBatch.Status.PARTIAL_SUCCESS
        else:
            self.batch.status = ApplicationFormFillBatch.Status.SUCCESS
        self.batch.save(update_fields=["status"])

    def _append_risk_note(self, note: dict) -> None:
        """Append *note* to the batch's risk notes and save that field."""
        self.batch.risk_notes = list(self.batch.risk_notes or []) + [note]
        self.batch.save(update_fields=["risk_notes"])
def _export_for_user(user, export_id: int) -> ExportedSummaryFile | None:
    """Return the export with *export_id* if *user* may download it, else ``None``.

    Exports whose ``workflow_type`` is ``"application_form_fill"`` are
    authorised through their form-fill batch: it must exist, not be deleted,
    and belong to a conversation of *user*. All other exports are authorised
    by comparing the owning batch's ``user_id`` with ``user.pk``.
    """
    record = ExportedSummaryFile.objects.filter(pk=export_id).first()
    if not record:
        return None

    if record.workflow_type != "application_form_fill":
        return record if record.batch.user_id == user.pk else None

    # Form-fill export: a missing batch id short-circuits before any query runs.
    owned = bool(record.workflow_batch_id) and ApplicationFormFillBatch.objects.filter(
        pk=record.workflow_batch_id,
        conversation__user=user,
        is_deleted=False,
    ).exists()
    return record if owned else None
default="unknown", + max_length=40, + ), + ), + ( + "product_name", + models.CharField(blank=True, default="", max_length=200), + ), + ("conflict_summary", models.JSONField(blank=True, default=list)), + ("risk_notes", models.JSONField(blank=True, default=list)), + ( + "template_config_version", + models.CharField(blank=True, default="", max_length=80), + ), + ( + "template_config_hash", + models.CharField(blank=True, default="", max_length=128), + ), + ("work_dir", models.CharField(blank=True, default="", max_length=500)), + ("error_message", models.TextField(blank=True, default="")), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("started_at", models.DateTimeField(blank=True, null=True)), + ("finished_at", models.DateTimeField(blank=True, null=True)), + ("archived_at", models.DateTimeField(blank=True, null=True)), + ("is_deleted", models.BooleanField(default=False)), + ( + "conversation", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="application_form_fill_batches", + to="review_agent.conversation", + ), + ), + ( + "source_regulatory_batch", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="application_form_fill_batches", + to="review_agent.regulatoryreviewbatch", + ), + ), + ( + "source_summary_batch", + models.ForeignKey( + on_delete=django.db.models.deletion.PROTECT, + related_name="application_form_fill_batches", + to="review_agent.filesummarybatch", + ), + ), + ( + "trigger_message", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="triggered_application_form_fill_batches", + to="review_agent.message", + ), + ), + ( + "user", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="review_application_form_fill_batches", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "db_table": "ra_application_form_fill_batch", + "ordering": ["-created_at", 
"-id"], + }, + ), + migrations.CreateModel( + name="ApplicationFormFillArtifact", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "artifact_type", + models.CharField( + choices=[ + ("template_copy", "模板副本"), + ("field_extract_result", "字段抽取结果"), + ("merged_fields", "字段合并结果"), + ("traceability", "追溯清单"), + ("filled_template", "已填模板"), + ("notification_record", "通知记录"), + ], + max_length=60, + ), + ), + ( + "file_format", + models.CharField( + choices=[ + ("json", "JSON"), + ("excel", "Excel"), + ("docx", "DOCX"), + ("pdf", "PDF"), + ("markdown", "Markdown"), + ], + max_length=20, + ), + ), + ("name", models.CharField(max_length=160)), + ("file_name", models.CharField(max_length=255)), + ("storage_path", models.CharField(max_length=500)), + ("file_size", models.BigIntegerField(default=0)), + ( + "content_hash", + models.CharField(blank=True, default="", max_length=128), + ), + ("metadata", models.JSONField(blank=True, default=dict)), + ( + "created_by_node", + models.CharField(blank=True, default="", max_length=60), + ), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("is_deleted", models.BooleanField(default=False)), + ( + "batch", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="artifacts", + to="review_agent.applicationformfillbatch", + ), + ), + ], + options={ + "db_table": "ra_application_form_fill_artifact", + "ordering": ["-created_at", "-id"], + }, + ), + migrations.CreateModel( + name="ApplicationFormFillNotificationRecord", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "channel", + models.CharField( + choices=[ + ("feishu_cli", "飞书 CLI"), + ("feishu_api", "飞书 API"), + ("mock", "模拟"), + ], + default="mock", + max_length=30, + ), + ), + ("template_codes", models.JSONField(blank=True, default=list)), + ("export_ids", 
models.JSONField(blank=True, default=list)), + ("message_summary", models.TextField(blank=True, default="")), + ( + "send_status", + models.CharField( + choices=[ + ("pending", "待发送"), + ("success", "成功"), + ("failed", "失败"), + ], + default="pending", + max_length=20, + ), + ), + ("retry_count", models.PositiveIntegerField(default=0)), + ( + "external_message_id", + models.CharField(blank=True, default="", max_length=120), + ), + ("error_message", models.TextField(blank=True, default="")), + ("sent_at", models.DateTimeField(blank=True, null=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ("is_deleted", models.BooleanField(default=False)), + ( + "batch", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="notifications", + to="review_agent.applicationformfillbatch", + ), + ), + ( + "recipient", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="application_form_fill_notifications", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "db_table": "ra_application_form_fill_notification_record", + "ordering": ["-created_at", "-id"], + }, + ), + migrations.AddIndex( + model_name="applicationformfillbatch", + index=models.Index( + fields=["conversation", "status"], name="idx_ra_aff_batch_conv_status" + ), + ), + migrations.AddIndex( + model_name="applicationformfillbatch", + index=models.Index( + fields=["source_summary_batch"], name="idx_ra_aff_batch_summary" + ), + ), + migrations.AddIndex( + model_name="applicationformfillbatch", + index=models.Index( + fields=["source_regulatory_batch"], name="idx_ra_aff_batch_regulatory" + ), + ), + migrations.AddIndex( + model_name="applicationformfillbatch", + index=models.Index( + fields=["user", "created_at"], name="idx_ra_aff_batch_user_created" + ), + ), + migrations.AddIndex( + model_name="applicationformfillbatch", + index=models.Index(fields=["created_at"], 
name="idx_ra_aff_batch_created"), + ), + migrations.AddIndex( + model_name="applicationformfillartifact", + index=models.Index( + fields=["batch", "artifact_type"], name="idx_ra_aff_artifact_batch_type" + ), + ), + migrations.AddIndex( + model_name="applicationformfillartifact", + index=models.Index( + fields=["file_format"], name="idx_ra_aff_artifact_format" + ), + ), + migrations.AddIndex( + model_name="applicationformfillartifact", + index=models.Index( + fields=["created_at"], name="idx_ra_aff_artifact_created" + ), + ), + migrations.AddIndex( + model_name="applicationformfillnotificationrecord", + index=models.Index( + fields=["batch", "created_at"], name="idx_ra_aff_notify_batch" + ), + ), + migrations.AddIndex( + model_name="applicationformfillnotificationrecord", + index=models.Index( + fields=["recipient", "send_status"], name="idx_ra_aff_notify_recipient" + ), + ), + migrations.AddIndex( + model_name="applicationformfillnotificationrecord", + index=models.Index( + fields=["send_status", "retry_count"], name="idx_ra_aff_notify_status" + ), + ), + ] diff --git a/review_agent/models.py b/review_agent/models.py index 3cb703e..541a209 100644 --- a/review_agent/models.py +++ b/review_agent/models.py @@ -334,6 +334,8 @@ class ExportedSummaryFile(models.Model): MARKDOWN = "markdown", "Markdown" EXCEL = "excel", "Excel" JSON = "json", "JSON" + WORD = "word", "Word" + PDF = "pdf", "PDF" class Status(models.TextChoices): SUCCESS = "success", "成功" @@ -397,6 +399,92 @@ class RegulatoryRuleVersion(models.Model): return self.code +class ApplicationFormFillBatch(models.Model): + """Tracks one application-form auto-fill workflow run.""" + + class Status(models.TextChoices): + PENDING = "pending", "待执行" + RUNNING = "running", "执行中" + WAITING_USER = "waiting_user", "等待用户" + SUCCESS = "success", "成功" + PARTIAL_SUCCESS = "partial_success", "部分成功" + FAILED = "failed", "失败" + CANCELLED = "cancelled", "已取消" + + class RegistrationTypeSource(models.TextChoices): + USER_MESSAGE = 
"user_message", "用户话语" + REGULATORY_BATCH = "regulatory_batch", "法规核查批次" + FILE_EXTRACT = "file_extract", "文件抽取" + UNKNOWN = "unknown", "未知" + + conversation = models.ForeignKey( + Conversation, + on_delete=models.CASCADE, + related_name="application_form_fill_batches", + ) + user = models.ForeignKey( + settings.AUTH_USER_MODEL, + on_delete=models.CASCADE, + related_name="review_application_form_fill_batches", + ) + trigger_message = models.ForeignKey( + Message, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="triggered_application_form_fill_batches", + ) + source_summary_batch = models.ForeignKey( + FileSummaryBatch, + on_delete=models.PROTECT, + related_name="application_form_fill_batches", + ) + source_regulatory_batch = models.ForeignKey( + "RegulatoryReviewBatch", + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="application_form_fill_batches", + ) + batch_no = models.CharField(max_length=64, unique=True) + status = models.CharField(max_length=30, choices=Status.choices, default=Status.PENDING) + requested_templates = models.JSONField(default=list, blank=True) + selected_templates = models.JSONField(default=list, blank=True) + output_types = models.JSONField(default=list, blank=True) + registration_type = models.CharField(max_length=80, blank=True, default="") + registration_type_source = models.CharField( + max_length=40, + choices=RegistrationTypeSource.choices, + default=RegistrationTypeSource.UNKNOWN, + ) + product_name = models.CharField(max_length=200, blank=True, default="") + conflict_summary = models.JSONField(default=list, blank=True) + risk_notes = models.JSONField(default=list, blank=True) + template_config_version = models.CharField(max_length=80, blank=True, default="") + template_config_hash = models.CharField(max_length=128, blank=True, default="") + work_dir = models.CharField(max_length=500, blank=True, default="") + error_message = models.TextField(blank=True, default="") + created_at = 
models.DateTimeField(auto_now_add=True) + started_at = models.DateTimeField(null=True, blank=True) + finished_at = models.DateTimeField(null=True, blank=True) + archived_at = models.DateTimeField(null=True, blank=True) + is_deleted = models.BooleanField(default=False) + + class Meta: + db_table = "ra_application_form_fill_batch" + ordering = ["-created_at", "-id"] + indexes = [ + models.Index(fields=["conversation", "status"], name="idx_ra_aff_batch_conv_status"), + models.Index(fields=["source_summary_batch"], name="idx_ra_aff_batch_summary"), + models.Index(fields=["source_regulatory_batch"], name="idx_ra_aff_batch_regulatory"), + models.Index(fields=["user", "created_at"], name="idx_ra_aff_batch_user_created"), + models.Index(fields=["created_at"], name="idx_ra_aff_batch_created"), + ] + + def __str__(self) -> str: + return self.batch_no + + class RegulatoryReviewBatch(models.Model): """Tracks one NMPA regulatory review workflow run.""" @@ -571,3 +659,98 @@ class RegulatoryNotificationRecord(models.Model): indexes = [ models.Index(fields=["batch", "status"], name="idx_ra_rr_notify_status"), ] + + +class ApplicationFormFillArtifact(models.Model): + """Stores auto-fill intermediate files and generated artifacts.""" + + class ArtifactType(models.TextChoices): + TEMPLATE_COPY = "template_copy", "模板副本" + FIELD_EXTRACT_RESULT = "field_extract_result", "字段抽取结果" + MERGED_FIELDS = "merged_fields", "字段合并结果" + TRACEABILITY = "traceability", "追溯清单" + FILLED_TEMPLATE = "filled_template", "已填模板" + NOTIFICATION_RECORD = "notification_record", "通知记录" + + class FileFormat(models.TextChoices): + JSON = "json", "JSON" + EXCEL = "excel", "Excel" + DOCX = "docx", "DOCX" + PDF = "pdf", "PDF" + MARKDOWN = "markdown", "Markdown" + + batch = models.ForeignKey( + ApplicationFormFillBatch, + on_delete=models.CASCADE, + related_name="artifacts", + ) + artifact_type = models.CharField(max_length=60, choices=ArtifactType.choices) + file_format = models.CharField(max_length=20, 
choices=FileFormat.choices) + name = models.CharField(max_length=160) + file_name = models.CharField(max_length=255) + storage_path = models.CharField(max_length=500) + file_size = models.BigIntegerField(default=0) + content_hash = models.CharField(max_length=128, blank=True, default="") + metadata = models.JSONField(default=dict, blank=True) + created_by_node = models.CharField(max_length=60, blank=True, default="") + created_at = models.DateTimeField(auto_now_add=True) + is_deleted = models.BooleanField(default=False) + + class Meta: + db_table = "ra_application_form_fill_artifact" + ordering = ["-created_at", "-id"] + indexes = [ + models.Index(fields=["batch", "artifact_type"], name="idx_ra_aff_artifact_batch_type"), + models.Index(fields=["file_format"], name="idx_ra_aff_artifact_format"), + models.Index(fields=["created_at"], name="idx_ra_aff_artifact_created"), + ] + + +class ApplicationFormFillNotificationRecord(models.Model): + """Stores mock/Feishu notification records for application-form auto-fill.""" + + class Channel(models.TextChoices): + FEISHU_CLI = "feishu_cli", "飞书 CLI" + FEISHU_API = "feishu_api", "飞书 API" + MOCK = "mock", "模拟" + + class SendStatus(models.TextChoices): + PENDING = "pending", "待发送" + SUCCESS = "success", "成功" + FAILED = "failed", "失败" + + batch = models.ForeignKey( + ApplicationFormFillBatch, + on_delete=models.CASCADE, + related_name="notifications", + ) + recipient = models.ForeignKey( + settings.AUTH_USER_MODEL, + on_delete=models.CASCADE, + related_name="application_form_fill_notifications", + ) + channel = models.CharField(max_length=30, choices=Channel.choices, default=Channel.MOCK) + template_codes = models.JSONField(default=list, blank=True) + export_ids = models.JSONField(default=list, blank=True) + message_summary = models.TextField(blank=True, default="") + send_status = models.CharField( + max_length=20, + choices=SendStatus.choices, + default=SendStatus.PENDING, + ) + retry_count = 
models.PositiveIntegerField(default=0) + external_message_id = models.CharField(max_length=120, blank=True, default="") + error_message = models.TextField(blank=True, default="") + sent_at = models.DateTimeField(null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + is_deleted = models.BooleanField(default=False) + + class Meta: + db_table = "ra_application_form_fill_notification_record" + ordering = ["-created_at", "-id"] + indexes = [ + models.Index(fields=["batch", "created_at"], name="idx_ra_aff_notify_batch"), + models.Index(fields=["recipient", "send_status"], name="idx_ra_aff_notify_recipient"), + models.Index(fields=["send_status", "retry_count"], name="idx_ra_aff_notify_status"), + ] diff --git a/review_agent/services.py b/review_agent/services.py index de72857..252502a 100644 --- a/review_agent/services.py +++ b/review_agent/services.py @@ -11,6 +11,11 @@ from .file_summary.skills.attachment_reader import AttachmentReaderSkill from .file_summary.workflow import create_file_summary_batch, start_file_summary_workflow from .llm import LLMConfigurationError, LLMRequestError, generate_reply, stream_reply from .models import Conversation, FileAttachment, FileSummaryBatch, Message +from .application_form_fill.workflow import ( + create_application_form_fill_batch, + find_latest_successful_summary_batch as find_latest_successful_form_fill_summary_batch, + start_application_form_fill_workflow, +) from .regulatory_review.workflow import ( create_regulatory_review_batch, find_latest_successful_summary_batch, @@ -224,6 +229,85 @@ def stream_message(conversation: Conversation, content: str): ) return + if route.starts_application_form_fill: + source_summary_batch = find_latest_successful_form_fill_summary_batch(conversation) + if not source_summary_batch: + if not _has_active_attachments(conversation): + reply_content = "请先在当前对话右侧上传需要填表的产品资料或压缩包,我会先自动汇总再继续生成申报模板。" + assistant_message = 
append_assistant_message(conversation, reply_content) + yield sse_event("chunk", {"delta": reply_content}) + yield sse_event( + "done", + { + "assistant_message_id": assistant_message.pk, + "conversation_id": conversation.pk, + "title": conversation.title, + }, + ) + return + summary_batch = create_file_summary_batch( + conversation=conversation, + user=conversation.user, + trigger_message=user_message, + ) + yield sse_event( + "workflow_started", + { + "workflow_type": "file_summary", + "batch_id": summary_batch.pk, + "batch_no": summary_batch.batch_no, + }, + ) + start_file_summary_workflow(summary_batch, async_run=False) + summary_batch.refresh_from_db() + if summary_batch.status != FileSummaryBatch.Status.SUCCESS: + reply_content = f"已先启动文件目录与页数自动汇总工作流,批次号:{summary_batch.batch_no},但汇总未成功:{summary_batch.error_message or '原因待查看'}。请处理后再启动申报文件自动填表。" + assistant_message = append_assistant_message(conversation, reply_content) + yield sse_event("chunk", {"delta": reply_content}) + yield sse_event( + "done", + { + "assistant_message_id": assistant_message.pk, + "conversation_id": conversation.pk, + "title": conversation.title, + }, + ) + return + source_summary_batch = summary_batch + reply_prefix = f"已先启动文件目录与页数自动汇总工作流,批次号:{summary_batch.batch_no},汇总完成后继续自动填表。\n" + else: + reply_prefix = "" + batch = create_application_form_fill_batch( + conversation=conversation, + user=conversation.user, + trigger_message=user_message, + source_summary_batch=source_summary_batch, + ) + start_application_form_fill_workflow( + batch, + async_run=getattr(settings, "APPLICATION_FORM_FILL_ASYNC", True), + ) + reply_content = f"{reply_prefix}已启动申报文件自动填表工作流,批次号:{batch.batch_no}。" + assistant_message = append_assistant_message(conversation, reply_content) + yield sse_event( + "workflow_started", + { + "workflow_type": "application_form_fill", + "batch_id": batch.pk, + "batch_no": batch.batch_no, + }, + ) + yield sse_event("chunk", {"delta": reply_content}) + yield sse_event( + "done", + { + 
"assistant_message_id": assistant_message.pk, + "conversation_id": conversation.pk, + "title": conversation.title, + }, + ) + return + if route.starts_regulatory_review: source_summary_batch = find_latest_successful_summary_batch(conversation) if not source_summary_batch: diff --git a/review_agent/skill_router.py b/review_agent/skill_router.py index 05718e4..b0b5323 100644 --- a/review_agent/skill_router.py +++ b/review_agent/skill_router.py @@ -8,6 +8,7 @@ from .file_summary.workflow_trigger import ( evaluate_attachment_reader_trigger, evaluate_file_summary_trigger, ) +from .application_form_fill.constants import FORM_FILL_TRIGGER_KEYWORDS, WORKFLOW_TYPE as FORM_FILL_WORKFLOW_TYPE from .llm import LLMConfigurationError, LLMRequestError, generate_completion from .models import Conversation, FileAttachment @@ -16,6 +17,7 @@ logger = logging.getLogger(__name__) ROUTE_ACTIONS = {"normal_chat", "attachment_reader", "file_summary"} ROUTE_ACTIONS.add("regulatory_review") +ROUTE_ACTIONS.add(FORM_FILL_WORKFLOW_TYPE) @dataclass(frozen=True) @@ -39,6 +41,10 @@ class SkillRoute: def starts_regulatory_review(self) -> bool: return self.action == "regulatory_review" + @property + def starts_application_form_fill(self) -> bool: + return self.action == FORM_FILL_WORKFLOW_TYPE + @property def is_normal_chat(self) -> bool: return self.action == "normal_chat" @@ -105,7 +111,7 @@ def _route_with_llm( return SkillRoute( action=action, skill_name="attachment_reader" if action == "attachment_reader" else "", - workflow_type=action if action in {"file_summary", "regulatory_review"} else "", + workflow_type=action if action in {"file_summary", "regulatory_review", FORM_FILL_WORKFLOW_TYPE} else "", confidence=_float_or_zero(payload.get("confidence")), reason=str(payload.get("reason") or ""), source="llm", @@ -113,6 +119,15 @@ def _route_with_llm( def _route_with_rules(conversation: Conversation, content: str) -> SkillRoute: + if _matches_application_form_fill(content): + return SkillRoute( 
+ action=FORM_FILL_WORKFLOW_TYPE, + workflow_type=FORM_FILL_WORKFLOW_TYPE, + confidence=0.7, + reason="命中申报文件自动填表关键词。", + source="rule_fallback", + ) + if _matches_regulatory_review(content): return SkillRoute( action="regulatory_review", @@ -162,10 +177,11 @@ def _router_system_prompt() -> str: return ( "你是审核智能体的工具路由器,只判断是否需要调用工具,不直接回答用户。" "你必须只输出 JSON 对象,不要输出 Markdown。" - "可选 action:normal_chat、attachment_reader、file_summary、regulatory_review。" + "可选 action:normal_chat、attachment_reader、file_summary、regulatory_review、application_form_fill。" "attachment_reader 用于用户要求阅读、提取、分析、总结、查看上传附件内容。" "file_summary 用于用户要求自动汇总文件目录、页数、清单或生成目录页数报告。" "regulatory_review 用于用户要求法规核查、NMPA核查、完整性核查、章节一致性核查、风险预警或整改建议。" + "application_form_fill 用于用户要求填注册证、生成申报模板、填写对应表格、安全和性能基本原则清单或自动填表。" "normal_chat 用于不需要读取附件或执行工作流的一般问答。" "输出字段:action、confidence、reason。" ) @@ -217,3 +233,8 @@ def _matches_regulatory_review(content: str) -> bool: "一致性核查", ] return any(keyword in normalized for keyword in keywords) + + +def _matches_application_form_fill(content: str) -> bool: + normalized = content.lower() + return any(keyword.lower() in normalized for keyword in FORM_FILL_TRIGGER_KEYWORDS) diff --git a/review_agent/urls.py b/review_agent/urls.py index 50f4c32..44deeb7 100644 --- a/review_agent/urls.py +++ b/review_agent/urls.py @@ -16,6 +16,10 @@ from .regulatory_review.views import ( review_issues as regulatory_review_review_issues, start_full_review as regulatory_review_start_full_review, ) +from .application_form_fill.views import ( + batch_status as application_form_fill_batch_status, + start as application_form_fill_start, +) urlpatterns = [ @@ -84,4 +88,14 @@ urlpatterns = [ regulatory_review_review_issues, name="regulatory_review_review_issues", ), + path( + "api/review-agent/application-form-fill/start/", + application_form_fill_start, + name="application_form_fill_start", + ), + path( + "api/review-agent/application-form-fill//status/", + application_form_fill_batch_status, + 
name="application_form_fill_batch_status", + ), ] diff --git a/review_agent/views.py b/review_agent/views.py index 2f78b2b..4b0d3da 100644 --- a/review_agent/views.py +++ b/review_agent/views.py @@ -11,7 +11,7 @@ from .services import ( send_message, stream_message, ) -from .models import Conversation, FileAttachment, FileSummaryBatch, RegulatoryReviewBatch, WorkflowNodeRun +from .models import ApplicationFormFillBatch, Conversation, FileAttachment, FileSummaryBatch, RegulatoryReviewBatch, WorkflowNodeRun from .regulatory_review.services.info_extract import ensure_regulatory_condition_candidates @@ -155,6 +155,25 @@ def build_workflow_cards(conversation: Conversation) -> list[dict[str, object]]: ), } ) + form_fill_batches = ApplicationFormFillBatch.objects.filter(conversation=conversation, is_deleted=False) + for batch in form_fill_batches: + cards.append( + { + "id": batch.pk, + "workflow_type": "application_form_fill", + "batch_no": batch.batch_no, + "status": batch.status, + "error_message": batch.error_message, + "risk_label": _format_form_fill_label(batch), + "created_at": batch.created_at, + "nodes": list( + WorkflowNodeRun.objects.filter( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + ).order_by("id") + ), + } + ) return sorted(cards, key=lambda item: item["created_at"], reverse=True)[:5] @@ -187,3 +206,14 @@ def _format_risk_label(risk_summary: dict) -> str: if count: parts.append(f"{label} {count}") return " · ".join(parts) + + +def _format_form_fill_label(batch: ApplicationFormFillBatch) -> str: + parts = [] + if batch.selected_templates: + parts.append("模板 " + "、".join(batch.selected_templates)) + if batch.conflict_summary: + parts.append(f"冲突字段 {len(batch.conflict_summary)}") + if batch.risk_notes: + parts.append(f"提示 {len(batch.risk_notes)}") + return " · ".join(parts) diff --git a/static/js/app.js b/static/js/app.js index d1d4c60..ef6bd75 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -464,8 +464,12 @@ } function 
statusUrlForWorkflow(workflow_type, batchId) { - var attributeName = - workflow_type === "regulatory_review" ? "data-regulatory-status-url-template" : "data-status-url-template"; + var attributeName = "data-status-url-template"; + if (workflow_type === "regulatory_review") { + attributeName = "data-regulatory-status-url-template"; + } else if (workflow_type === "application_form_fill") { + attributeName = "data-application-form-fill-status-url-template"; + } return templateUrl(attributeName, "__batch_id__", batchId); } @@ -832,7 +836,7 @@ } function isWorkflowTerminalStatus(status) { - return status === "success" || status === "failed"; + return status === "success" || status === "partial_success" || status === "failed"; } function workflowTimerKey(batchId, workflow_type) { diff --git a/templates/home.html b/templates/home.html index 50301fb..f6f49a1 100644 --- a/templates/home.html +++ b/templates/home.html @@ -216,6 +216,7 @@ data-message-url-template="/api/review-agent/conversations/__conversation_id__/messages/" data-status-url-template="/api/review-agent/file-summary/__batch_id__/status/" data-regulatory-status-url-template="/api/review-agent/regulatory-review/__batch_id__/status/" + data-application-form-fill-status-url-template="/api/review-agent/application-form-fill/__batch_id__/status/" data-events-url-template="/api/review-agent/file-summary/__batch_id__/events/" >
diff --git a/tests/test_application_form_fill_field_extract.py b/tests/test_application_form_fill_field_extract.py new file mode 100644 index 0000000..08c7b44 --- /dev/null +++ b/tests/test_application_form_fill_field_extract.py @@ -0,0 +1,121 @@ +import json + +import pytest + +from review_agent.application_form_fill.services.field_extract import ( + extract_by_llm, + extract_by_rules, + run_parallel_extract, + save_field_extract_result, +) +from review_agent.application_form_fill.services.template_config import load_template_config +from review_agent.application_form_fill.services.template_select import select_templates +from review_agent.models import ( + ApplicationFormFillArtifact, + ApplicationFormFillBatch, + Conversation, + FileSummaryBatch, +) + + +pytestmark = pytest.mark.django_db + + +def _registration_specs(): + config = load_template_config() + specs, _risk_notes = select_templates(config, ["registration_certificate"], "首次注册") + return specs + + +def test_rule_extracts_registration_certificate_fields(): + texts = { + "产品说明书.txt": "\n".join( + [ + "产品名称:甲胎蛋白检测试剂盒", + "包装规格:20人份/盒", + "预期用途:用于体外定量检测人血清中甲胎蛋白含量", + "产品储存条件及有效期:2-8℃保存,有效期12个月", + ] + ) + } + + result = extract_by_rules(texts, _registration_specs()) + + values = {field["key"]: field for field in result["fields"]} + assert values["product_name"]["value"] == "甲胎蛋白检测试剂盒" + assert values["intended_use"]["source_role"] == "说明书" + assert "2-8℃保存" in values["storage_condition_and_validity"]["value"] + assert values["package_specification"]["extractor"] == "rule" + + +def test_llm_extract_parses_structured_json(monkeypatch): + monkeypatch.setattr( + "review_agent.application_form_fill.services.field_extract.generate_completion", + lambda messages, temperature=0.0: json.dumps( + { + "fields": [ + { + "key": "product_name", + "label": "产品名称", + "value": "甲胎蛋白检测试剂盒", + "source_file": "说明书.txt", + "source_role": "说明书", + "evidence": "产品名称:甲胎蛋白检测试剂盒", + "confidence": 0.9, + } + ], + "checklist_items": 
[], + }, + ensure_ascii=False, + ), + ) + + result = extract_by_llm({"说明书.txt": "产品名称:甲胎蛋白检测试剂盒"}, _registration_specs()) + + assert result["fields"][0]["extractor"] == "llm" + assert result["fields"][0]["value"] == "甲胎蛋白检测试剂盒" + + +def test_llm_extract_failure_returns_empty_result(monkeypatch): + monkeypatch.setattr( + "review_agent.application_form_fill.services.field_extract.generate_completion", + lambda messages, temperature=0.0: (_ for _ in ()).throw(TimeoutError("timeout")), + ) + + result = extract_by_llm({"说明书.txt": "产品名称:甲胎蛋白检测试剂盒"}, _registration_specs()) + + assert result["fields"] == [] + assert "timeout" in result["error_message"] + + +def test_parallel_extract_preserves_rule_result_when_llm_fails(monkeypatch): + monkeypatch.setattr( + "review_agent.application_form_fill.services.field_extract.generate_completion", + lambda messages, temperature=0.0: (_ for _ in ()).throw(TimeoutError("timeout")), + ) + + payload = run_parallel_extract({"说明书.txt": "产品名称:甲胎蛋白检测试剂盒"}, _registration_specs()) + + assert payload["regex_results"]["fields"] + assert payload["llm_results"]["fields"] == [] + assert payload["selected_templates"] == ["registration_certificate"] + + +def test_save_field_extract_result_creates_json_artifact(settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-FIELD") + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="AFF-FIELD", + work_dir=str(tmp_path / "aff" / "AFF-FIELD"), + ) + + artifact = save_field_extract_result(batch, {"regex_results": {"fields": []}, "llm_results": {"fields": []}}) + + assert artifact.artifact_type == ApplicationFormFillArtifact.ArtifactType.FIELD_EXTRACT_RESULT + assert 
artifact.file_format == ApplicationFormFillArtifact.FileFormat.JSON + assert artifact.content_hash diff --git a/tests/test_application_form_fill_field_merge.py b/tests/test_application_form_fill_field_merge.py new file mode 100644 index 0000000..a449ad6 --- /dev/null +++ b/tests/test_application_form_fill_field_merge.py @@ -0,0 +1,79 @@ +import pytest + +from review_agent.application_form_fill.services.field_merge import merge_fields, normalize_field_value, rank_source + + +def test_normalize_field_value_removes_whitespace(): + assert normalize_field_value(" 2-8℃ 保存 \n 有效期12个月 ") == "2-8℃保存有效期12个月" + + +def test_rank_source_prefers_instructions(): + assert rank_source("说明书") < rank_source("产品技术要求") + + +def test_merge_fields_prefers_instructions_and_marks_conflict(): + regex_results = { + "fields": [ + { + "key": "storage_condition_and_validity", + "label": "产品储存条件及有效期", + "value": "2-8℃保存,有效期12个月", + "source_file": "说明书.txt", + "source_role": "说明书", + "evidence": "产品储存条件及有效期:2-8℃保存,有效期12个月", + "confidence": 0.75, + }, + { + "key": "storage_condition_and_validity", + "label": "产品储存条件及有效期", + "value": "-20℃保存", + "source_file": "产品技术要求.txt", + "source_role": "产品技术要求", + "evidence": "产品储存条件及有效期:-20℃保存", + "confidence": 0.8, + }, + ] + } + + merged, conflicts = merge_fields(regex_results, {"fields": []}) + + field = merged["storage_condition_and_validity"] + assert field.value == "2-8℃保存,有效期12个月" + assert field.has_conflict is True + assert conflicts[0]["selected_value"] == "2-8℃保存,有效期12个月" + assert conflicts[0]["conflict_values"][0]["value"] == "-20℃保存" + + +def test_merge_fields_combines_consistent_values_without_conflict(): + regex_results = { + "fields": [ + { + "key": "product_name", + "label": "产品名称", + "value": "甲胎蛋白检测试剂盒", + "source_file": "说明书.txt", + "source_role": "说明书", + "evidence": "产品名称:甲胎蛋白检测试剂盒", + "confidence": 0.75, + } + ] + } + llm_results = { + "fields": [ + { + "key": "product_name", + "label": "产品名称", + "value": "甲胎蛋白 检测试剂盒", + "source_file": 
"产品技术要求.txt", + "source_role": "产品技术要求", + "evidence": "产品名称:甲胎蛋白 检测试剂盒", + "confidence": 0.9, + } + ] + } + + merged, conflicts = merge_fields(regex_results, llm_results) + + assert merged["product_name"].value == "甲胎蛋白检测试剂盒" + assert merged["product_name"].has_conflict is False + assert conflicts == [] diff --git a/tests/test_application_form_fill_frontend.py b/tests/test_application_form_fill_frontend.py new file mode 100644 index 0000000..ae16656 --- /dev/null +++ b/tests/test_application_form_fill_frontend.py @@ -0,0 +1,48 @@ +import pytest +from django.urls import reverse + +from review_agent.models import ApplicationFormFillBatch, Conversation, FileSummaryBatch, WorkflowNodeRun + + +pytestmark = pytest.mark.django_db + + +def test_workspace_renders_application_form_fill_workflow_card(client, django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-CARD") + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="AFF-CARD", + status=ApplicationFormFillBatch.Status.PARTIAL_SUCCESS, + selected_templates=["registration_certificate"], + risk_notes=[{"type": "pdf_pending"}], + ) + WorkflowNodeRun.objects.create( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + node_group="form_fill", + node_code="word_fill", + node_name="填写 Word", + status=WorkflowNodeRun.Status.SUCCESS, + progress=100, + ) + client.force_login(user) + + response = client.get(f"{reverse('home')}?conversation={conversation.pk}") + + content = response.content.decode("utf-8") + assert "AFF-CARD" in content + assert 'data-workflow-type="application_form_fill"' in content + assert "填写 Word" in content + assert "data-application-form-fill-status-url-template" in content + + +def 
test_frontend_selects_application_form_fill_status_url_and_terminal_status(): + script = open("static/js/app.js", encoding="utf-8").read() + + assert 'workflow_type === "application_form_fill"' in script + assert "data-application-form-fill-status-url-template" in script + assert 'status === "partial_success"' in script diff --git a/tests/test_application_form_fill_models.py b/tests/test_application_form_fill_models.py new file mode 100644 index 0000000..92be9df --- /dev/null +++ b/tests/test_application_form_fill_models.py @@ -0,0 +1,109 @@ +import pytest + +from review_agent.models import ( + ApplicationFormFillArtifact, + ApplicationFormFillBatch, + ApplicationFormFillNotificationRecord, + Conversation, + ExportedSummaryFile, + FileSummaryBatch, + Message, + RegulatoryReviewBatch, +) + + +pytestmark = pytest.mark.django_db + + +def test_application_form_fill_models_store_batch_artifact_notification_and_exports(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="自动填表") + trigger = Message.objects.create( + conversation=conversation, + role=Message.Role.USER, + content="帮我填注册证", + ) + summary_batch = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-READY", + status=FileSummaryBatch.Status.SUCCESS, + ) + regulatory_batch = RegulatoryReviewBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary_batch, + batch_no="RR-AFF-SOURCE", + condition_json={"confirmed": True, "registration_type": "首次注册"}, + ) + + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + trigger_message=trigger, + source_summary_batch=summary_batch, + source_regulatory_batch=regulatory_batch, + batch_no="AFF-20260607153000-abcdef", + requested_templates=["registration_certificate"], + selected_templates=["registration_certificate"], + output_types=["word", "excel", 
"json"], + registration_type="首次注册", + registration_type_source=ApplicationFormFillBatch.RegistrationTypeSource.USER_MESSAGE, + product_name="甲胎蛋白检测试剂盒", + conflict_summary=[{"field_key": "storage_condition"}], + risk_notes=[{"type": "pdf_pending"}], + template_config_version="application_form_templates_v1", + template_config_hash="hash", + work_dir="media/application_form_fill/1/1/AFF-20260607153000-abcdef", + ) + artifact = ApplicationFormFillArtifact.objects.create( + batch=batch, + artifact_type=ApplicationFormFillArtifact.ArtifactType.FILLED_TEMPLATE, + file_format=ApplicationFormFillArtifact.FileFormat.DOCX, + name="注册证格式", + file_name="filled.docx", + storage_path="media/application_form_fill/filled.docx", + file_size=123, + content_hash="sha256", + metadata={"template_code": "registration_certificate"}, + created_by_node="word_fill", + ) + notification = ApplicationFormFillNotificationRecord.objects.create( + batch=batch, + recipient=user, + channel=ApplicationFormFillNotificationRecord.Channel.MOCK, + template_codes=["registration_certificate"], + export_ids=[1], + message_summary="自动填表完成", + send_status=ApplicationFormFillNotificationRecord.SendStatus.FAILED, + retry_count=1, + error_message="mock failed", + ) + word_export = ExportedSummaryFile.objects.create( + batch=summary_batch, + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + export_category="filled_template", + export_type=ExportedSummaryFile.ExportType.WORD, + file_name="filled.docx", + storage_path="media/application_form_fill/filled.docx", + ) + pdf_export = ExportedSummaryFile.objects.create( + batch=summary_batch, + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + export_category="filled_template", + export_type=ExportedSummaryFile.ExportType.PDF, + file_name="filled.pdf", + storage_path="media/application_form_fill/filled.pdf", + ) + + assert batch.status == ApplicationFormFillBatch.Status.PENDING + assert batch.source_summary_batch == 
summary_batch + assert batch.source_regulatory_batch == regulatory_batch + assert artifact.content_hash == "sha256" + assert artifact.metadata["template_code"] == "registration_certificate" + assert notification.send_status == ApplicationFormFillNotificationRecord.SendStatus.FAILED + assert notification.retry_count == 1 + assert word_export.export_type == ExportedSummaryFile.ExportType.WORD + assert pdf_export.export_type == ExportedSummaryFile.ExportType.PDF diff --git a/tests/test_application_form_fill_notification.py b/tests/test_application_form_fill_notification.py new file mode 100644 index 0000000..9905689 --- /dev/null +++ b/tests/test_application_form_fill_notification.py @@ -0,0 +1,61 @@ +import pytest + +from review_agent.application_form_fill.services.notifier import notify_completion +from review_agent.models import ( + ApplicationFormFillBatch, + ApplicationFormFillNotificationRecord, + Conversation, + ExportedSummaryFile, + FileSummaryBatch, +) + + +pytestmark = pytest.mark.django_db + + +def test_notify_completion_records_success(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-NOTIFY") + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="AFF-NOTIFY", + selected_templates=["registration_certificate"], + ) + exported = ExportedSummaryFile.objects.create( + batch=summary, + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + export_category="filled_template", + export_type=ExportedSummaryFile.ExportType.WORD, + file_name="filled.docx", + storage_path="filled.docx", + ) + + record = notify_completion(batch, [exported]) + + assert record.send_status == ApplicationFormFillNotificationRecord.SendStatus.SUCCESS + assert record.export_ids == 
[exported.pk] + assert record.template_codes == ["registration_certificate"] + assert record.sent_at is not None + + +def test_notify_completion_records_failure_without_raising(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-NOTIFY-FAIL") + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="AFF-NOTIFY-FAIL", + selected_templates=["registration_certificate"], + ) + + record = notify_completion(batch, [], fail=True) + + assert record.send_status == ApplicationFormFillNotificationRecord.SendStatus.FAILED + assert record.retry_count == 1 + assert "mock" in record.error_message diff --git a/tests/test_application_form_fill_template_config.py b/tests/test_application_form_fill_template_config.py new file mode 100644 index 0000000..b8e8859 --- /dev/null +++ b/tests/test_application_form_fill_template_config.py @@ -0,0 +1,97 @@ +import copy + +import pytest + +from review_agent.application_form_fill.services.template_config import ( + DEFAULT_CONFIG_PATH, + compute_config_hash, + load_template_config, + validate_template_config, +) + + +def test_template_config_loads_and_validates_default_yaml(settings): + config = load_template_config() + errors = validate_template_config(config) + + assert errors == [] + assert config["version"] == "application_form_templates_v1" + registration = next(item for item in config["templates"] if item["code"] == "registration_certificate") + assert registration["file_format"] == "docx" + assert {field["key"] for field in registration["fields"]} >= { + "applicant_name", + "product_name", + "package_specification", + "main_components", + "intended_use", + "storage_condition_and_validity", + "attachments", + } + assert all(field["target"]["type"] == 
"table_row" for field in registration["fields"]) + assert len(compute_config_hash(DEFAULT_CONFIG_PATH)) == 64 + + +def test_template_config_reports_missing_source_dir(): + config = load_template_config() + config["source_dir"] = "docs/not-exists" + + errors = validate_template_config(config) + + assert any("source_dir 不存在" in error for error in errors) + + +def test_template_config_reports_duplicate_code(): + config = load_template_config() + duplicate = copy.deepcopy(config["templates"][0]) + config["templates"].append(duplicate) + + errors = validate_template_config(config) + + assert any("模板 code 重复" in error for error in errors) + + +def test_template_config_reports_missing_source_file(): + config = load_template_config() + config["templates"][0]["source_file"] = "missing.docx" + + errors = validate_template_config(config) + + assert any("source_file 不存在" in error for error in errors) + + +def test_template_config_reports_unsupported_target_type(): + config = load_template_config() + config["templates"][0]["fields"][0]["target"]["type"] = "content_control" + + errors = validate_template_config(config) + + assert any("target.type 不支持" in error for error in errors) + + +def test_template_config_loads_custom_path(tmp_path): + config_path = tmp_path / "templates.yaml" + config_path.write_text( + """ +version: custom +source_dir: . 
+templates: + - code: custom_template + name: Custom + source_file: source.docx + output_label: Custom + file_format: docx + fields: + - key: product_name + label: 产品名称 + target: + type: table_row + row_label: 产品名称 +""".strip(), + encoding="utf-8", + ) + (tmp_path / "source.docx").write_bytes(b"docx") + + config = load_template_config(config_path) + + assert validate_template_config(config, base_dir=tmp_path) == [] + assert compute_config_hash(config_path) diff --git a/tests/test_application_form_fill_template_repository.py b/tests/test_application_form_fill_template_repository.py new file mode 100644 index 0000000..aafa001 --- /dev/null +++ b/tests/test_application_form_fill_template_repository.py @@ -0,0 +1,60 @@ +import pytest + +from review_agent.application_form_fill.services.template_config import load_template_config +from review_agent.application_form_fill.services.template_repository import ( + TemplateUnavailableError, + copy_template_to_batch, + resolve_source_template, +) +from review_agent.application_form_fill.services.template_select import select_templates +from review_agent.models import ( + ApplicationFormFillArtifact, + ApplicationFormFillBatch, + Conversation, + FileSummaryBatch, +) + + +pytestmark = pytest.mark.django_db + + +def test_resolve_source_template_finds_registration_docx(): + config = load_template_config() + specs, _risk_notes = select_templates(config, ["registration_certificate"], "首次注册") + + path = resolve_source_template(specs[0], config) + + assert path.exists() + assert path.name == "中华人民共和国医疗器械注册证(体外诊断试剂)(格式).docx" + + +def test_copy_template_to_batch_creates_artifact(settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-REPO") + batch = 
ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="AFF-REPO", + work_dir=str(tmp_path / "aff" / "AFF-REPO"), + ) + config = load_template_config() + specs, _risk_notes = select_templates(config, ["registration_certificate"], "首次注册") + + artifact = copy_template_to_batch(specs[0], batch, config) + + assert artifact.artifact_type == ApplicationFormFillArtifact.ArtifactType.TEMPLATE_COPY + assert artifact.file_format == "docx" + assert artifact.content_hash + assert artifact.metadata["template_code"] == "registration_certificate" + assert artifact.storage_path.startswith(batch.work_dir) + + +def test_doc_template_without_working_docx_is_unavailable(): + config = load_template_config() + specs, _risk_notes = select_templates(config, ["change_registration"], "变更注册") + + with pytest.raises(TemplateUnavailableError): + resolve_source_template(specs[0], config) diff --git a/tests/test_application_form_fill_template_select.py b/tests/test_application_form_fill_template_select.py new file mode 100644 index 0000000..dada57e --- /dev/null +++ b/tests/test_application_form_fill_template_select.py @@ -0,0 +1,114 @@ +import pytest + +from review_agent.application_form_fill.services.template_config import load_template_config +from review_agent.application_form_fill.services.template_select import ( + detect_registration_type, + parse_requested_templates, + select_templates, +) +from review_agent.models import ApplicationFormFillBatch, Conversation, FileSummaryBatch, RegulatoryReviewBatch + + +pytestmark = pytest.mark.django_db + + +@pytest.mark.parametrize( + ("message", "expected"), + [ + ("帮我填注册证", ["registration_certificate"]), + ("生成变更注册备案文件", ["change_registration"]), + ("生成安全和性能基本原则清单", ["essential_principles"]), + ("请生成全部模板", ["registration_certificate", "change_registration", "essential_principles"]), + ("普通聊天", []), + ], +) +def test_parse_requested_templates(message, expected): + assert 
parse_requested_templates(message) == expected + + +def test_detect_registration_type_prefers_user_message(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-SEL") + regulatory = RegulatoryReviewBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="RR-SEL", + condition_json={"confirmed_conditions": {"registration_type": "变更注册"}}, + ) + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + source_regulatory_batch=regulatory, + batch_no="AFF-SEL", + ) + + value, source = detect_registration_type(batch=batch, message="首次注册资料,请填注册证") + + assert value == "首次注册" + assert source == ApplicationFormFillBatch.RegistrationTypeSource.USER_MESSAGE + + +def test_detect_registration_type_falls_back_to_regulatory_batch_and_file_candidates(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-SEL-2") + regulatory = RegulatoryReviewBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="RR-SEL-2", + condition_json={"confirmed_conditions": {"registration_type": "变更注册"}}, + ) + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + source_regulatory_batch=regulatory, + batch_no="AFF-SEL-2", + ) + + regulatory_value, regulatory_source = detect_registration_type(batch=batch, message="") + file_value, file_source = detect_registration_type( + message="", + file_candidates={"registration_type": {"suggested": "备案"}}, + ) + + 
assert (regulatory_value, regulatory_source) == ( + "变更注册", + ApplicationFormFillBatch.RegistrationTypeSource.REGULATORY_BATCH, + ) + assert (file_value, file_source) == ( + "备案", + ApplicationFormFillBatch.RegistrationTypeSource.FILE_EXTRACT, + ) + + +def test_select_default_templates_for_initial_registration(): + config = load_template_config() + + specs, risk_notes = select_templates(config, [], "首次注册") + + assert [spec.code for spec in specs] == ["registration_certificate", "essential_principles"] + assert risk_notes == [] + + +def test_select_default_templates_for_change_registration(): + config = load_template_config() + + specs, risk_notes = select_templates(config, [], "变更注册") + + assert [spec.code for spec in specs] == ["change_registration", "essential_principles"] + assert risk_notes == [] + + +def test_select_user_requested_mismatch_is_allowed_with_risk_note(): + config = load_template_config() + + specs, risk_notes = select_templates(config, ["change_registration"], "首次注册") + + assert [spec.code for spec in specs] == ["change_registration"] + assert risk_notes + assert risk_notes[0]["type"] == "template_registration_mismatch" diff --git a/tests/test_application_form_fill_traceability.py b/tests/test_application_form_fill_traceability.py new file mode 100644 index 0000000..cec08f8 --- /dev/null +++ b/tests/test_application_form_fill_traceability.py @@ -0,0 +1,85 @@ +import json + +import pytest +from openpyxl import load_workbook + +from review_agent.application_form_fill.schemas import MergedField, TemplateSpec +from review_agent.application_form_fill.services.traceability_export import save_traceability_exports +from review_agent.models import ( + ApplicationFormFillArtifact, + ApplicationFormFillBatch, + Conversation, + ExportedSummaryFile, + FileSummaryBatch, +) + + +pytestmark = pytest.mark.django_db + + +def test_traceability_exports_excel_json_and_records(settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + user = 
django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-TRACE") + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="AFF-TRACE", + work_dir=str(tmp_path / "aff" / "AFF-TRACE"), + ) + spec = TemplateSpec( + code="registration_certificate", + name="注册证格式", + source_file="template.docx", + output_label="注册证格式", + applies_when={}, + file_format="docx", + fields=[{"key": "product_name", "label": "产品名称"}], + ) + merged_fields = { + "product_name": MergedField( + "product_name", + "产品名称", + "甲胎蛋白检测试剂盒", + "说明书.txt", + "产品名称:甲胎蛋白检测试剂盒", + 0.8, + ) + } + conflicts = [ + { + "field_key": "storage_condition", + "field_label": "储存条件", + "selected_value": "2-8℃", + "conflict_values": [{"value": "-20℃", "source_file": "产品技术要求.txt"}], + "handling": "说明书优先", + } + ] + + exports = save_traceability_exports( + batch, + merged_fields, + conflicts, + [spec], + [{"template_label": "注册证格式", "word_status": "success", "pdf_status": "待增强"}], + ) + + assert {export.export_type for export in exports} == { + ExportedSummaryFile.ExportType.EXCEL, + ExportedSummaryFile.ExportType.JSON, + } + excel_export = next(export for export in exports if export.export_type == ExportedSummaryFile.ExportType.EXCEL) + workbook = load_workbook(excel_export.storage_path) + assert workbook.sheetnames == ["字段追溯", "冲突字段", "低置信度条目", "生成结果"] + assert workbook["字段追溯"]["B2"].value == "产品名称" + assert workbook["冲突字段"]["C2"].value == "-20℃" + + json_export = next(export for export in exports if export.export_type == ExportedSummaryFile.ExportType.JSON) + payload = json.loads(open(json_export.storage_path, encoding="utf-8").read()) + assert payload["merged_fields"]["product_name"]["value"] == "甲胎蛋白检测试剂盒" + assert ApplicationFormFillArtifact.objects.filter( + 
batch=batch, + artifact_type=ApplicationFormFillArtifact.ArtifactType.TRACEABILITY, + ).exists() diff --git a/tests/test_application_form_fill_trigger.py b/tests/test_application_form_fill_trigger.py new file mode 100644 index 0000000..8272f29 --- /dev/null +++ b/tests/test_application_form_fill_trigger.py @@ -0,0 +1,45 @@ +import pytest + +from review_agent.models import Conversation +from review_agent.skill_router import route_message_intent + + +pytestmark = pytest.mark.django_db + + +@pytest.mark.parametrize( + "content", + [ + "帮我填注册证", + "给我这个内容对应的表格", + "为我该方案生成申报模板", + "请自动填表并生成表格", + "生成安全和性能基本原则清单", + ], +) +def test_rule_router_starts_application_form_fill_for_keywords(monkeypatch, django_user_model, content): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + monkeypatch.setattr( + "review_agent.skill_router._route_with_llm", + lambda conversation, content, attachments: (_ for _ in ()).throw(ValueError("fallback")), + ) + + route = route_message_intent(conversation, content) + + assert route.action == "application_form_fill" + assert route.workflow_type == "application_form_fill" + assert route.starts_application_form_fill + + +def test_rule_router_does_not_misroute_normal_chat(monkeypatch, django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + monkeypatch.setattr( + "review_agent.skill_router._route_with_llm", + lambda conversation, content, attachments: (_ for _ in ()).throw(ValueError("fallback")), + ) + + route = route_message_intent(conversation, "你好,解释一下法规背景") + + assert route.action == "normal_chat" diff --git a/tests/test_application_form_fill_views.py b/tests/test_application_form_fill_views.py new file mode 100644 index 0000000..65c6b77 --- /dev/null +++ b/tests/test_application_form_fill_views.py @@ -0,0 +1,113 @@ +import json + 
+import pytest +from django.urls import reverse + +from review_agent.application_form_fill.constants import FORM_FILL_NODE_DEFINITIONS +from review_agent.models import ( + ApplicationFormFillBatch, + Conversation, + ExportedSummaryFile, + FileSummaryBatch, + WorkflowNodeRun, +) + + +pytestmark = pytest.mark.django_db + + +def test_application_form_fill_start_requires_conversation_owner(client, monkeypatch, django_user_model): + owner = django_user_model.objects.create_user(username="owner", password="pass") + other = django_user_model.objects.create_user(username="other", password="pass") + conversation = Conversation.objects.create(user=owner, title="会话") + FileSummaryBatch.objects.create( + conversation=conversation, + user=owner, + batch_no="FS-VIEW", + status=FileSummaryBatch.Status.SUCCESS, + ) + monkeypatch.setattr("review_agent.application_form_fill.views.start_application_form_fill_workflow", lambda batch, async_run=True: None) + client.force_login(other) + + response = client.post( + reverse("application_form_fill_start"), + data=json.dumps({"conversation_id": conversation.pk}), + content_type="application/json", + ) + + assert response.status_code == 404 + + +def test_application_form_fill_start_creates_batch(client, monkeypatch, django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-VIEW-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + monkeypatch.setattr("review_agent.application_form_fill.views.start_application_form_fill_workflow", lambda batch, async_run=True: None) + client.force_login(user) + + response = client.post( + reverse("application_form_fill_start"), + data=json.dumps({"conversation_id": conversation.pk, "template_codes": ["registration_certificate"]}), + content_type="application/json", + ) + + assert response.status_code == 200 + 
payload = response.json() + assert payload["workflow_type"] == "application_form_fill" + assert ApplicationFormFillBatch.objects.filter(conversation=conversation).exists() + + +def test_application_form_fill_status_requires_owner_and_returns_nodes_exports(client, tmp_path, django_user_model): + owner = django_user_model.objects.create_user(username="owner", password="pass") + other = django_user_model.objects.create_user(username="other", password="pass") + conversation = Conversation.objects.create(user=owner, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=owner, batch_no="FS-STATUS") + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=owner, + source_summary_batch=summary, + batch_no="AFF-STATUS", + status=ApplicationFormFillBatch.Status.PARTIAL_SUCCESS, + selected_templates=["registration_certificate"], + conflict_summary=[{"field_key": "product_name"}], + risk_notes=[{"type": "pdf_pending"}], + ) + WorkflowNodeRun.objects.create( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + node_group="form_fill", + node_code=FORM_FILL_NODE_DEFINITIONS[0][0], + node_name=FORM_FILL_NODE_DEFINITIONS[0][1], + status=WorkflowNodeRun.Status.SUCCESS, + progress=100, + ) + output = tmp_path / "filled.docx" + output.write_bytes(b"word") + exported = ExportedSummaryFile.objects.create( + batch=summary, + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + export_category="filled_template", + export_type=ExportedSummaryFile.ExportType.WORD, + file_name="filled.docx", + storage_path=str(output), + ) + + client.force_login(other) + denied = client.get(reverse("application_form_fill_batch_status", args=[batch.pk])) + assert denied.status_code == 404 + + client.force_login(owner) + allowed = client.get(reverse("application_form_fill_batch_status", args=[batch.pk])) + assert allowed.status_code == 200 + payload = allowed.json() + assert payload["batch"]["workflow_type"] 
== "application_form_fill" + assert payload["batch"]["status"] == ApplicationFormFillBatch.Status.PARTIAL_SUCCESS + assert payload["batch"]["conflict_count"] == 1 + assert payload["nodes"][0]["node_code"] == "prepare" + assert payload["exports"][0]["id"] == exported.pk diff --git a/tests/test_application_form_fill_word_fill.py b/tests/test_application_form_fill_word_fill.py new file mode 100644 index 0000000..264b716 --- /dev/null +++ b/tests/test_application_form_fill_word_fill.py @@ -0,0 +1,121 @@ +import zipfile + +import pytest +from docx import Document + +from review_agent.application_form_fill.schemas import MergedField, TemplateSpec +from review_agent.application_form_fill.services.word_fill import create_word_export, fill_template +from review_agent.models import ( + ApplicationFormFillArtifact, + ApplicationFormFillBatch, + Conversation, + ExportedSummaryFile, + FileSummaryBatch, +) + + +pytestmark = pytest.mark.django_db + + +def _spec(): + return TemplateSpec( + code="registration_certificate", + name="注册证格式", + source_file="template.docx", + output_label="注册证格式", + applies_when={"registration_type": ["首次注册"]}, + file_format="docx", + fields=[ + {"key": "product_name", "label": "产品名称", "target": {"type": "table_row", "row_label": "产品名称"}}, + {"key": "intended_use", "label": "预期用途", "target": {"type": "table_row", "row_label": "预期用途"}}, + ], + ) + + +def _template(path): + document = Document() + table = document.add_table(rows=2, cols=2) + table.rows[0].cells[0].text = "产品名称" + table.rows[1].cells[0].text = "预期用途" + document.save(path) + + +def test_word_fill_writes_table_rows(tmp_path): + template_path = tmp_path / "template.docx" + output_path = tmp_path / "filled.docx" + _template(template_path) + + fill_template( + template_path, + output_path, + _spec(), + { + "product_name": MergedField("product_name", "产品名称", "甲胎蛋白检测试剂盒", "说明书.txt", "证据", 0.8), + "intended_use": MergedField("intended_use", "预期用途", "用于体外检测", "说明书.txt", "证据", 0.8), + }, + ) + + 
document = Document(output_path) + assert document.tables[0].rows[0].cells[1].text == "甲胎蛋白检测试剂盒" + assert document.tables[0].rows[1].cells[1].text == "用于体外检测" + + +def test_word_fill_highlights_conflict_in_docx_xml(tmp_path): + template_path = tmp_path / "template.docx" + output_path = tmp_path / "filled.docx" + _template(template_path) + + fill_template( + template_path, + output_path, + _spec(), + { + "product_name": MergedField( + "product_name", + "产品名称", + "甲胎蛋白检测试剂盒", + "说明书.txt", + "证据", + 0.8, + has_conflict=True, + ) + }, + conflicts=[{"field_key": "product_name"}], + ) + + with zipfile.ZipFile(output_path) as package: + document_xml = package.read("word/document.xml").decode("utf-8") + assert 'w:fill="FFFF00"' in document_xml + assert 'w:color w:val="FF0000"' in document_xml + + +def test_create_word_export_records_artifact_and_export(settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-WORD") + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="AFF-WORD", + product_name="甲胎蛋白检测试剂盒", + work_dir=str(tmp_path / "aff" / "AFF-WORD"), + ) + template_path = tmp_path / "template.docx" + _template(template_path) + + exported = create_word_export( + batch, + _spec(), + template_path, + {"product_name": MergedField("product_name", "产品名称", "甲胎蛋白检测试剂盒", "说明书.txt", "证据", 0.8)}, + ) + + assert exported.export_type == ExportedSummaryFile.ExportType.WORD + assert exported.workflow_type == "application_form_fill" + assert exported.workflow_batch_id == batch.pk + assert ApplicationFormFillArtifact.objects.filter( + batch=batch, + artifact_type=ApplicationFormFillArtifact.ArtifactType.FILLED_TEMPLATE, + ).exists() diff --git 
a/tests/test_application_form_fill_workflow.py b/tests/test_application_form_fill_workflow.py new file mode 100644 index 0000000..abfe369 --- /dev/null +++ b/tests/test_application_form_fill_workflow.py @@ -0,0 +1,272 @@ +import pytest + +from review_agent.application_form_fill.constants import FORM_FILL_NODE_DEFINITIONS +from review_agent.application_form_fill.workflow import ( + create_application_form_fill_batch, + find_latest_successful_summary_batch, + start_application_form_fill_workflow, +) +from review_agent.models import ( + ApplicationFormFillBatch, + Conversation, + FileAttachment, + FileSummaryBatch, + Message, + WorkflowEvent, + WorkflowNodeRun, +) +from review_agent.services import stream_message +from review_agent.skill_router import SkillRoute + + +pytestmark = pytest.mark.django_db + + +@pytest.fixture(autouse=True) +def stub_aff_llm_extract(monkeypatch): + monkeypatch.setattr( + "review_agent.application_form_fill.services.field_extract.generate_completion", + lambda messages, temperature=0.0: '{"fields": [], "checklist_items": []}', + ) + + +def test_find_latest_successful_summary_batch_ignores_failed_batches(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + success = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-FAILED", + status=FileSummaryBatch.Status.FAILED, + ) + + assert find_latest_successful_summary_batch(conversation) == success + + +def test_create_application_form_fill_batch_initializes_nodes(settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + message = 
Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + + batch = create_application_form_fill_batch( + conversation=conversation, + user=user, + trigger_message=message, + source_summary_batch=summary, + ) + + assert batch.status == ApplicationFormFillBatch.Status.PENDING + assert batch.output_types == ["word", "excel", "json"] + assert WorkflowNodeRun.objects.filter( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + ).count() == len(FORM_FILL_NODE_DEFINITIONS) + assert WorkflowEvent.objects.filter( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + event_type="workflow_created", + ).exists() + + +def test_application_form_fill_executor_runs_nodes_and_skips_pdf(settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-AFF-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + trigger = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证") + batch = create_application_form_fill_batch( + conversation=conversation, + user=user, + trigger_message=trigger, + source_summary_batch=summary, + ) + + start_application_form_fill_workflow(batch, async_run=False) + + batch.refresh_from_db() + assert batch.status == ApplicationFormFillBatch.Status.SUCCESS + assert WorkflowNodeRun.objects.get( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + node_code="pdf_convert", + ).status == WorkflowNodeRun.Status.SKIPPED + assert WorkflowEvent.objects.filter( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + 
def _route_every_message_to_form_fill(monkeypatch):
    """Replace ``review_agent.services.route_message_intent`` with a fixed route.

    The stand-in ignores both arguments and returns a ``SkillRoute`` whose
    action and workflow type are ``"application_form_fill"`` with a
    confidence of 0.9.
    """

    def fixed_route(conversation, content):
        return SkillRoute(
            action="application_form_fill",
            workflow_type="application_form_fill",
            confidence=0.9,
        )

    monkeypatch.setattr("review_agent.services.route_message_intent", fixed_route)


def test_application_form_fill_workflow_generates_summary_and_exports(settings, tmp_path, django_user_model):
    """Run the form-fill workflow synchronously over one text source file.

    A successful summary batch with a single ``FileSummaryItem`` pointing at
    a file under ``tmp_path`` is used as input.  After the run the batch is
    expected to be ``SUCCESS``, to carry the product name found in the file,
    to have selected only the registration-certificate template, and to have
    produced an assistant message and a successful notification.
    """
    from review_agent.models import FileSummaryItem

    settings.MEDIA_ROOT = tmp_path
    settings.APPLICATION_FORM_FILL_ASYNC = False

    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")
    request_message = Message.objects.create(
        conversation=chat,
        role=Message.Role.USER,
        content="帮我填注册证",
    )
    summary_batch = FileSummaryBatch.objects.create(
        conversation=chat,
        user=owner,
        batch_no="FS-AFF-FULL",
        status=FileSummaryBatch.Status.SUCCESS,
    )

    # The summary item references this on-disk file through storage_path.
    ifu_path = tmp_path / "ifu.txt"
    ifu_path.write_text("产品名称:甲胎蛋白检测试剂盒\n预期用途:用于体外检测", encoding="utf-8")
    FileSummaryItem.objects.create(
        batch=summary_batch,
        file_index=1,
        file_name="说明书.txt",
        file_type="txt",
        relative_path="说明书.txt",
        storage_path=str(ifu_path),
    )

    form_batch = create_application_form_fill_batch(
        conversation=chat,
        user=owner,
        trigger_message=request_message,
        source_summary_batch=summary_batch,
    )
    start_application_form_fill_workflow(form_batch, async_run=False)
    form_batch.refresh_from_db()

    assert form_batch.status == ApplicationFormFillBatch.Status.SUCCESS
    assert form_batch.product_name == "甲胎蛋白检测试剂盒"
    assert form_batch.selected_templates == ["registration_certificate"]

    assistant_replies = chat.messages.filter(
        role=Message.Role.ASSISTANT,
        content__contains="已生成申报模板自动填表文件",
    )
    assert assistant_replies.exists()

    delivered = form_batch.notifications.filter(send_status="success")
    assert delivered.exists()


def test_application_form_fill_status_becomes_partial_when_notification_fails(settings, tmp_path, django_user_model):
    """Expect ``PARTIAL_SUCCESS`` when the mock-notify-fail setting is on.

    ``APPLICATION_FORM_FILL_MOCK_NOTIFY_FAIL`` is set to ``True`` before the
    workflow runs synchronously; afterwards the batch must be partially
    successful and hold a notification with ``send_status="failed"``.
    """
    settings.MEDIA_ROOT = tmp_path
    settings.APPLICATION_FORM_FILL_MOCK_NOTIFY_FAIL = True

    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")
    request_message = Message.objects.create(
        conversation=chat,
        role=Message.Role.USER,
        content="帮我填注册证",
    )
    summary_batch = FileSummaryBatch.objects.create(
        conversation=chat,
        user=owner,
        batch_no="FS-AFF-PARTIAL",
        status=FileSummaryBatch.Status.SUCCESS,
    )
    form_batch = create_application_form_fill_batch(
        conversation=chat,
        user=owner,
        trigger_message=request_message,
        source_summary_batch=summary_batch,
    )

    start_application_form_fill_workflow(form_batch, async_run=False)
    form_batch.refresh_from_db()

    expected_status = ApplicationFormFillBatch.Status.PARTIAL_SUCCESS
    assert form_batch.status == expected_status

    failed_notifications = form_batch.notifications.filter(send_status="failed")
    assert failed_notifications.exists()


def test_stream_message_prompts_for_upload_when_no_summary_or_attachment(monkeypatch, django_user_model):
    """Expect an upload prompt and no batch when the conversation has no inputs.

    The conversation has neither a summary batch nor an attachment, and the
    intent router is patched to choose the form-fill workflow.
    """
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")
    _route_every_message_to_form_fill(monkeypatch)

    streamed = "".join(stream_message(chat, "帮我填注册证"))

    assert "请先在当前对话右侧上传需要填表的产品资料或压缩包" in streamed
    assert not ApplicationFormFillBatch.objects.exists()


def test_stream_message_starts_application_form_fill_workflow(monkeypatch, settings, tmp_path, django_user_model):
    """Expect a form-fill batch to start when a successful summary batch exists.

    The streamed frames must mention ``workflow_started``, the form-fill
    workflow type, and the start-up message, and a batch must exist for the
    conversation afterwards.
    """
    settings.MEDIA_ROOT = tmp_path
    settings.APPLICATION_FORM_FILL_ASYNC = False

    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")
    FileSummaryBatch.objects.create(
        conversation=chat,
        user=owner,
        batch_no="FS-AFF-OK",
        status=FileSummaryBatch.Status.SUCCESS,
    )
    _route_every_message_to_form_fill(monkeypatch)

    streamed = "".join(stream_message(chat, "帮我填注册证"))

    for fragment in (
        "workflow_started",
        '"workflow_type": "application_form_fill"',
        "已启动申报文件自动填表工作流",
    ):
        assert fragment in streamed
    assert ApplicationFormFillBatch.objects.filter(conversation=chat).exists()


def test_stream_message_auto_runs_summary_before_application_form_fill(
    monkeypatch, settings, tmp_path, django_user_model
):
    """Expect a summary workflow to run first when only an attachment exists.

    The conversation has one active attachment and no summary batch.  The
    summary workflow starter is replaced by a stub that just marks the batch
    ``SUCCESS``; the streamed frames must then mention both workflow types
    and the continuation message, and both batch kinds must exist.
    """
    settings.MEDIA_ROOT = tmp_path
    settings.APPLICATION_FORM_FILL_ASYNC = False

    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")

    upload = tmp_path / "application.txt"
    upload.write_text("产品名称:甲胎蛋白检测试剂盒", encoding="utf-8")
    FileAttachment.objects.create(
        conversation=chat,
        user=owner,
        original_name="application.txt",
        storage_path=str(upload),
        file_size=upload.stat().st_size,
        is_active=True,
    )
    _route_every_message_to_form_fill(monkeypatch)

    def finish_summary(batch, async_run=True):
        # Stub for the summary workflow: only flips the status to SUCCESS.
        batch.status = FileSummaryBatch.Status.SUCCESS
        batch.save(update_fields=["status"])

    monkeypatch.setattr("review_agent.services.start_file_summary_workflow", finish_summary)

    streamed = "".join(stream_message(chat, "为我该方案生成申报模板"))

    assert '"workflow_type": "file_summary"' in streamed
    assert '"workflow_type": "application_form_fill"' in streamed
    assert "汇总完成后继续自动填表" in streamed

    finished_summaries = FileSummaryBatch.objects.filter(
        conversation=chat,
        status=FileSummaryBatch.Status.SUCCESS,
    )
    assert finished_summaries.exists()
    assert ApplicationFormFillBatch.objects.filter(conversation=chat).exists()
def test_export_download_checks_application_form_fill_batch_owner(client, tmp_path, django_user_model):
    """Check download access for an export tied to a form-fill batch.

    The exported file row is attached to the *other* user's summary batch but
    references the owner's form-fill batch through ``workflow_type`` and
    ``workflow_batch_id``.  The other user must get a 404, while the owner
    must receive the file with a Word content type and the stored bytes.
    """
    create_user = django_user_model.objects.create_user
    owner = create_user(username="owner", password="pass")
    outsider = create_user(username="other", password="pass")

    owner_chat = Conversation.objects.create(user=owner, title="自动填表")
    outsider_chat = Conversation.objects.create(user=outsider, title="其他对话")

    owner_summary = FileSummaryBatch.objects.create(
        conversation=owner_chat,
        user=owner,
        batch_no="FS-AFF-OWNER",
        status=FileSummaryBatch.Status.SUCCESS,
    )
    outsider_summary = FileSummaryBatch.objects.create(
        conversation=outsider_chat,
        user=outsider,
        batch_no="FS-AFF-OTHER",
        status=FileSummaryBatch.Status.SUCCESS,
    )
    form_batch = ApplicationFormFillBatch.objects.create(
        conversation=owner_chat,
        user=owner,
        source_summary_batch=owner_summary,
        batch_no="AFF-DL",
    )

    docx_path = tmp_path / "filled.docx"
    docx_path.write_bytes(b"word-content")
    export_row = ExportedSummaryFile.objects.create(
        # Deliberately the outsider's summary batch: the form-fill batch
        # referenced below belongs to the owner.
        batch=outsider_summary,
        workflow_type="application_form_fill",
        workflow_batch_id=form_batch.pk,
        export_category="filled_template",
        export_type=ExportedSummaryFile.ExportType.WORD,
        file_name="filled.docx",
        storage_path=str(docx_path),
    )
    download_url = reverse("file_summary_export_download", args=[export_row.pk])

    client.force_login(outsider)
    outsider_response = client.get(download_url)
    assert outsider_response.status_code == 404

    client.force_login(owner)
    owner_response = client.get(download_url)
    assert owner_response.status_code == 200

    docx_mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    assert owner_response["Content-Type"].startswith(docx_mime)
    assert b"".join(owner_response.streaming_content) == b"word-content"