Compare commits

...

10 Commits

42 changed files with 3777 additions and 11 deletions

View File

@@ -0,0 +1 @@
"""Application form auto-fill workflow package."""

View File

@@ -0,0 +1,31 @@
WORKFLOW_TYPE = "application_form_fill"
TEMPLATE_REGISTRATION_CERTIFICATE = "registration_certificate"
TEMPLATE_CHANGE_REGISTRATION = "change_registration"
TEMPLATE_ESSENTIAL_PRINCIPLES = "essential_principles"
DEFAULT_OUTPUT_TYPES = ["word", "excel", "json"]
FORM_FILL_TRIGGER_KEYWORDS = [
"填注册证",
"对应的表格",
"生成申报模板",
"安全和性能基本原则清单",
"填到申报模板",
"自动填表",
"生成表格",
]
FORM_FILL_NODE_DEFINITIONS = [
("prepare", "准备资料", "form_fill"),
("template_select", "选择模板", "form_fill"),
("template_copy", "复制模板", "form_fill"),
("field_extract", "抽取字段", "form_fill"),
("conflict_merge", "冲突归并", "form_fill"),
("word_fill", "填写 Word", "form_fill"),
("pdf_convert", "转换 PDF", "form_fill"),
("trace_export", "追溯清单", "form_fill"),
("output_export", "输出下载", "form_fill"),
("notify", "飞书通知", "form_fill"),
("completed", "完成", "completed"),
]

View File

@@ -0,0 +1,27 @@
from __future__ import annotations
from review_agent.application_form_fill.constants import WORKFLOW_TYPE
from review_agent.models import ApplicationFormFillBatch, WorkflowEvent
def record_event(
batch: ApplicationFormFillBatch,
event_type: str,
payload: dict | None = None,
) -> WorkflowEvent:
return WorkflowEvent.objects.create(
workflow_type=WORKFLOW_TYPE,
workflow_batch_id=batch.pk,
conversation=batch.conversation,
event_type=event_type,
payload=payload or {},
)
def serialize_event(event: WorkflowEvent) -> dict[str, object]:
return {
"id": event.pk,
"event_type": event.event_type,
"payload": event.payload,
"created_at": event.created_at.isoformat(),
}

View File

@@ -0,0 +1,23 @@
你是医疗器械体外诊断试剂申报资料字段抽取助手。
请只输出 JSON 对象,不要输出 Markdown。结构如下
{
"fields": [
{
"key": "product_name",
"label": "产品名称",
"value": "字段值",
"source_file": "来源文件名",
"source_role": "说明书",
"evidence": "原文证据",
"confidence": 0.8
}
],
"checklist_items": []
}
要求:
- 只抽取输入模板字段中出现的信息。
- 字段值必须来自资料原文,不要编造。
- 找不到时不要输出该字段。

View File

@@ -0,0 +1,58 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from review_agent.models import ApplicationFormFillBatch, ExportedSummaryFile, FileSummaryBatch, RegulatoryReviewBatch
@dataclass(frozen=True)
class TemplateSpec:
code: str
name: str
source_file: str
output_label: str
applies_when: dict[str, Any]
file_format: str
fields: list[dict[str, Any]]
checklist_items: list[dict[str, Any]] = field(default_factory=list)
@dataclass(frozen=True)
class ExtractedField:
key: str
label: str
value: str
source_file: str
source_role: str
evidence: str
extractor: str
confidence: float
@dataclass(frozen=True)
class MergedField:
key: str
label: str
value: str
source_file: str
evidence: str
confidence: float
has_conflict: bool = False
conflict_values: list[dict[str, Any]] = field(default_factory=list)
@dataclass
class FormFillContext:
batch: ApplicationFormFillBatch
source_summary_batch: FileSummaryBatch
source_regulatory_batch: RegulatoryReviewBatch | None
template_config: dict[str, Any] = field(default_factory=dict)
selected_templates: list[TemplateSpec] = field(default_factory=list)
document_texts: dict[str, str] = field(default_factory=dict)
regex_results: dict[str, Any] = field(default_factory=dict)
llm_results: dict[str, Any] = field(default_factory=dict)
merged_fields: dict[str, MergedField] = field(default_factory=dict)
checklist_items: dict[str, Any] = field(default_factory=dict)
conflicts: list[dict[str, Any]] = field(default_factory=list)
exports: list[ExportedSummaryFile] = field(default_factory=list)

View File

@@ -0,0 +1 @@
"""Application form auto-fill services."""

View File

@@ -0,0 +1,187 @@
from __future__ import annotations
import json
import re
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any
from django.conf import settings
from review_agent.application_form_fill.schemas import ExtractedField, TemplateSpec
from review_agent.application_form_fill.storage import create_artifact_for_file, ensure_batch_subdir
from review_agent.llm import generate_completion
from review_agent.models import ApplicationFormFillArtifact, ApplicationFormFillBatch, FileSummaryBatch
from review_agent.regulatory_review.services.text_extract import extract_text
def collect_document_texts(summary_batch: FileSummaryBatch) -> dict[str, str]:
texts: dict[str, str] = {}
for item in summary_batch.items.order_by("file_index"):
path = Path(item.storage_path)
if not path.is_absolute():
path = Path(settings.MEDIA_ROOT) / item.storage_path
if not path.exists():
continue
result = extract_text(path)
if result.status == "success" and result.text:
texts[item.file_name] = result.text
return texts
def extract_by_rules(texts: dict[str, str], specs: list[TemplateSpec]) -> dict[str, Any]:
fields: list[dict[str, Any]] = []
field_defs = _field_defs(specs)
labels = [field["label"] for field in field_defs if field.get("label")]
for file_name, text in texts.items():
source_role = detect_source_role(file_name, text)
for field in field_defs:
value, evidence = _extract_label_value(text, field["label"], labels)
if not value:
continue
fields.append(
ExtractedField(
key=field["key"],
label=field["label"],
value=value,
source_file=file_name,
source_role=source_role,
evidence=evidence,
extractor="rule",
confidence=0.75 if source_role == "说明书" else 0.65,
).__dict__
)
return {"fields": fields, "checklist_items": []}
def extract_by_llm(texts: dict[str, str], specs: list[TemplateSpec]) -> dict[str, Any]:
try:
raw = generate_completion(
[
{"role": "system", "content": _prompt_text()},
{"role": "user", "content": _build_llm_user_prompt(texts, specs)},
],
temperature=0.0,
)
payload = _parse_json_object(raw)
except Exception as exc:
return {"fields": [], "checklist_items": [], "error_message": str(exc)}
fields = []
allowed_keys = {field["key"] for field in _field_defs(specs)}
for item in payload.get("fields") or []:
if not isinstance(item, dict) or item.get("key") not in allowed_keys or not item.get("value"):
continue
fields.append(
{
"key": str(item.get("key") or ""),
"label": str(item.get("label") or item.get("key") or ""),
"value": str(item.get("value") or "").strip(),
"source_file": str(item.get("source_file") or ""),
"source_role": str(item.get("source_role") or detect_source_role(str(item.get("source_file") or ""), "")),
"evidence": str(item.get("evidence") or "").strip(),
"extractor": "llm",
"confidence": _float_confidence(item.get("confidence"), default=0.7),
}
)
return {"fields": fields, "checklist_items": payload.get("checklist_items") or []}
def run_parallel_extract(texts: dict[str, str], specs: list[TemplateSpec]) -> dict[str, Any]:
with ThreadPoolExecutor(max_workers=2) as executor:
rule_future = executor.submit(extract_by_rules, texts, specs)
llm_future = executor.submit(extract_by_llm, texts, specs)
regex_results = rule_future.result()
llm_results = llm_future.result()
return {
"regex_results": regex_results,
"llm_results": llm_results,
"selected_templates": [spec.code for spec in specs],
"source_evidence": [{"source_file": name, "char_count": len(text)} for name, text in texts.items()],
}
def save_field_extract_result(batch: ApplicationFormFillBatch, payload: dict[str, Any]) -> ApplicationFormFillArtifact:
target_dir = ensure_batch_subdir(batch, "exports")
path = target_dir / "field_extract_result.json"
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
return create_artifact_for_file(
batch,
path=path,
artifact_type=ApplicationFormFillArtifact.ArtifactType.FIELD_EXTRACT_RESULT,
file_format=ApplicationFormFillArtifact.FileFormat.JSON,
name="field_extract_result",
metadata={"artifact": "field_extract_result"},
created_by_node="field_extract",
)
def detect_source_role(file_name: str, text: str = "") -> str:
target = f"{file_name}\n{text[:200]}"
if "说明书" in target:
return "说明书"
if "产品技术要求" in target:
return "产品技术要求"
if "注册检验" in target or "检测报告" in target:
return "注册检验报告"
if "性能研究" in target:
return "性能研究资料"
if "申请表" in target:
return "申请表"
return "其他注册资料"
def _field_defs(specs: list[TemplateSpec]) -> list[dict[str, str]]:
fields: list[dict[str, str]] = []
for spec in specs:
for field in spec.fields:
key = str(field.get("key") or "")
label = str(field.get("label") or "")
if key and label:
fields.append({"key": key, "label": label})
return fields
def _extract_label_value(text: str, label: str, labels: list[str]) -> tuple[str, str]:
escaped_labels = "|".join(re.escape(item) for item in labels if item != label)
stop_pattern = rf"(?=\n\s*(?:{escaped_labels})\s*[:])" if escaped_labels else r"(?=\Z)"
pattern = re.compile(rf"{re.escape(label)}\s*[:]\s*(.+?)(?:{stop_pattern}|\Z)", re.S)
match = pattern.search(text or "")
if not match:
return "", ""
raw = match.group(1).strip()
value = re.sub(r"\n{2,}.*\Z", "", raw, flags=re.S).strip()
value = "\n".join(line.strip() for line in value.splitlines() if line.strip())
evidence = f"{label}{value}"[:300]
return value, evidence
def _prompt_text() -> str:
path = Path(__file__).resolve().parents[1] / "prompts" / "field_extract.md"
return path.read_text(encoding="utf-8")
def _build_llm_user_prompt(texts: dict[str, str], specs: list[TemplateSpec]) -> str:
fields = [{"key": field["key"], "label": field["label"]} for field in _field_defs(specs)]
documents = [{"source_file": name, "text": text[:4000]} for name, text in texts.items()]
return json.dumps({"fields": fields, "documents": documents}, ensure_ascii=False)
def _parse_json_object(raw: str) -> dict[str, Any]:
text = (raw or "").strip()
if text.startswith("```"):
text = text.strip("`").strip()
if text.lower().startswith("json"):
text = text[4:].strip()
start = text.find("{")
end = text.rfind("}")
if start == -1 or end == -1 or end < start:
raise json.JSONDecodeError("未找到 JSON 对象", text, 0)
return json.loads(text[start : end + 1])
def _float_confidence(value, *, default: float) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default

View File

@@ -0,0 +1,88 @@
from __future__ import annotations
import re
from typing import Any
from review_agent.application_form_fill.schemas import MergedField
SOURCE_PRIORITY = {
"说明书": 1,
"产品技术要求": 2,
"注册检验报告": 3,
"检测报告": 3,
"性能研究资料": 4,
"其他注册资料": 5,
}
def normalize_field_value(value: str) -> str:
return re.sub(r"\s+", "", str(value or "")).strip().lower()
def rank_source(source_role: str, source_file: str = "") -> int:
target = f"{source_role}\n{source_file}"
for keyword, rank in SOURCE_PRIORITY.items():
if keyword in target:
return rank
return 9
def merge_fields(regex_results: dict[str, Any], llm_results: dict[str, Any]) -> tuple[dict[str, MergedField], list[dict]]:
grouped: dict[str, list[dict[str, Any]]] = {}
for item in list(regex_results.get("fields") or []) + list(llm_results.get("fields") or []):
key = str(item.get("key") or "")
value = str(item.get("value") or "").strip()
if not key or not value:
continue
grouped.setdefault(key, []).append(item)
merged: dict[str, MergedField] = {}
conflicts: list[dict] = []
for key, candidates in grouped.items():
selected = sorted(
candidates,
key=lambda item: (
rank_source(str(item.get("source_role") or ""), str(item.get("source_file") or "")),
-float(item.get("confidence") or 0),
),
)[0]
distinct = _distinct_values(candidates)
has_conflict = len(distinct) > 1
conflict_values = [
{
"value": item.get("value"),
"source_file": item.get("source_file", ""),
"source_role": item.get("source_role", ""),
"evidence": item.get("evidence", ""),
}
for item in candidates
if normalize_field_value(str(item.get("value") or "")) != normalize_field_value(str(selected.get("value") or ""))
]
merged_field = MergedField(
key=key,
label=str(selected.get("label") or key),
value=str(selected.get("value") or ""),
source_file=str(selected.get("source_file") or ""),
evidence=str(selected.get("evidence") or ""),
confidence=float(selected.get("confidence") or 0),
has_conflict=has_conflict,
conflict_values=conflict_values,
)
merged[key] = merged_field
if has_conflict:
conflicts.append(
{
"field_key": key,
"field_label": merged_field.label,
"selected_value": merged_field.value,
"selected_source": merged_field.source_file,
"conflict_values": conflict_values,
"handling": "说明书优先,模板内黄底红字高亮" if rank_source(merged_field.source_file, merged_field.source_file) == 1 else "按来源优先级采用最高优先级字段",
}
)
return merged, conflicts
def _distinct_values(candidates: list[dict[str, Any]]) -> set[str]:
return {normalize_field_value(str(item.get("value") or "")) for item in candidates if item.get("value")}

View File

@@ -0,0 +1,45 @@
from __future__ import annotations
from django.utils import timezone
from review_agent.models import (
ApplicationFormFillBatch,
ApplicationFormFillNotificationRecord,
ExportedSummaryFile,
)
def notify_completion(
batch: ApplicationFormFillBatch,
exports: list[ExportedSummaryFile],
*,
fail: bool = False,
) -> ApplicationFormFillNotificationRecord:
export_ids = [export.pk for export in exports]
message_summary = (
f"自动填表批次 {batch.batch_no} 已完成,"
f"模板 {', '.join(batch.selected_templates or []) or '未识别'}"
f"冲突字段 {len(batch.conflict_summary or [])} 个。"
)
if fail:
return ApplicationFormFillNotificationRecord.objects.create(
batch=batch,
recipient=batch.user,
channel=ApplicationFormFillNotificationRecord.Channel.MOCK,
template_codes=batch.selected_templates,
export_ids=export_ids,
message_summary=message_summary,
send_status=ApplicationFormFillNotificationRecord.SendStatus.FAILED,
retry_count=1,
error_message="mock notification failed",
)
return ApplicationFormFillNotificationRecord.objects.create(
batch=batch,
recipient=batch.user,
channel=ApplicationFormFillNotificationRecord.Channel.MOCK,
template_codes=batch.selected_templates,
export_ids=export_ids,
message_summary=message_summary,
send_status=ApplicationFormFillNotificationRecord.SendStatus.SUCCESS,
sent_at=timezone.now(),
)

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
from review_agent.models import ApplicationFormFillBatch, ExportedSummaryFile
def build_assistant_summary(batch: ApplicationFormFillBatch, exports: list[ExportedSummaryFile]) -> str:
word_exports = [export for export in exports if export.export_type == ExportedSummaryFile.ExportType.WORD]
trace_exports = [
export
for export in exports
if export.export_type in {ExportedSummaryFile.ExportType.EXCEL, ExportedSummaryFile.ExportType.JSON}
]
lines = ["已生成申报模板自动填表文件。", "", "| 文件 | Word | PDF |", "| --- | --- | --- |"]
if word_exports:
for export in word_exports:
lines.append(f"| {export.file_name} | [下载](/api/review-agent/file-summary/exports/{export.pk}/download/) | 待增强 |")
else:
lines.append("| 自动填表结果 | 未生成 | 待增强 |")
conflicts = batch.conflict_summary or []
if conflicts:
lines.extend(["", "| 冲突字段 | 采用值 | 冲突来源 | 处理 |", "| --- | --- | --- | --- |"])
for item in conflicts:
conflict_sources = "".join(
f"{value.get('source_file', '')}{value.get('value', '')}" for value in item.get("conflict_values", [])
)
lines.append(
f"| {item.get('field_label', item.get('field_key', ''))} | {item.get('selected_value', '')} | {conflict_sources or '-'} | {item.get('handling', '')} |"
)
if trace_exports:
lines.append("")
for export in trace_exports:
lines.append(f"[下载{export.file_name}](/api/review-agent/file-summary/exports/{export.pk}/download/)")
return "\n".join(lines).strip()

View File

@@ -0,0 +1,96 @@
from __future__ import annotations
import hashlib
from pathlib import Path
from typing import Any
import yaml
from django.conf import settings
DEFAULT_CONFIG_PATH = (
Path(settings.BASE_DIR)
/ "review_agent"
/ "application_form_fill"
/ "templates"
/ "application_form_templates_v1.yaml"
)
SUPPORTED_TARGET_TYPES = {"table_row", "placeholder"}
SUPPORTED_FILE_FORMATS = {"doc", "docx"}
def load_template_config(path: str | Path | None = None) -> dict[str, Any]:
config_path = Path(path) if path else DEFAULT_CONFIG_PATH
with config_path.open("r", encoding="utf-8") as handle:
payload = yaml.safe_load(handle) or {}
return payload
def compute_config_hash(path: str | Path | None = None) -> str:
config_path = Path(path) if path else DEFAULT_CONFIG_PATH
digest = hashlib.sha256()
with config_path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def validate_template_config(config: dict[str, Any], *, base_dir: str | Path | None = None) -> list[str]:
errors: list[str] = []
root = Path(base_dir) if base_dir else Path(settings.BASE_DIR)
version = config.get("version")
if not version:
errors.append("模板配置缺少 version。")
source_dir_value = config.get("source_dir")
source_dir = root / source_dir_value if source_dir_value else None
if not source_dir_value:
errors.append("模板配置缺少 source_dir。")
elif not source_dir.exists():
errors.append(f"模板 source_dir 不存在:{source_dir_value}")
templates = config.get("templates")
if not isinstance(templates, list) or not templates:
errors.append("模板配置必须包含非空 templates 列表。")
return errors
seen_codes: set[str] = set()
for index, template in enumerate(templates, start=1):
if not isinstance(template, dict):
errors.append(f"{index} 个模板配置必须是对象。")
continue
code = str(template.get("code") or "").strip()
if not code:
errors.append(f"{index} 个模板缺少 code。")
elif code in seen_codes:
errors.append(f"模板 code 重复:{code}")
seen_codes.add(code)
file_format = str(template.get("file_format") or "").strip().lower()
if file_format not in SUPPORTED_FILE_FORMATS:
errors.append(f"模板 {code or index} 的 file_format 不支持:{file_format or ''}")
source_file = str(template.get("source_file") or "").strip()
if not source_file:
errors.append(f"模板 {code or index} 缺少 source_file。")
elif source_dir and source_dir.exists() and not (source_dir / source_file).exists():
errors.append(f"模板 {code or index} 的 source_file 不存在:{source_file}")
fields = template.get("fields") or []
if not isinstance(fields, list):
errors.append(f"模板 {code or index} 的 fields 必须是列表。")
continue
for field_index, field in enumerate(fields, start=1):
target = field.get("target") if isinstance(field, dict) else None
target_type = str((target or {}).get("type") or "").strip()
if target_type not in SUPPORTED_TARGET_TYPES:
errors.append(
f"模板 {code or index}{field_index} 个字段 target.type 不支持:{target_type or ''}"
)
return errors
def template_specs(config: dict[str, Any]) -> list[dict[str, Any]]:
return list(config.get("templates") or [])

View File

@@ -0,0 +1,57 @@
from __future__ import annotations
import shutil
from pathlib import Path
from typing import Any
from django.conf import settings
from review_agent.application_form_fill.schemas import TemplateSpec
from review_agent.application_form_fill.storage import create_artifact_for_file, ensure_batch_subdir
from review_agent.models import ApplicationFormFillArtifact, ApplicationFormFillBatch
class TemplateUnavailableError(Exception):
pass
def resolve_source_template(spec: TemplateSpec, config: dict[str, Any]) -> Path:
source_dir = Path(settings.BASE_DIR) / str(config.get("source_dir") or "")
working_template = getattr(spec, "working_template", "") or ""
if spec.file_format == "doc" and working_template:
candidate = source_dir / working_template
else:
candidate = source_dir / spec.source_file
if not candidate.exists():
raise TemplateUnavailableError(f"模板文件不存在:{spec.source_file}")
if spec.file_format == "doc" and candidate.suffix.lower() == ".doc":
raise TemplateUnavailableError(f"模板 {spec.code} 为 .doc当前阶段需预转换为 .docx 后使用。")
return candidate
def copy_template_to_batch(
spec: TemplateSpec,
batch: ApplicationFormFillBatch,
config: dict[str, Any],
) -> ApplicationFormFillArtifact:
source = resolve_source_template(spec, config)
target_dir = ensure_batch_subdir(batch, "templates")
target = target_dir / f"{spec.code}.source{source.suffix.lower()}"
shutil.copy2(source, target)
_ensure_under(target, Path(batch.work_dir))
return create_artifact_for_file(
batch,
path=target,
artifact_type=ApplicationFormFillArtifact.ArtifactType.TEMPLATE_COPY,
file_format=source.suffix.lower().lstrip(".") or spec.file_format,
name=spec.name,
metadata={"template_code": spec.code, "source_file": spec.source_file},
created_by_node="template_copy",
)
def _ensure_under(path: Path, root: Path) -> None:
resolved_path = path.resolve()
resolved_root = root.resolve()
if resolved_path != resolved_root and resolved_root not in resolved_path.parents:
raise ValueError(f"模板复制目标不在批次工作目录内:{path}")

View File

@@ -0,0 +1,158 @@
from __future__ import annotations
from typing import Any
from review_agent.application_form_fill.constants import (
TEMPLATE_CHANGE_REGISTRATION,
TEMPLATE_ESSENTIAL_PRINCIPLES,
TEMPLATE_REGISTRATION_CERTIFICATE,
)
from review_agent.application_form_fill.schemas import TemplateSpec
from review_agent.models import ApplicationFormFillBatch
ALL_TEMPLATE_CODES = [
TEMPLATE_REGISTRATION_CERTIFICATE,
TEMPLATE_CHANGE_REGISTRATION,
TEMPLATE_ESSENTIAL_PRINCIPLES,
]
def parse_requested_templates(message: str) -> list[str]:
normalized = (message or "").lower()
if any(keyword in normalized for keyword in ["全部模板", "所有模板", "全套模板", "全部表格", "所有表格"]):
return ALL_TEMPLATE_CODES.copy()
requested: list[str] = []
if "注册证" in normalized and "变更注册" not in normalized and "变更 注册" not in normalized:
requested.append(TEMPLATE_REGISTRATION_CERTIFICATE)
if any(keyword in normalized for keyword in ["变更注册", "变更 注册", "变更备案", "备案文件"]):
requested.append(TEMPLATE_CHANGE_REGISTRATION)
if any(keyword in normalized for keyword in ["安全和性能基本原则", "基本原则清单", "原则清单"]):
requested.append(TEMPLATE_ESSENTIAL_PRINCIPLES)
return _dedupe(requested)
def detect_registration_type(
*,
batch: ApplicationFormFillBatch | None = None,
message: str = "",
file_candidates: dict[str, Any] | None = None,
) -> tuple[str, str]:
user_value = _registration_type_from_text(message)
if user_value:
return user_value, ApplicationFormFillBatch.RegistrationTypeSource.USER_MESSAGE
regulatory_value = _registration_type_from_regulatory_batch(batch)
if regulatory_value:
return regulatory_value, ApplicationFormFillBatch.RegistrationTypeSource.REGULATORY_BATCH
file_value = _registration_type_from_candidates(file_candidates or {})
if file_value:
return file_value, ApplicationFormFillBatch.RegistrationTypeSource.FILE_EXTRACT
return "unknown", ApplicationFormFillBatch.RegistrationTypeSource.UNKNOWN
def select_templates(
config: dict[str, Any],
requested_templates: list[str],
registration_type: str,
) -> tuple[list[TemplateSpec], list[dict[str, str]]]:
template_map = {item.get("code"): item for item in config.get("templates") or []}
risk_notes: list[dict[str, str]] = []
if requested_templates:
selected_codes = _dedupe(requested_templates)
elif registration_type in {"变更注册", "备案"}:
selected_codes = [TEMPLATE_CHANGE_REGISTRATION, TEMPLATE_ESSENTIAL_PRINCIPLES]
else:
selected_codes = [TEMPLATE_REGISTRATION_CERTIFICATE, TEMPLATE_ESSENTIAL_PRINCIPLES]
specs: list[TemplateSpec] = []
for code in selected_codes:
raw = template_map.get(code)
if not raw:
risk_notes.append({"type": "unknown_template", "message": f"模板不存在:{code}"})
continue
spec = _to_template_spec(raw)
if requested_templates and not _template_applies(spec, registration_type):
risk_notes.append(
{
"type": "template_registration_mismatch",
"message": f"用户指定模板 {spec.name} 与注册类型 {registration_type or 'unknown'} 可能不匹配,仍按指定生成。",
}
)
specs.append(spec)
return specs, risk_notes
def _to_template_spec(raw: dict[str, Any]) -> TemplateSpec:
return TemplateSpec(
code=str(raw.get("code") or ""),
name=str(raw.get("name") or ""),
source_file=str(raw.get("source_file") or ""),
output_label=str(raw.get("output_label") or raw.get("name") or ""),
applies_when=dict(raw.get("applies_when") or {}),
file_format=str(raw.get("file_format") or ""),
fields=list(raw.get("fields") or []),
checklist_items=list(raw.get("checklist_items") or []),
)
def _template_applies(spec: TemplateSpec, registration_type: str) -> bool:
allowed = spec.applies_when.get("registration_type") or []
if not allowed:
return True
return registration_type in allowed or (registration_type == "unknown" and "unknown" in allowed)
def _registration_type_from_text(message: str) -> str:
normalized = (message or "").lower()
if any(keyword in normalized for keyword in ["首次注册", "初次注册", "新注册"]):
return "首次注册"
if "变更注册" in normalized:
return "变更注册"
if "备案" in normalized:
return "备案"
return ""
def _registration_type_from_regulatory_batch(batch: ApplicationFormFillBatch | None) -> str:
if not batch or not batch.source_regulatory_batch_id:
return ""
condition_json = batch.source_regulatory_batch.condition_json or {}
confirmed = condition_json.get("confirmed_conditions") or {}
candidates = condition_json.get("candidates") or {}
for payload in [confirmed, condition_json, candidates.get("registration_type") or {}]:
if isinstance(payload, dict):
value = payload.get("registration_type") or payload.get("suggested") or payload.get("value")
normalized = _normalize_registration_type(value)
if normalized:
return normalized
return ""
def _registration_type_from_candidates(candidates: dict[str, Any]) -> str:
value = candidates.get("registration_type") or candidates.get("suggested")
if isinstance(value, dict):
value = value.get("value") or value.get("suggested")
return _normalize_registration_type(value)
def _normalize_registration_type(value: Any) -> str:
text = str(value or "")
if "首次" in text or "初次" in text:
return "首次注册"
if "变更" in text:
return "变更注册"
if "备案" in text:
return "备案"
return ""
def _dedupe(values: list[str]) -> list[str]:
result: list[str] = []
for value in values:
if value and value not in result:
result.append(value)
return result

View File

@@ -0,0 +1,145 @@
from __future__ import annotations
import json
from dataclasses import asdict
from pathlib import Path
from typing import Any
from openpyxl import Workbook
from review_agent.application_form_fill.constants import WORKFLOW_TYPE
from review_agent.application_form_fill.schemas import MergedField, TemplateSpec
from review_agent.application_form_fill.storage import create_artifact_for_file, ensure_batch_subdir
from review_agent.models import ApplicationFormFillArtifact, ApplicationFormFillBatch, ExportedSummaryFile
def build_traceability_workbook(
batch: ApplicationFormFillBatch,
merged_fields: dict[str, MergedField],
conflicts: list[dict[str, Any]],
specs: list[TemplateSpec],
generation_results: list[dict[str, Any]] | None = None,
) -> Workbook:
workbook = Workbook()
field_sheet = workbook.active
field_sheet.title = "字段追溯"
field_sheet.append(["模板", "字段", "填入值", "来源文件", "证据", "冲突状态"])
template_names = {field.get("key"): spec.output_label for spec in specs for field in spec.fields}
for key, field in merged_fields.items():
field_sheet.append(
[
template_names.get(key, ""),
field.label,
field.value,
field.source_file,
field.evidence,
"冲突" if field.has_conflict else "一致",
]
)
conflict_sheet = workbook.create_sheet("冲突字段")
conflict_sheet.append(["字段", "采用值", "冲突值", "冲突来源", "处理方式"])
for conflict in conflicts:
conflict_values = conflict.get("conflict_values") or []
if not conflict_values:
conflict_sheet.append(
[
conflict.get("field_label", ""),
conflict.get("selected_value", ""),
"",
"",
conflict.get("handling", ""),
]
)
continue
for value in conflict_values:
conflict_sheet.append(
[
conflict.get("field_label", ""),
conflict.get("selected_value", ""),
value.get("value", ""),
value.get("source_file", ""),
conflict.get("handling", ""),
]
)
low_confidence_sheet = workbook.create_sheet("低置信度条目")
low_confidence_sheet.append(["字段", "填入值", "置信度", "来源文件"])
for field in merged_fields.values():
if field.confidence < 0.6:
low_confidence_sheet.append([field.label, field.value, field.confidence, field.source_file])
result_sheet = workbook.create_sheet("生成结果")
result_sheet.append(["模板", "Word状态", "PDF状态", "错误说明"])
for result in generation_results or []:
result_sheet.append(
[
result.get("template_label", ""),
result.get("word_status", ""),
result.get("pdf_status", "待增强"),
result.get("error_message", ""),
]
)
if not generation_results:
for spec in specs:
result_sheet.append([spec.output_label, "待生成", "待增强", ""])
return workbook
def save_traceability_exports(
batch: ApplicationFormFillBatch,
merged_fields: dict[str, MergedField],
conflicts: list[dict[str, Any]],
specs: list[TemplateSpec],
generation_results: list[dict[str, Any]] | None = None,
) -> list[ExportedSummaryFile]:
target_dir = ensure_batch_subdir(batch, "exports")
workbook = build_traceability_workbook(batch, merged_fields, conflicts, specs, generation_results)
excel_path = target_dir / f"{batch.batch_no}-字段来源追溯清单.xlsx"
workbook.save(excel_path)
create_artifact_for_file(
batch,
path=excel_path,
artifact_type=ApplicationFormFillArtifact.ArtifactType.TRACEABILITY,
file_format=ApplicationFormFillArtifact.FileFormat.EXCEL,
name="字段来源追溯清单",
metadata={"conflict_count": len(conflicts)},
created_by_node="trace_export",
)
excel_export = ExportedSummaryFile.objects.create(
batch=batch.source_summary_batch,
workflow_type=WORKFLOW_TYPE,
workflow_batch_id=batch.pk,
export_category="traceability",
export_type=ExportedSummaryFile.ExportType.EXCEL,
file_name=excel_path.name,
storage_path=str(excel_path),
)
json_path = target_dir / "merged_fields.json"
payload = {
"batch_no": batch.batch_no,
"merged_fields": {key: asdict(value) for key, value in merged_fields.items()},
"conflicts": conflicts,
"generation_results": generation_results or [],
}
json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
create_artifact_for_file(
batch,
path=json_path,
artifact_type=ApplicationFormFillArtifact.ArtifactType.MERGED_FIELDS,
file_format=ApplicationFormFillArtifact.FileFormat.JSON,
name="merged_fields",
metadata={"conflict_count": len(conflicts)},
created_by_node="trace_export",
)
json_export = ExportedSummaryFile.objects.create(
batch=batch.source_summary_batch,
workflow_type=WORKFLOW_TYPE,
workflow_batch_id=batch.pk,
export_category="traceability",
export_type=ExportedSummaryFile.ExportType.JSON,
file_name=json_path.name,
storage_path=str(json_path),
)
return [excel_export, json_export]

View File

@@ -0,0 +1,111 @@
from __future__ import annotations
import re
from pathlib import Path
from docx import Document
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.shared import RGBColor
from review_agent.application_form_fill.constants import WORKFLOW_TYPE
from review_agent.application_form_fill.schemas import MergedField, TemplateSpec
from review_agent.application_form_fill.storage import create_artifact_for_file, ensure_batch_subdir
from review_agent.models import ApplicationFormFillArtifact, ApplicationFormFillBatch, ExportedSummaryFile
def fill_template(
template_path: str | Path,
output_path: str | Path,
spec: TemplateSpec,
fields: dict[str, MergedField],
conflicts: list[dict] | None = None,
) -> Path:
document = Document(str(template_path))
conflict_keys = {item.get("field_key") for item in conflicts or []}
for field_config in spec.fields:
target = field_config.get("target") or {}
if target.get("type") != "table_row":
continue
key = field_config.get("key")
field = fields.get(key)
if not field:
continue
fill_table_row(
document,
str(target.get("row_label") or field_config.get("label") or ""),
field.value,
conflict=key in conflict_keys or field.has_conflict,
)
output = Path(output_path)
output.parent.mkdir(parents=True, exist_ok=True)
document.save(str(output))
return output
def fill_table_row(document: Document, row_label: str, value: str, *, conflict: bool = False) -> bool:
normalized_label = _normalize_label(row_label)
for table in document.tables:
for row in table.rows:
if len(row.cells) < 2:
continue
if _normalize_label(row.cells[0].text) != normalized_label:
continue
target = row.cells[1]
target.text = ""
paragraph = target.paragraphs[0]
run = paragraph.add_run(value)
if conflict:
run.font.color.rgb = RGBColor(0xFF, 0x00, 0x00)
apply_cell_shading(target, "FFFF00")
return True
return False
def apply_cell_shading(cell, fill: str) -> None:
tc_pr = cell._tc.get_or_add_tcPr()
shading = tc_pr.find(qn("w:shd"))
if shading is None:
shading = OxmlElement("w:shd")
tc_pr.append(shading)
shading.set(qn("w:fill"), fill)
def create_word_export(
batch: ApplicationFormFillBatch,
spec: TemplateSpec,
template_path: str | Path,
fields: dict[str, MergedField],
conflicts: list[dict] | None = None,
) -> ExportedSummaryFile:
target_dir = ensure_batch_subdir(batch, "filled")
product_name = _safe_filename(batch.product_name or fields.get("product_name", MergedField("product_name", "产品名称", "", "", "", 0)).value or "未识别产品")
output_path = target_dir / f"{batch.batch_no}-{product_name}-{_safe_filename(spec.output_label)}.docx"
fill_template(template_path, output_path, spec, fields, conflicts)
create_artifact_for_file(
batch,
path=output_path,
artifact_type=ApplicationFormFillArtifact.ArtifactType.FILLED_TEMPLATE,
file_format=ApplicationFormFillArtifact.FileFormat.DOCX,
name=spec.output_label,
metadata={"template_code": spec.code, "conflict_count": len(conflicts or [])},
created_by_node="word_fill",
)
return ExportedSummaryFile.objects.create(
batch=batch.source_summary_batch,
workflow_type=WORKFLOW_TYPE,
workflow_batch_id=batch.pk,
export_category="filled_template",
export_type=ExportedSummaryFile.ExportType.WORD,
file_name=output_path.name,
storage_path=str(output_path),
)
def _normalize_label(value: str) -> str:
return re.sub(r"\s+", "", value or "").replace("", "").replace(":", "")
def _safe_filename(value: str) -> str:
text = re.sub(r'[\\/:*?"<>|]+', "_", value or "")
return text.strip()[:80] or "output"

View File

@@ -0,0 +1,55 @@
from __future__ import annotations
import hashlib
from pathlib import Path
from django.conf import settings
from review_agent.models import ApplicationFormFillArtifact, ApplicationFormFillBatch
def build_batch_work_dir(batch: ApplicationFormFillBatch | None = None, *, batch_no: str = "") -> Path:
if batch:
return Path(settings.MEDIA_ROOT) / "application_form_fill" / str(batch.user_id) / str(batch.conversation_id) / batch.batch_no
return Path(settings.MEDIA_ROOT) / "application_form_fill" / batch_no
def compute_file_sha256(path: str | Path) -> str:
file_path = Path(path)
digest = hashlib.sha256()
with file_path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def ensure_batch_subdir(batch: ApplicationFormFillBatch, name: str) -> Path:
root = Path(batch.work_dir) if batch.work_dir else build_batch_work_dir(batch)
target = root / Path(name).name
target.mkdir(parents=True, exist_ok=True)
return target
def create_artifact_for_file(
batch: ApplicationFormFillBatch,
*,
path: str | Path,
artifact_type: str,
file_format: str,
name: str = "",
metadata: dict | None = None,
created_by_node: str = "",
) -> ApplicationFormFillArtifact:
file_path = Path(path)
return ApplicationFormFillArtifact.objects.create(
batch=batch,
artifact_type=artifact_type,
file_format=file_format,
name=name or file_path.stem,
file_name=file_path.name,
storage_path=str(file_path),
file_size=file_path.stat().st_size if file_path.exists() else 0,
content_hash=compute_file_sha256(file_path) if file_path.exists() else "",
metadata=metadata or {},
created_by_node=created_by_node,
)

View File

@@ -0,0 +1,112 @@
version: application_form_templates_v1
source_dir: docs/0.原始材料/关于公布体外诊断试剂注册申报资料要求和批准证明文件格式的公告
templates:
- code: registration_certificate
name: 中华人民共和国医疗器械注册证(体外诊断试剂)(格式)
source_file: 中华人民共和国医疗器械注册证(体外诊断试剂)(格式).docx
output_label: 注册证格式
applies_when:
registration_type:
- 首次注册
- unknown
file_format: docx
fields:
- key: applicant_name
label: 注册人名称
target:
type: table_row
row_label: 注册人名称
source_roles:
- 申请表
- 说明书
- 企业信息
- key: applicant_address
label: 注册人住所
target:
type: table_row
row_label: 注册人住所
source_roles:
- 申请表
- 企业信息
- key: manufacturer_address
label: 生产地址
target:
type: table_row
row_label: 生产地址
source_roles:
- 申请表
- 质量管理体系文件
- key: product_name
label: 产品名称
target:
type: table_row
row_label: 产品名称
source_roles:
- 说明书
- 产品技术要求
- 注册检验报告
- key: package_specification
label: 包装规格
target:
type: table_row
row_label: 包装规格
source_roles:
- 说明书
- 产品技术要求
- key: main_components
label: 主要组成成分
target:
type: table_row
row_label: 主要组成成分
source_roles:
- 说明书
- 产品技术要求
- key: intended_use
label: 预期用途
target:
type: table_row
row_label: 预期用途
source_roles:
- 说明书
- 临床评价资料
- 产品技术要求
- key: storage_condition_and_validity
label: 产品储存条件及有效期
target:
type: table_row
row_label: 产品储存条件及有效期
source_roles:
- 说明书
- 产品技术要求
- 稳定性研究资料
- key: attachments
label: 附件
target:
type: table_row
row_label: 附件
source_roles:
- 注册申报资料
- 说明书
- code: change_registration
name: 中华人民共和国医疗器械变更注册(备案)文件(体外诊断试剂)(格式)
source_file: 中华人民共和国医疗器械变更注册(备案)文件(体外诊断试剂)(格式).doc
output_label: 变更注册备案文件
applies_when:
registration_type:
- 变更注册
- 备案
file_format: doc
fields: []
- code: essential_principles
name: 体外诊断试剂安全和性能基本原则清单
source_file: 体外诊断试剂安全和性能基本原则清单.doc
output_label: 安全和性能基本原则清单
applies_when:
registration_type:
- 首次注册
- 变更注册
- 备案
- unknown
file_format: doc
fields: []
checklist_items: []

View File

@@ -0,0 +1,127 @@
import json
from django.contrib.auth.decorators import login_required
from django.conf import settings
from django.http import Http404, JsonResponse
from django.views.decorators.http import require_http_methods
from review_agent.application_form_fill.workflow import (
create_application_form_fill_batch,
find_latest_successful_summary_batch,
start_application_form_fill_workflow,
)
from review_agent.models import ApplicationFormFillBatch, Conversation, ExportedSummaryFile, FileSummaryBatch, WorkflowNodeRun
@require_http_methods(["GET"])
def health(request):
return JsonResponse({"workflow_type": "application_form_fill", "status": "available"})
@login_required
@require_http_methods(["POST"])
def start(request):
try:
payload = json.loads(request.body.decode("utf-8") or "{}")
except json.JSONDecodeError:
return JsonResponse({"error": "JSON 格式错误。"}, status=400)
conversation = Conversation.objects.filter(pk=payload.get("conversation_id"), user=request.user).first()
if not conversation:
raise Http404("对话不存在。")
summary_batch = None
if payload.get("file_summary_batch_id"):
summary_batch = FileSummaryBatch.objects.filter(
pk=payload.get("file_summary_batch_id"),
conversation=conversation,
user=request.user,
status=FileSummaryBatch.Status.SUCCESS,
).first()
if summary_batch is None:
summary_batch = find_latest_successful_summary_batch(conversation)
if summary_batch is None:
return JsonResponse({"error": "请先上传资料并完成文件汇总。"}, status=400)
batch = create_application_form_fill_batch(
conversation=conversation,
user=request.user,
source_summary_batch=summary_batch,
requested_templates=payload.get("template_codes") or [],
output_types=payload.get("output_types") or None,
)
start_application_form_fill_workflow(batch, async_run=getattr(settings, "APPLICATION_FORM_FILL_ASYNC", True))
return JsonResponse(
{
"batch_id": batch.pk,
"workflow_type": "application_form_fill",
"status": batch.status,
"selected_templates": batch.selected_templates,
}
)
@login_required
@require_http_methods(["GET"])
def batch_status(request, batch_id: int):
batch = ApplicationFormFillBatch.objects.filter(
pk=batch_id,
conversation__user=request.user,
is_deleted=False,
).first()
if not batch:
raise Http404("填表批次不存在。")
exports = ExportedSummaryFile.objects.filter(
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
).order_by("id")
return JsonResponse(
{
"batch": {
"id": batch.pk,
"workflow_type": "application_form_fill",
"batch_no": batch.batch_no,
"status": batch.status,
"product_name": batch.product_name,
"selected_templates": batch.selected_templates,
"conflict_count": len(batch.conflict_summary or []),
"risk_summary_text": _risk_summary_text(batch),
"error_message": batch.error_message,
},
"nodes": [
{
"node_code": node.node_code,
"node_name": node.node_name,
"status": node.status,
"progress": node.progress,
"message": node.message,
}
for node in WorkflowNodeRun.objects.filter(
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
).order_by("id")
],
"conflicts": batch.conflict_summary or [],
"exports": [
{
"id": export.pk,
"export_type": export.export_type,
"export_category": export.export_category,
"file_name": export.file_name,
"download_url": f"/api/review-agent/file-summary/exports/{export.pk}/download/",
}
for export in exports
],
}
)
def _risk_summary_text(batch: ApplicationFormFillBatch) -> str:
parts = []
if batch.selected_templates:
parts.append("模板 " + "".join(batch.selected_templates))
if batch.conflict_summary:
parts.append(f"冲突字段 {len(batch.conflict_summary)}")
if batch.risk_notes:
parts.append(f"提示 {len(batch.risk_notes)}")
return " · ".join(parts)

View File

@@ -0,0 +1,328 @@
from __future__ import annotations
import logging
from threading import Thread
from uuid import uuid4
from django.conf import settings
from django.db import transaction
from django.utils import timezone
from review_agent.application_form_fill.constants import DEFAULT_OUTPUT_TYPES, FORM_FILL_NODE_DEFINITIONS, WORKFLOW_TYPE
from review_agent.application_form_fill.events import record_event
from review_agent.application_form_fill.services.field_extract import (
collect_document_texts,
run_parallel_extract,
save_field_extract_result,
)
from review_agent.application_form_fill.services.field_merge import merge_fields
from review_agent.application_form_fill.services.notifier import notify_completion
from review_agent.application_form_fill.services.summary import build_assistant_summary
from review_agent.application_form_fill.services.template_config import (
compute_config_hash,
load_template_config,
validate_template_config,
)
from review_agent.application_form_fill.services.template_repository import (
TemplateUnavailableError,
copy_template_to_batch,
)
from review_agent.application_form_fill.services.template_select import (
detect_registration_type,
parse_requested_templates,
select_templates,
)
from review_agent.application_form_fill.services.traceability_export import save_traceability_exports
from review_agent.application_form_fill.services.word_fill import create_word_export
from review_agent.application_form_fill.schemas import MergedField, TemplateSpec
from review_agent.application_form_fill.storage import build_batch_work_dir
from review_agent.models import ApplicationFormFillBatch, Conversation, FileSummaryBatch, Message, WorkflowNodeRun
logger = logging.getLogger("review_agent.application_form_fill.workflow")
def build_batch_no() -> str:
return f"AFF-{timezone.localtime().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6]}"
def find_latest_successful_summary_batch(conversation: Conversation) -> FileSummaryBatch | None:
return (
FileSummaryBatch.objects.filter(
conversation=conversation,
status=FileSummaryBatch.Status.SUCCESS,
)
.order_by("-finished_at", "-created_at", "-id")
.first()
)
@transaction.atomic
def create_application_form_fill_batch(
*,
conversation: Conversation,
user,
source_summary_batch: FileSummaryBatch,
trigger_message: Message | None = None,
requested_templates: list[str] | None = None,
output_types: list[str] | None = None,
) -> ApplicationFormFillBatch:
batch_no = build_batch_no()
work_dir = build_batch_work_dir(batch_no=batch_no)
work_dir.mkdir(parents=True, exist_ok=True)
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
trigger_message=trigger_message,
source_summary_batch=source_summary_batch,
batch_no=batch_no,
requested_templates=requested_templates or [],
output_types=output_types or DEFAULT_OUTPUT_TYPES,
work_dir=str(work_dir),
)
for code, name, group in FORM_FILL_NODE_DEFINITIONS:
WorkflowNodeRun.objects.create(
workflow_type=WORKFLOW_TYPE,
workflow_batch_id=batch.pk,
node_group=group,
node_code=code,
node_name=name,
)
record_event(batch, "workflow_created", {"batch_id": batch.pk, "batch_no": batch.batch_no})
return batch
class FormFillWorkflowExecutor:
"""Runs the auto-fill workflow skeleton; later stages fill node bodies."""
def __init__(self, batch: ApplicationFormFillBatch):
self.batch = batch
self.template_config: dict = {}
self.selected_templates: list[TemplateSpec] = []
self.template_paths: dict[str, str] = {}
self.document_texts: dict[str, str] = {}
self.extract_payload: dict = {}
self.merged_fields: dict[str, MergedField] = {}
self.conflicts: list[dict] = []
self.exports = []
self.generation_results: list[dict] = []
self.non_blocking_errors: list[str] = []
def run(self) -> None:
logger.info("自动填表工作流开始 batch_no=%s batch_id=%s", self.batch.batch_no, self.batch.pk)
self.batch.status = ApplicationFormFillBatch.Status.RUNNING
self.batch.started_at = timezone.now()
self.batch.save(update_fields=["status", "started_at"])
record_event(self.batch, "workflow_started", {"batch_id": self.batch.pk})
try:
for node in self._nodes():
if node.status in {WorkflowNodeRun.Status.SUCCESS, WorkflowNodeRun.Status.SKIPPED}:
continue
self._run_node(node)
except Exception as exc:
logger.exception("Application form fill workflow failed", extra={"batch_id": self.batch.pk})
self.batch.status = ApplicationFormFillBatch.Status.FAILED
self.batch.error_message = str(exc)
self.batch.finished_at = timezone.now()
self.batch.save(update_fields=["status", "error_message", "finished_at"])
record_event(self.batch, "workflow_failed", {"message": str(exc)})
return
self.batch.refresh_from_db()
if self.batch.status != ApplicationFormFillBatch.Status.PARTIAL_SUCCESS:
self.batch.status = ApplicationFormFillBatch.Status.SUCCESS
self.batch.finished_at = timezone.now()
self.batch.save(update_fields=["status", "finished_at"])
record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk})
logger.info("自动填表工作流完成 batch_no=%s", self.batch.batch_no)
def _nodes(self):
return WorkflowNodeRun.objects.filter(
workflow_type=WORKFLOW_TYPE,
workflow_batch_id=self.batch.pk,
).order_by("id")
def _run_node(self, node: WorkflowNodeRun) -> None:
node.status = WorkflowNodeRun.Status.RUNNING
node.progress = 10
node.started_at = timezone.now()
node.message = f"{node.node_name}处理中"
node.save(update_fields=["status", "progress", "started_at", "message"])
record_event(
self.batch,
"node_progress",
{"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message},
)
if node.node_code == "pdf_convert":
self._append_risk_note(
{
"type": "pdf_pending",
"message": "PDF 转换为后续增强项,本次优先生成 Word。",
}
)
node.status = WorkflowNodeRun.Status.SKIPPED
node.progress = 100
node.finished_at = timezone.now()
node.message = "PDF 转换为后续增强项,本次跳过"
node.save(update_fields=["status", "progress", "finished_at", "message"])
record_event(
self.batch,
"node_progress",
{"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message},
)
return
self._execute_node(node)
node.status = WorkflowNodeRun.Status.SUCCESS
node.progress = 100
node.finished_at = timezone.now()
node.message = f"{node.node_name}完成"
node.save(update_fields=["status", "progress", "finished_at", "message"])
record_event(
self.batch,
"node_progress",
{"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message},
)
def _execute_node(self, node: WorkflowNodeRun) -> None:
if node.node_code == "prepare":
if self.batch.source_summary_batch.status != FileSummaryBatch.Status.SUCCESS:
raise ValueError("自动填表需要成功的文件汇总批次。")
return
if node.node_code == "template_select":
self.template_config = load_template_config()
errors = validate_template_config(self.template_config)
if errors:
raise ValueError("".join(errors))
requested = parse_requested_templates(self.batch.trigger_message.content if self.batch.trigger_message else "")
registration_type, source = detect_registration_type(batch=self.batch, message=self.batch.trigger_message.content if self.batch.trigger_message else "")
specs, risk_notes = select_templates(self.template_config, requested, registration_type)
if not specs:
raise ValueError("未选择到可用申报模板。")
self.selected_templates = specs
self.batch.requested_templates = requested
self.batch.selected_templates = [spec.code for spec in specs]
self.batch.registration_type = registration_type
self.batch.registration_type_source = source
self.batch.template_config_version = str(self.template_config.get("version") or "")
self.batch.template_config_hash = compute_config_hash()
self.batch.risk_notes = list(self.batch.risk_notes or []) + risk_notes
self.batch.save(
update_fields=[
"requested_templates",
"selected_templates",
"registration_type",
"registration_type_source",
"template_config_version",
"template_config_hash",
"risk_notes",
]
)
return
if node.node_code == "template_copy":
for spec in self.selected_templates:
try:
artifact = copy_template_to_batch(spec, self.batch, self.template_config)
self.template_paths[spec.code] = artifact.storage_path
except TemplateUnavailableError as exc:
self.non_blocking_errors.append(str(exc))
self._append_risk_note({"type": "template_unavailable", "message": str(exc), "template_code": spec.code})
if not self.template_paths:
raise ValueError("没有可用的 Word 模板副本。")
return
if node.node_code == "field_extract":
self.document_texts = collect_document_texts(self.batch.source_summary_batch)
self.extract_payload = run_parallel_extract(self.document_texts, self.selected_templates)
save_field_extract_result(self.batch, self.extract_payload)
return
if node.node_code == "conflict_merge":
self.merged_fields, self.conflicts = merge_fields(
self.extract_payload.get("regex_results") or {},
self.extract_payload.get("llm_results") or {},
)
product = self.merged_fields.get("product_name")
if product and product.value:
self.batch.product_name = product.value
self.batch.conflict_summary = self.conflicts
self.batch.save(update_fields=["product_name", "conflict_summary"])
return
if node.node_code == "word_fill":
for spec in self.selected_templates:
template_path = self.template_paths.get(spec.code)
if not template_path:
self.generation_results.append(
{
"template_code": spec.code,
"template_label": spec.output_label,
"word_status": "failed",
"pdf_status": "待增强",
"error_message": "模板不可用",
}
)
continue
export = create_word_export(self.batch, spec, template_path, self.merged_fields, self.conflicts)
self.exports.append(export)
self.generation_results.append(
{
"template_code": spec.code,
"template_label": spec.output_label,
"word_status": "success",
"pdf_status": "待增强",
"error_message": "",
}
)
if not any(item["word_status"] == "success" for item in self.generation_results):
raise ValueError("所有目标 Word 模板均生成失败。")
return
if node.node_code == "trace_export":
self.exports.extend(
save_traceability_exports(
self.batch,
self.merged_fields,
self.conflicts,
self.selected_templates,
self.generation_results,
)
)
return
if node.node_code == "output_export":
Message.objects.create(
conversation=self.batch.conversation,
role=Message.Role.ASSISTANT,
content=build_assistant_summary(self.batch, self.exports),
)
return
if node.node_code == "notify":
notification = notify_completion(
self.batch,
self.exports,
fail=getattr(settings, "APPLICATION_FORM_FILL_MOCK_NOTIFY_FAIL", False),
)
if notification.send_status == notification.SendStatus.FAILED:
self.non_blocking_errors.append(notification.error_message or "通知失败")
return
if node.node_code == "completed":
self._mark_final_status()
def _mark_final_status(self) -> None:
failed_word = any(item.get("word_status") == "failed" for item in self.generation_results)
if self.non_blocking_errors or failed_word:
self.batch.status = ApplicationFormFillBatch.Status.PARTIAL_SUCCESS
else:
self.batch.status = ApplicationFormFillBatch.Status.SUCCESS
self.batch.save(update_fields=["status"])
def _append_risk_note(self, note: dict) -> None:
self.batch.risk_notes = list(self.batch.risk_notes or []) + [note]
self.batch.save(update_fields=["risk_notes"])
def start_application_form_fill_workflow(batch: ApplicationFormFillBatch, *, async_run: bool = True) -> None:
executor = FormFillWorkflowExecutor(batch)
if not async_run:
executor.run()
return
Thread(target=executor.run, daemon=True).start()

View File

@@ -7,7 +7,7 @@ from pathlib import Path
from django.http import FileResponse, Http404, JsonResponse
from django.views.decorators.http import require_http_methods
from review_agent.models import Conversation, ExportedSummaryFile, FileAttachment, Message
from review_agent.models import ApplicationFormFillBatch, Conversation, ExportedSummaryFile, FileAttachment, Message
from review_agent.models import FileSummaryBatch, WorkflowEvent
from .events import serialize_event
from .paths import resolve_storage_path
@@ -271,10 +271,7 @@ def batch_events(request, batch_id: int):
@require_http_methods(["GET"])
@login_required
def export_download(request, export_id: int):
exported = ExportedSummaryFile.objects.filter(
pk=export_id,
batch__user=request.user,
).first()
exported = _export_for_user(request.user, export_id)
if not exported:
raise Http404("导出文件不存在。")
path = Path(exported.storage_path)
@@ -288,6 +285,8 @@ def export_download(request, export_id: int):
ExportedSummaryFile.ExportType.MARKDOWN: "text/markdown; charset=utf-8",
ExportedSummaryFile.ExportType.EXCEL: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
ExportedSummaryFile.ExportType.JSON: "application/json; charset=utf-8",
ExportedSummaryFile.ExportType.WORD: "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
ExportedSummaryFile.ExportType.PDF: "application/pdf",
}
content_type = content_types.get(exported.export_type, "application/octet-stream")
logger.info(
@@ -305,3 +304,21 @@ def export_download(request, export_id: int):
filename=exported.file_name,
content_type=content_type,
)
def _export_for_user(user, export_id: int) -> ExportedSummaryFile | None:
exported = ExportedSummaryFile.objects.filter(pk=export_id).first()
if not exported:
return None
if exported.workflow_type == "application_form_fill":
if not exported.workflow_batch_id:
return None
allowed = ApplicationFormFillBatch.objects.filter(
pk=exported.workflow_batch_id,
conversation__user=user,
is_deleted=False,
).exists()
return exported if allowed else None
if exported.batch.user_id != user.pk:
return None
return exported

View File

@@ -0,0 +1,353 @@
# Generated by Django 5.2.14 on 2026-06-07 10:19
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("review_agent", "0005_alter_regulatoryissue_status"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AlterField(
model_name="exportedsummaryfile",
name="export_type",
field=models.CharField(
choices=[
("markdown", "Markdown"),
("excel", "Excel"),
("json", "JSON"),
("word", "Word"),
("pdf", "PDF"),
],
max_length=20,
),
),
migrations.CreateModel(
name="ApplicationFormFillBatch",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("batch_no", models.CharField(max_length=64, unique=True)),
(
"status",
models.CharField(
choices=[
("pending", "待执行"),
("running", "执行中"),
("waiting_user", "等待用户"),
("success", "成功"),
("partial_success", "部分成功"),
("failed", "失败"),
("cancelled", "已取消"),
],
default="pending",
max_length=30,
),
),
("requested_templates", models.JSONField(blank=True, default=list)),
("selected_templates", models.JSONField(blank=True, default=list)),
("output_types", models.JSONField(blank=True, default=list)),
(
"registration_type",
models.CharField(blank=True, default="", max_length=80),
),
(
"registration_type_source",
models.CharField(
choices=[
("user_message", "用户话语"),
("regulatory_batch", "法规核查批次"),
("file_extract", "文件抽取"),
("unknown", "未知"),
],
default="unknown",
max_length=40,
),
),
(
"product_name",
models.CharField(blank=True, default="", max_length=200),
),
("conflict_summary", models.JSONField(blank=True, default=list)),
("risk_notes", models.JSONField(blank=True, default=list)),
(
"template_config_version",
models.CharField(blank=True, default="", max_length=80),
),
(
"template_config_hash",
models.CharField(blank=True, default="", max_length=128),
),
("work_dir", models.CharField(blank=True, default="", max_length=500)),
("error_message", models.TextField(blank=True, default="")),
("created_at", models.DateTimeField(auto_now_add=True)),
("started_at", models.DateTimeField(blank=True, null=True)),
("finished_at", models.DateTimeField(blank=True, null=True)),
("archived_at", models.DateTimeField(blank=True, null=True)),
("is_deleted", models.BooleanField(default=False)),
(
"conversation",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="application_form_fill_batches",
to="review_agent.conversation",
),
),
(
"source_regulatory_batch",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="application_form_fill_batches",
to="review_agent.regulatoryreviewbatch",
),
),
(
"source_summary_batch",
models.ForeignKey(
on_delete=django.db.models.deletion.PROTECT,
related_name="application_form_fill_batches",
to="review_agent.filesummarybatch",
),
),
(
"trigger_message",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="triggered_application_form_fill_batches",
to="review_agent.message",
),
),
(
"user",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="review_application_form_fill_batches",
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"db_table": "ra_application_form_fill_batch",
"ordering": ["-created_at", "-id"],
},
),
migrations.CreateModel(
name="ApplicationFormFillArtifact",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"artifact_type",
models.CharField(
choices=[
("template_copy", "模板副本"),
("field_extract_result", "字段抽取结果"),
("merged_fields", "字段合并结果"),
("traceability", "追溯清单"),
("filled_template", "已填模板"),
("notification_record", "通知记录"),
],
max_length=60,
),
),
(
"file_format",
models.CharField(
choices=[
("json", "JSON"),
("excel", "Excel"),
("docx", "DOCX"),
("pdf", "PDF"),
("markdown", "Markdown"),
],
max_length=20,
),
),
("name", models.CharField(max_length=160)),
("file_name", models.CharField(max_length=255)),
("storage_path", models.CharField(max_length=500)),
("file_size", models.BigIntegerField(default=0)),
(
"content_hash",
models.CharField(blank=True, default="", max_length=128),
),
("metadata", models.JSONField(blank=True, default=dict)),
(
"created_by_node",
models.CharField(blank=True, default="", max_length=60),
),
("created_at", models.DateTimeField(auto_now_add=True)),
("is_deleted", models.BooleanField(default=False)),
(
"batch",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="artifacts",
to="review_agent.applicationformfillbatch",
),
),
],
options={
"db_table": "ra_application_form_fill_artifact",
"ordering": ["-created_at", "-id"],
},
),
migrations.CreateModel(
name="ApplicationFormFillNotificationRecord",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"channel",
models.CharField(
choices=[
("feishu_cli", "飞书 CLI"),
("feishu_api", "飞书 API"),
("mock", "模拟"),
],
default="mock",
max_length=30,
),
),
("template_codes", models.JSONField(blank=True, default=list)),
("export_ids", models.JSONField(blank=True, default=list)),
("message_summary", models.TextField(blank=True, default="")),
(
"send_status",
models.CharField(
choices=[
("pending", "待发送"),
("success", "成功"),
("failed", "失败"),
],
default="pending",
max_length=20,
),
),
("retry_count", models.PositiveIntegerField(default=0)),
(
"external_message_id",
models.CharField(blank=True, default="", max_length=120),
),
("error_message", models.TextField(blank=True, default="")),
("sent_at", models.DateTimeField(blank=True, null=True)),
("created_at", models.DateTimeField(auto_now_add=True)),
("updated_at", models.DateTimeField(auto_now=True)),
("is_deleted", models.BooleanField(default=False)),
(
"batch",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="notifications",
to="review_agent.applicationformfillbatch",
),
),
(
"recipient",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="application_form_fill_notifications",
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"db_table": "ra_application_form_fill_notification_record",
"ordering": ["-created_at", "-id"],
},
),
migrations.AddIndex(
model_name="applicationformfillbatch",
index=models.Index(
fields=["conversation", "status"], name="idx_ra_aff_batch_conv_status"
),
),
migrations.AddIndex(
model_name="applicationformfillbatch",
index=models.Index(
fields=["source_summary_batch"], name="idx_ra_aff_batch_summary"
),
),
migrations.AddIndex(
model_name="applicationformfillbatch",
index=models.Index(
fields=["source_regulatory_batch"], name="idx_ra_aff_batch_regulatory"
),
),
migrations.AddIndex(
model_name="applicationformfillbatch",
index=models.Index(
fields=["user", "created_at"], name="idx_ra_aff_batch_user_created"
),
),
migrations.AddIndex(
model_name="applicationformfillbatch",
index=models.Index(fields=["created_at"], name="idx_ra_aff_batch_created"),
),
migrations.AddIndex(
model_name="applicationformfillartifact",
index=models.Index(
fields=["batch", "artifact_type"], name="idx_ra_aff_artifact_batch_type"
),
),
migrations.AddIndex(
model_name="applicationformfillartifact",
index=models.Index(
fields=["file_format"], name="idx_ra_aff_artifact_format"
),
),
migrations.AddIndex(
model_name="applicationformfillartifact",
index=models.Index(
fields=["created_at"], name="idx_ra_aff_artifact_created"
),
),
migrations.AddIndex(
model_name="applicationformfillnotificationrecord",
index=models.Index(
fields=["batch", "created_at"], name="idx_ra_aff_notify_batch"
),
),
migrations.AddIndex(
model_name="applicationformfillnotificationrecord",
index=models.Index(
fields=["recipient", "send_status"], name="idx_ra_aff_notify_recipient"
),
),
migrations.AddIndex(
model_name="applicationformfillnotificationrecord",
index=models.Index(
fields=["send_status", "retry_count"], name="idx_ra_aff_notify_status"
),
),
]

View File

@@ -334,6 +334,8 @@ class ExportedSummaryFile(models.Model):
MARKDOWN = "markdown", "Markdown"
EXCEL = "excel", "Excel"
JSON = "json", "JSON"
WORD = "word", "Word"
PDF = "pdf", "PDF"
class Status(models.TextChoices):
SUCCESS = "success", "成功"
@@ -397,6 +399,92 @@ class RegulatoryRuleVersion(models.Model):
return self.code
class ApplicationFormFillBatch(models.Model):
"""Tracks one application-form auto-fill workflow run."""
class Status(models.TextChoices):
PENDING = "pending", "待执行"
RUNNING = "running", "执行中"
WAITING_USER = "waiting_user", "等待用户"
SUCCESS = "success", "成功"
PARTIAL_SUCCESS = "partial_success", "部分成功"
FAILED = "failed", "失败"
CANCELLED = "cancelled", "已取消"
class RegistrationTypeSource(models.TextChoices):
USER_MESSAGE = "user_message", "用户话语"
REGULATORY_BATCH = "regulatory_batch", "法规核查批次"
FILE_EXTRACT = "file_extract", "文件抽取"
UNKNOWN = "unknown", "未知"
conversation = models.ForeignKey(
Conversation,
on_delete=models.CASCADE,
related_name="application_form_fill_batches",
)
user = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
related_name="review_application_form_fill_batches",
)
trigger_message = models.ForeignKey(
Message,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name="triggered_application_form_fill_batches",
)
source_summary_batch = models.ForeignKey(
FileSummaryBatch,
on_delete=models.PROTECT,
related_name="application_form_fill_batches",
)
source_regulatory_batch = models.ForeignKey(
"RegulatoryReviewBatch",
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name="application_form_fill_batches",
)
batch_no = models.CharField(max_length=64, unique=True)
status = models.CharField(max_length=30, choices=Status.choices, default=Status.PENDING)
requested_templates = models.JSONField(default=list, blank=True)
selected_templates = models.JSONField(default=list, blank=True)
output_types = models.JSONField(default=list, blank=True)
registration_type = models.CharField(max_length=80, blank=True, default="")
registration_type_source = models.CharField(
max_length=40,
choices=RegistrationTypeSource.choices,
default=RegistrationTypeSource.UNKNOWN,
)
product_name = models.CharField(max_length=200, blank=True, default="")
conflict_summary = models.JSONField(default=list, blank=True)
risk_notes = models.JSONField(default=list, blank=True)
template_config_version = models.CharField(max_length=80, blank=True, default="")
template_config_hash = models.CharField(max_length=128, blank=True, default="")
work_dir = models.CharField(max_length=500, blank=True, default="")
error_message = models.TextField(blank=True, default="")
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(null=True, blank=True)
finished_at = models.DateTimeField(null=True, blank=True)
archived_at = models.DateTimeField(null=True, blank=True)
is_deleted = models.BooleanField(default=False)
class Meta:
db_table = "ra_application_form_fill_batch"
ordering = ["-created_at", "-id"]
indexes = [
models.Index(fields=["conversation", "status"], name="idx_ra_aff_batch_conv_status"),
models.Index(fields=["source_summary_batch"], name="idx_ra_aff_batch_summary"),
models.Index(fields=["source_regulatory_batch"], name="idx_ra_aff_batch_regulatory"),
models.Index(fields=["user", "created_at"], name="idx_ra_aff_batch_user_created"),
models.Index(fields=["created_at"], name="idx_ra_aff_batch_created"),
]
def __str__(self) -> str:
return self.batch_no
class RegulatoryReviewBatch(models.Model):
"""Tracks one NMPA regulatory review workflow run."""
@@ -571,3 +659,98 @@ class RegulatoryNotificationRecord(models.Model):
indexes = [
models.Index(fields=["batch", "status"], name="idx_ra_rr_notify_status"),
]
class ApplicationFormFillArtifact(models.Model):
"""Stores auto-fill intermediate files and generated artifacts."""
class ArtifactType(models.TextChoices):
TEMPLATE_COPY = "template_copy", "模板副本"
FIELD_EXTRACT_RESULT = "field_extract_result", "字段抽取结果"
MERGED_FIELDS = "merged_fields", "字段合并结果"
TRACEABILITY = "traceability", "追溯清单"
FILLED_TEMPLATE = "filled_template", "已填模板"
NOTIFICATION_RECORD = "notification_record", "通知记录"
class FileFormat(models.TextChoices):
JSON = "json", "JSON"
EXCEL = "excel", "Excel"
DOCX = "docx", "DOCX"
PDF = "pdf", "PDF"
MARKDOWN = "markdown", "Markdown"
batch = models.ForeignKey(
ApplicationFormFillBatch,
on_delete=models.CASCADE,
related_name="artifacts",
)
artifact_type = models.CharField(max_length=60, choices=ArtifactType.choices)
file_format = models.CharField(max_length=20, choices=FileFormat.choices)
name = models.CharField(max_length=160)
file_name = models.CharField(max_length=255)
storage_path = models.CharField(max_length=500)
file_size = models.BigIntegerField(default=0)
content_hash = models.CharField(max_length=128, blank=True, default="")
metadata = models.JSONField(default=dict, blank=True)
created_by_node = models.CharField(max_length=60, blank=True, default="")
created_at = models.DateTimeField(auto_now_add=True)
is_deleted = models.BooleanField(default=False)
class Meta:
db_table = "ra_application_form_fill_artifact"
ordering = ["-created_at", "-id"]
indexes = [
models.Index(fields=["batch", "artifact_type"], name="idx_ra_aff_artifact_batch_type"),
models.Index(fields=["file_format"], name="idx_ra_aff_artifact_format"),
models.Index(fields=["created_at"], name="idx_ra_aff_artifact_created"),
]
class ApplicationFormFillNotificationRecord(models.Model):
"""Stores mock/Feishu notification records for application-form auto-fill."""
class Channel(models.TextChoices):
FEISHU_CLI = "feishu_cli", "飞书 CLI"
FEISHU_API = "feishu_api", "飞书 API"
MOCK = "mock", "模拟"
class SendStatus(models.TextChoices):
PENDING = "pending", "待发送"
SUCCESS = "success", "成功"
FAILED = "failed", "失败"
batch = models.ForeignKey(
ApplicationFormFillBatch,
on_delete=models.CASCADE,
related_name="notifications",
)
recipient = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
related_name="application_form_fill_notifications",
)
channel = models.CharField(max_length=30, choices=Channel.choices, default=Channel.MOCK)
template_codes = models.JSONField(default=list, blank=True)
export_ids = models.JSONField(default=list, blank=True)
message_summary = models.TextField(blank=True, default="")
send_status = models.CharField(
max_length=20,
choices=SendStatus.choices,
default=SendStatus.PENDING,
)
retry_count = models.PositiveIntegerField(default=0)
external_message_id = models.CharField(max_length=120, blank=True, default="")
error_message = models.TextField(blank=True, default="")
sent_at = models.DateTimeField(null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
is_deleted = models.BooleanField(default=False)
class Meta:
db_table = "ra_application_form_fill_notification_record"
ordering = ["-created_at", "-id"]
indexes = [
models.Index(fields=["batch", "created_at"], name="idx_ra_aff_notify_batch"),
models.Index(fields=["recipient", "send_status"], name="idx_ra_aff_notify_recipient"),
models.Index(fields=["send_status", "retry_count"], name="idx_ra_aff_notify_status"),
]

View File

@@ -11,6 +11,11 @@ from .file_summary.skills.attachment_reader import AttachmentReaderSkill
from .file_summary.workflow import create_file_summary_batch, start_file_summary_workflow
from .llm import LLMConfigurationError, LLMRequestError, generate_reply, stream_reply
from .models import Conversation, FileAttachment, FileSummaryBatch, Message
from .application_form_fill.workflow import (
create_application_form_fill_batch,
find_latest_successful_summary_batch as find_latest_successful_form_fill_summary_batch,
start_application_form_fill_workflow,
)
from .regulatory_review.workflow import (
create_regulatory_review_batch,
find_latest_successful_summary_batch,
@@ -224,6 +229,85 @@ def stream_message(conversation: Conversation, content: str):
)
return
if route.starts_application_form_fill:
source_summary_batch = find_latest_successful_form_fill_summary_batch(conversation)
if not source_summary_batch:
if not _has_active_attachments(conversation):
reply_content = "请先在当前对话右侧上传需要填表的产品资料或压缩包,我会先自动汇总再继续生成申报模板。"
assistant_message = append_assistant_message(conversation, reply_content)
yield sse_event("chunk", {"delta": reply_content})
yield sse_event(
"done",
{
"assistant_message_id": assistant_message.pk,
"conversation_id": conversation.pk,
"title": conversation.title,
},
)
return
summary_batch = create_file_summary_batch(
conversation=conversation,
user=conversation.user,
trigger_message=user_message,
)
yield sse_event(
"workflow_started",
{
"workflow_type": "file_summary",
"batch_id": summary_batch.pk,
"batch_no": summary_batch.batch_no,
},
)
start_file_summary_workflow(summary_batch, async_run=False)
summary_batch.refresh_from_db()
if summary_batch.status != FileSummaryBatch.Status.SUCCESS:
reply_content = f"已先启动文件目录与页数自动汇总工作流,批次号:{summary_batch.batch_no},但汇总未成功:{summary_batch.error_message or '原因待查看'}。请处理后再启动申报文件自动填表。"
assistant_message = append_assistant_message(conversation, reply_content)
yield sse_event("chunk", {"delta": reply_content})
yield sse_event(
"done",
{
"assistant_message_id": assistant_message.pk,
"conversation_id": conversation.pk,
"title": conversation.title,
},
)
return
source_summary_batch = summary_batch
reply_prefix = f"已先启动文件目录与页数自动汇总工作流,批次号:{summary_batch.batch_no},汇总完成后继续自动填表。\n"
else:
reply_prefix = ""
batch = create_application_form_fill_batch(
conversation=conversation,
user=conversation.user,
trigger_message=user_message,
source_summary_batch=source_summary_batch,
)
start_application_form_fill_workflow(
batch,
async_run=getattr(settings, "APPLICATION_FORM_FILL_ASYNC", True),
)
reply_content = f"{reply_prefix}已启动申报文件自动填表工作流,批次号:{batch.batch_no}"
assistant_message = append_assistant_message(conversation, reply_content)
yield sse_event(
"workflow_started",
{
"workflow_type": "application_form_fill",
"batch_id": batch.pk,
"batch_no": batch.batch_no,
},
)
yield sse_event("chunk", {"delta": reply_content})
yield sse_event(
"done",
{
"assistant_message_id": assistant_message.pk,
"conversation_id": conversation.pk,
"title": conversation.title,
},
)
return
if route.starts_regulatory_review:
source_summary_batch = find_latest_successful_summary_batch(conversation)
if not source_summary_batch:

View File

@@ -8,6 +8,7 @@ from .file_summary.workflow_trigger import (
evaluate_attachment_reader_trigger,
evaluate_file_summary_trigger,
)
from .application_form_fill.constants import FORM_FILL_TRIGGER_KEYWORDS, WORKFLOW_TYPE as FORM_FILL_WORKFLOW_TYPE
from .llm import LLMConfigurationError, LLMRequestError, generate_completion
from .models import Conversation, FileAttachment
@@ -16,6 +17,7 @@ logger = logging.getLogger(__name__)
ROUTE_ACTIONS = {"normal_chat", "attachment_reader", "file_summary"}
ROUTE_ACTIONS.add("regulatory_review")
ROUTE_ACTIONS.add(FORM_FILL_WORKFLOW_TYPE)
@dataclass(frozen=True)
@@ -39,6 +41,10 @@ class SkillRoute:
def starts_regulatory_review(self) -> bool:
return self.action == "regulatory_review"
@property
def starts_application_form_fill(self) -> bool:
return self.action == FORM_FILL_WORKFLOW_TYPE
@property
def is_normal_chat(self) -> bool:
return self.action == "normal_chat"
@@ -105,7 +111,7 @@ def _route_with_llm(
return SkillRoute(
action=action,
skill_name="attachment_reader" if action == "attachment_reader" else "",
workflow_type=action if action in {"file_summary", "regulatory_review"} else "",
workflow_type=action if action in {"file_summary", "regulatory_review", FORM_FILL_WORKFLOW_TYPE} else "",
confidence=_float_or_zero(payload.get("confidence")),
reason=str(payload.get("reason") or ""),
source="llm",
@@ -113,6 +119,15 @@ def _route_with_llm(
def _route_with_rules(conversation: Conversation, content: str) -> SkillRoute:
if _matches_application_form_fill(content):
return SkillRoute(
action=FORM_FILL_WORKFLOW_TYPE,
workflow_type=FORM_FILL_WORKFLOW_TYPE,
confidence=0.7,
reason="命中申报文件自动填表关键词。",
source="rule_fallback",
)
if _matches_regulatory_review(content):
return SkillRoute(
action="regulatory_review",
@@ -162,10 +177,11 @@ def _router_system_prompt() -> str:
return (
"你是审核智能体的工具路由器,只判断是否需要调用工具,不直接回答用户。"
"你必须只输出 JSON 对象,不要输出 Markdown。"
"可选 actionnormal_chat、attachment_reader、file_summary、regulatory_review。"
"可选 actionnormal_chat、attachment_reader、file_summary、regulatory_review、application_form_fill"
"attachment_reader 用于用户要求阅读、提取、分析、总结、查看上传附件内容。"
"file_summary 用于用户要求自动汇总文件目录、页数、清单或生成目录页数报告。"
"regulatory_review 用于用户要求法规核查、NMPA核查、完整性核查、章节一致性核查、风险预警或整改建议。"
"application_form_fill 用于用户要求填注册证、生成申报模板、填写对应表格、安全和性能基本原则清单或自动填表。"
"normal_chat 用于不需要读取附件或执行工作流的一般问答。"
"输出字段action、confidence、reason。"
)
@@ -217,3 +233,8 @@ def _matches_regulatory_review(content: str) -> bool:
"一致性核查",
]
return any(keyword in normalized for keyword in keywords)
def _matches_application_form_fill(content: str) -> bool:
normalized = content.lower()
return any(keyword.lower() in normalized for keyword in FORM_FILL_TRIGGER_KEYWORDS)

View File

@@ -16,6 +16,10 @@ from .regulatory_review.views import (
review_issues as regulatory_review_review_issues,
start_full_review as regulatory_review_start_full_review,
)
from .application_form_fill.views import (
batch_status as application_form_fill_batch_status,
start as application_form_fill_start,
)
urlpatterns = [
@@ -84,4 +88,14 @@ urlpatterns = [
regulatory_review_review_issues,
name="regulatory_review_review_issues",
),
path(
"api/review-agent/application-form-fill/start/",
application_form_fill_start,
name="application_form_fill_start",
),
path(
"api/review-agent/application-form-fill/<int:batch_id>/status/",
application_form_fill_batch_status,
name="application_form_fill_batch_status",
),
]

View File

@@ -11,7 +11,7 @@ from .services import (
send_message,
stream_message,
)
from .models import Conversation, FileAttachment, FileSummaryBatch, RegulatoryReviewBatch, WorkflowNodeRun
from .models import ApplicationFormFillBatch, Conversation, FileAttachment, FileSummaryBatch, RegulatoryReviewBatch, WorkflowNodeRun
from .regulatory_review.services.info_extract import ensure_regulatory_condition_candidates
@@ -155,6 +155,25 @@ def build_workflow_cards(conversation: Conversation) -> list[dict[str, object]]:
),
}
)
form_fill_batches = ApplicationFormFillBatch.objects.filter(conversation=conversation, is_deleted=False)
for batch in form_fill_batches:
cards.append(
{
"id": batch.pk,
"workflow_type": "application_form_fill",
"batch_no": batch.batch_no,
"status": batch.status,
"error_message": batch.error_message,
"risk_label": _format_form_fill_label(batch),
"created_at": batch.created_at,
"nodes": list(
WorkflowNodeRun.objects.filter(
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
).order_by("id")
),
}
)
return sorted(cards, key=lambda item: item["created_at"], reverse=True)[:5]
@@ -187,3 +206,14 @@ def _format_risk_label(risk_summary: dict) -> str:
if count:
parts.append(f"{label} {count}")
return " · ".join(parts)
def _format_form_fill_label(batch: ApplicationFormFillBatch) -> str:
parts = []
if batch.selected_templates:
parts.append("模板 " + "".join(batch.selected_templates))
if batch.conflict_summary:
parts.append(f"冲突字段 {len(batch.conflict_summary)}")
if batch.risk_notes:
parts.append(f"提示 {len(batch.risk_notes)}")
return " · ".join(parts)

View File

@@ -464,8 +464,12 @@
}
function statusUrlForWorkflow(workflow_type, batchId) {
var attributeName =
workflow_type === "regulatory_review" ? "data-regulatory-status-url-template" : "data-status-url-template";
var attributeName = "data-status-url-template";
if (workflow_type === "regulatory_review") {
attributeName = "data-regulatory-status-url-template";
} else if (workflow_type === "application_form_fill") {
attributeName = "data-application-form-fill-status-url-template";
}
return templateUrl(attributeName, "__batch_id__", batchId);
}
@@ -832,7 +836,7 @@
}
function isWorkflowTerminalStatus(status) {
return status === "success" || status === "failed";
return status === "success" || status === "partial_success" || status === "failed";
}
function workflowTimerKey(batchId, workflow_type) {

View File

@@ -216,6 +216,7 @@
data-message-url-template="/api/review-agent/conversations/__conversation_id__/messages/"
data-status-url-template="/api/review-agent/file-summary/__batch_id__/status/"
data-regulatory-status-url-template="/api/review-agent/regulatory-review/__batch_id__/status/"
data-application-form-fill-status-url-template="/api/review-agent/application-form-fill/__batch_id__/status/"
data-events-url-template="/api/review-agent/file-summary/__batch_id__/events/"
>
<section class="summary-section upload-section">

View File

@@ -0,0 +1,121 @@
import json
import pytest
from review_agent.application_form_fill.services.field_extract import (
extract_by_llm,
extract_by_rules,
run_parallel_extract,
save_field_extract_result,
)
from review_agent.application_form_fill.services.template_config import load_template_config
from review_agent.application_form_fill.services.template_select import select_templates
from review_agent.models import (
ApplicationFormFillArtifact,
ApplicationFormFillBatch,
Conversation,
FileSummaryBatch,
)
pytestmark = pytest.mark.django_db
def _registration_specs():
config = load_template_config()
specs, _risk_notes = select_templates(config, ["registration_certificate"], "首次注册")
return specs
def test_rule_extracts_registration_certificate_fields():
texts = {
"产品说明书.txt": "\n".join(
[
"产品名称:甲胎蛋白检测试剂盒",
"包装规格20人份/盒",
"预期用途:用于体外定量检测人血清中甲胎蛋白含量",
"产品储存条件及有效期2-8℃保存有效期12个月",
]
)
}
result = extract_by_rules(texts, _registration_specs())
values = {field["key"]: field for field in result["fields"]}
assert values["product_name"]["value"] == "甲胎蛋白检测试剂盒"
assert values["intended_use"]["source_role"] == "说明书"
assert "2-8℃保存" in values["storage_condition_and_validity"]["value"]
assert values["package_specification"]["extractor"] == "rule"
def test_llm_extract_parses_structured_json(monkeypatch):
monkeypatch.setattr(
"review_agent.application_form_fill.services.field_extract.generate_completion",
lambda messages, temperature=0.0: json.dumps(
{
"fields": [
{
"key": "product_name",
"label": "产品名称",
"value": "甲胎蛋白检测试剂盒",
"source_file": "说明书.txt",
"source_role": "说明书",
"evidence": "产品名称:甲胎蛋白检测试剂盒",
"confidence": 0.9,
}
],
"checklist_items": [],
},
ensure_ascii=False,
),
)
result = extract_by_llm({"说明书.txt": "产品名称:甲胎蛋白检测试剂盒"}, _registration_specs())
assert result["fields"][0]["extractor"] == "llm"
assert result["fields"][0]["value"] == "甲胎蛋白检测试剂盒"
def test_llm_extract_failure_returns_empty_result(monkeypatch):
monkeypatch.setattr(
"review_agent.application_form_fill.services.field_extract.generate_completion",
lambda messages, temperature=0.0: (_ for _ in ()).throw(TimeoutError("timeout")),
)
result = extract_by_llm({"说明书.txt": "产品名称:甲胎蛋白检测试剂盒"}, _registration_specs())
assert result["fields"] == []
assert "timeout" in result["error_message"]
def test_parallel_extract_preserves_rule_result_when_llm_fails(monkeypatch):
monkeypatch.setattr(
"review_agent.application_form_fill.services.field_extract.generate_completion",
lambda messages, temperature=0.0: (_ for _ in ()).throw(TimeoutError("timeout")),
)
payload = run_parallel_extract({"说明书.txt": "产品名称:甲胎蛋白检测试剂盒"}, _registration_specs())
assert payload["regex_results"]["fields"]
assert payload["llm_results"]["fields"] == []
assert payload["selected_templates"] == ["registration_certificate"]
def test_save_field_extract_result_creates_json_artifact(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-FIELD")
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
batch_no="AFF-FIELD",
work_dir=str(tmp_path / "aff" / "AFF-FIELD"),
)
artifact = save_field_extract_result(batch, {"regex_results": {"fields": []}, "llm_results": {"fields": []}})
assert artifact.artifact_type == ApplicationFormFillArtifact.ArtifactType.FIELD_EXTRACT_RESULT
assert artifact.file_format == ApplicationFormFillArtifact.FileFormat.JSON
assert artifact.content_hash

View File

@@ -0,0 +1,79 @@
import pytest
from review_agent.application_form_fill.services.field_merge import merge_fields, normalize_field_value, rank_source
def test_normalize_field_value_removes_whitespace():
assert normalize_field_value(" 2-8℃ 保存 \n 有效期12个月 ") == "2-8℃保存有效期12个月"
def test_rank_source_prefers_instructions():
assert rank_source("说明书") < rank_source("产品技术要求")
def test_merge_fields_prefers_instructions_and_marks_conflict():
regex_results = {
"fields": [
{
"key": "storage_condition_and_validity",
"label": "产品储存条件及有效期",
"value": "2-8℃保存有效期12个月",
"source_file": "说明书.txt",
"source_role": "说明书",
"evidence": "产品储存条件及有效期2-8℃保存有效期12个月",
"confidence": 0.75,
},
{
"key": "storage_condition_and_validity",
"label": "产品储存条件及有效期",
"value": "-20℃保存",
"source_file": "产品技术要求.txt",
"source_role": "产品技术要求",
"evidence": "产品储存条件及有效期:-20℃保存",
"confidence": 0.8,
},
]
}
merged, conflicts = merge_fields(regex_results, {"fields": []})
field = merged["storage_condition_and_validity"]
assert field.value == "2-8℃保存有效期12个月"
assert field.has_conflict is True
assert conflicts[0]["selected_value"] == "2-8℃保存有效期12个月"
assert conflicts[0]["conflict_values"][0]["value"] == "-20℃保存"
def test_merge_fields_combines_consistent_values_without_conflict():
regex_results = {
"fields": [
{
"key": "product_name",
"label": "产品名称",
"value": "甲胎蛋白检测试剂盒",
"source_file": "说明书.txt",
"source_role": "说明书",
"evidence": "产品名称:甲胎蛋白检测试剂盒",
"confidence": 0.75,
}
]
}
llm_results = {
"fields": [
{
"key": "product_name",
"label": "产品名称",
"value": "甲胎蛋白 检测试剂盒",
"source_file": "产品技术要求.txt",
"source_role": "产品技术要求",
"evidence": "产品名称:甲胎蛋白 检测试剂盒",
"confidence": 0.9,
}
]
}
merged, conflicts = merge_fields(regex_results, llm_results)
assert merged["product_name"].value == "甲胎蛋白检测试剂盒"
assert merged["product_name"].has_conflict is False
assert conflicts == []

View File

@@ -0,0 +1,48 @@
import pytest
from django.urls import reverse
from review_agent.models import ApplicationFormFillBatch, Conversation, FileSummaryBatch, WorkflowNodeRun
pytestmark = pytest.mark.django_db
def test_workspace_renders_application_form_fill_workflow_card(client, django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-CARD")
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
batch_no="AFF-CARD",
status=ApplicationFormFillBatch.Status.PARTIAL_SUCCESS,
selected_templates=["registration_certificate"],
risk_notes=[{"type": "pdf_pending"}],
)
WorkflowNodeRun.objects.create(
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
node_group="form_fill",
node_code="word_fill",
node_name="填写 Word",
status=WorkflowNodeRun.Status.SUCCESS,
progress=100,
)
client.force_login(user)
response = client.get(f"{reverse('home')}?conversation={conversation.pk}")
content = response.content.decode("utf-8")
assert "AFF-CARD" in content
assert 'data-workflow-type="application_form_fill"' in content
assert "填写 Word" in content
assert "data-application-form-fill-status-url-template" in content
def test_frontend_selects_application_form_fill_status_url_and_terminal_status():
script = open("static/js/app.js", encoding="utf-8").read()
assert 'workflow_type === "application_form_fill"' in script
assert "data-application-form-fill-status-url-template" in script
assert 'status === "partial_success"' in script

View File

@@ -0,0 +1,109 @@
import pytest
from review_agent.models import (
ApplicationFormFillArtifact,
ApplicationFormFillBatch,
ApplicationFormFillNotificationRecord,
Conversation,
ExportedSummaryFile,
FileSummaryBatch,
Message,
RegulatoryReviewBatch,
)
pytestmark = pytest.mark.django_db
def test_application_form_fill_models_store_batch_artifact_notification_and_exports(django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="自动填表")
trigger = Message.objects.create(
conversation=conversation,
role=Message.Role.USER,
content="帮我填注册证",
)
summary_batch = FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-AFF-READY",
status=FileSummaryBatch.Status.SUCCESS,
)
regulatory_batch = RegulatoryReviewBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary_batch,
batch_no="RR-AFF-SOURCE",
condition_json={"confirmed": True, "registration_type": "首次注册"},
)
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
trigger_message=trigger,
source_summary_batch=summary_batch,
source_regulatory_batch=regulatory_batch,
batch_no="AFF-20260607153000-abcdef",
requested_templates=["registration_certificate"],
selected_templates=["registration_certificate"],
output_types=["word", "excel", "json"],
registration_type="首次注册",
registration_type_source=ApplicationFormFillBatch.RegistrationTypeSource.USER_MESSAGE,
product_name="甲胎蛋白检测试剂盒",
conflict_summary=[{"field_key": "storage_condition"}],
risk_notes=[{"type": "pdf_pending"}],
template_config_version="application_form_templates_v1",
template_config_hash="hash",
work_dir="media/application_form_fill/1/1/AFF-20260607153000-abcdef",
)
artifact = ApplicationFormFillArtifact.objects.create(
batch=batch,
artifact_type=ApplicationFormFillArtifact.ArtifactType.FILLED_TEMPLATE,
file_format=ApplicationFormFillArtifact.FileFormat.DOCX,
name="注册证格式",
file_name="filled.docx",
storage_path="media/application_form_fill/filled.docx",
file_size=123,
content_hash="sha256",
metadata={"template_code": "registration_certificate"},
created_by_node="word_fill",
)
notification = ApplicationFormFillNotificationRecord.objects.create(
batch=batch,
recipient=user,
channel=ApplicationFormFillNotificationRecord.Channel.MOCK,
template_codes=["registration_certificate"],
export_ids=[1],
message_summary="自动填表完成",
send_status=ApplicationFormFillNotificationRecord.SendStatus.FAILED,
retry_count=1,
error_message="mock failed",
)
word_export = ExportedSummaryFile.objects.create(
batch=summary_batch,
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
export_category="filled_template",
export_type=ExportedSummaryFile.ExportType.WORD,
file_name="filled.docx",
storage_path="media/application_form_fill/filled.docx",
)
pdf_export = ExportedSummaryFile.objects.create(
batch=summary_batch,
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
export_category="filled_template",
export_type=ExportedSummaryFile.ExportType.PDF,
file_name="filled.pdf",
storage_path="media/application_form_fill/filled.pdf",
)
assert batch.status == ApplicationFormFillBatch.Status.PENDING
assert batch.source_summary_batch == summary_batch
assert batch.source_regulatory_batch == regulatory_batch
assert artifact.content_hash == "sha256"
assert artifact.metadata["template_code"] == "registration_certificate"
assert notification.send_status == ApplicationFormFillNotificationRecord.SendStatus.FAILED
assert notification.retry_count == 1
assert word_export.export_type == ExportedSummaryFile.ExportType.WORD
assert pdf_export.export_type == ExportedSummaryFile.ExportType.PDF

View File

@@ -0,0 +1,61 @@
import pytest
from review_agent.application_form_fill.services.notifier import notify_completion
from review_agent.models import (
ApplicationFormFillBatch,
ApplicationFormFillNotificationRecord,
Conversation,
ExportedSummaryFile,
FileSummaryBatch,
)
pytestmark = pytest.mark.django_db
def test_notify_completion_records_success(django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-NOTIFY")
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
batch_no="AFF-NOTIFY",
selected_templates=["registration_certificate"],
)
exported = ExportedSummaryFile.objects.create(
batch=summary,
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
export_category="filled_template",
export_type=ExportedSummaryFile.ExportType.WORD,
file_name="filled.docx",
storage_path="filled.docx",
)
record = notify_completion(batch, [exported])
assert record.send_status == ApplicationFormFillNotificationRecord.SendStatus.SUCCESS
assert record.export_ids == [exported.pk]
assert record.template_codes == ["registration_certificate"]
assert record.sent_at is not None
def test_notify_completion_records_failure_without_raising(django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-NOTIFY-FAIL")
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
batch_no="AFF-NOTIFY-FAIL",
selected_templates=["registration_certificate"],
)
record = notify_completion(batch, [], fail=True)
assert record.send_status == ApplicationFormFillNotificationRecord.SendStatus.FAILED
assert record.retry_count == 1
assert "mock" in record.error_message

View File

@@ -0,0 +1,97 @@
import copy
import pytest
from review_agent.application_form_fill.services.template_config import (
DEFAULT_CONFIG_PATH,
compute_config_hash,
load_template_config,
validate_template_config,
)
def test_template_config_loads_and_validates_default_yaml(settings):
config = load_template_config()
errors = validate_template_config(config)
assert errors == []
assert config["version"] == "application_form_templates_v1"
registration = next(item for item in config["templates"] if item["code"] == "registration_certificate")
assert registration["file_format"] == "docx"
assert {field["key"] for field in registration["fields"]} >= {
"applicant_name",
"product_name",
"package_specification",
"main_components",
"intended_use",
"storage_condition_and_validity",
"attachments",
}
assert all(field["target"]["type"] == "table_row" for field in registration["fields"])
assert len(compute_config_hash(DEFAULT_CONFIG_PATH)) == 64
def test_template_config_reports_missing_source_dir():
config = load_template_config()
config["source_dir"] = "docs/not-exists"
errors = validate_template_config(config)
assert any("source_dir 不存在" in error for error in errors)
def test_template_config_reports_duplicate_code():
config = load_template_config()
duplicate = copy.deepcopy(config["templates"][0])
config["templates"].append(duplicate)
errors = validate_template_config(config)
assert any("模板 code 重复" in error for error in errors)
def test_template_config_reports_missing_source_file():
config = load_template_config()
config["templates"][0]["source_file"] = "missing.docx"
errors = validate_template_config(config)
assert any("source_file 不存在" in error for error in errors)
def test_template_config_reports_unsupported_target_type():
config = load_template_config()
config["templates"][0]["fields"][0]["target"]["type"] = "content_control"
errors = validate_template_config(config)
assert any("target.type 不支持" in error for error in errors)
def test_template_config_loads_custom_path(tmp_path):
config_path = tmp_path / "templates.yaml"
config_path.write_text(
"""
version: custom
source_dir: .
templates:
- code: custom_template
name: Custom
source_file: source.docx
output_label: Custom
file_format: docx
fields:
- key: product_name
label: 产品名称
target:
type: table_row
row_label: 产品名称
""".strip(),
encoding="utf-8",
)
(tmp_path / "source.docx").write_bytes(b"docx")
config = load_template_config(config_path)
assert validate_template_config(config, base_dir=tmp_path) == []
assert compute_config_hash(config_path)

View File

@@ -0,0 +1,60 @@
import pytest
from review_agent.application_form_fill.services.template_config import load_template_config
from review_agent.application_form_fill.services.template_repository import (
TemplateUnavailableError,
copy_template_to_batch,
resolve_source_template,
)
from review_agent.application_form_fill.services.template_select import select_templates
from review_agent.models import (
ApplicationFormFillArtifact,
ApplicationFormFillBatch,
Conversation,
FileSummaryBatch,
)
pytestmark = pytest.mark.django_db
def test_resolve_source_template_finds_registration_docx():
config = load_template_config()
specs, _risk_notes = select_templates(config, ["registration_certificate"], "首次注册")
path = resolve_source_template(specs[0], config)
assert path.exists()
assert path.name == "中华人民共和国医疗器械注册证(体外诊断试剂)(格式).docx"
def test_copy_template_to_batch_creates_artifact(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-REPO")
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
batch_no="AFF-REPO",
work_dir=str(tmp_path / "aff" / "AFF-REPO"),
)
config = load_template_config()
specs, _risk_notes = select_templates(config, ["registration_certificate"], "首次注册")
artifact = copy_template_to_batch(specs[0], batch, config)
assert artifact.artifact_type == ApplicationFormFillArtifact.ArtifactType.TEMPLATE_COPY
assert artifact.file_format == "docx"
assert artifact.content_hash
assert artifact.metadata["template_code"] == "registration_certificate"
assert artifact.storage_path.startswith(batch.work_dir)
def test_doc_template_without_working_docx_is_unavailable():
config = load_template_config()
specs, _risk_notes = select_templates(config, ["change_registration"], "变更注册")
with pytest.raises(TemplateUnavailableError):
resolve_source_template(specs[0], config)

View File

@@ -0,0 +1,114 @@
import pytest
from review_agent.application_form_fill.services.template_config import load_template_config
from review_agent.application_form_fill.services.template_select import (
detect_registration_type,
parse_requested_templates,
select_templates,
)
from review_agent.models import ApplicationFormFillBatch, Conversation, FileSummaryBatch, RegulatoryReviewBatch
pytestmark = pytest.mark.django_db
@pytest.mark.parametrize(
("message", "expected"),
[
("帮我填注册证", ["registration_certificate"]),
("生成变更注册备案文件", ["change_registration"]),
("生成安全和性能基本原则清单", ["essential_principles"]),
("请生成全部模板", ["registration_certificate", "change_registration", "essential_principles"]),
("普通聊天", []),
],
)
def test_parse_requested_templates(message, expected):
assert parse_requested_templates(message) == expected
def test_detect_registration_type_prefers_user_message(django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-SEL")
regulatory = RegulatoryReviewBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
batch_no="RR-SEL",
condition_json={"confirmed_conditions": {"registration_type": "变更注册"}},
)
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
source_regulatory_batch=regulatory,
batch_no="AFF-SEL",
)
value, source = detect_registration_type(batch=batch, message="首次注册资料,请填注册证")
assert value == "首次注册"
assert source == ApplicationFormFillBatch.RegistrationTypeSource.USER_MESSAGE
def test_detect_registration_type_falls_back_to_regulatory_batch_and_file_candidates(django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-SEL-2")
regulatory = RegulatoryReviewBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
batch_no="RR-SEL-2",
condition_json={"confirmed_conditions": {"registration_type": "变更注册"}},
)
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
source_regulatory_batch=regulatory,
batch_no="AFF-SEL-2",
)
regulatory_value, regulatory_source = detect_registration_type(batch=batch, message="")
file_value, file_source = detect_registration_type(
message="",
file_candidates={"registration_type": {"suggested": "备案"}},
)
assert (regulatory_value, regulatory_source) == (
"变更注册",
ApplicationFormFillBatch.RegistrationTypeSource.REGULATORY_BATCH,
)
assert (file_value, file_source) == (
"备案",
ApplicationFormFillBatch.RegistrationTypeSource.FILE_EXTRACT,
)
def test_select_default_templates_for_initial_registration():
config = load_template_config()
specs, risk_notes = select_templates(config, [], "首次注册")
assert [spec.code for spec in specs] == ["registration_certificate", "essential_principles"]
assert risk_notes == []
def test_select_default_templates_for_change_registration():
config = load_template_config()
specs, risk_notes = select_templates(config, [], "变更注册")
assert [spec.code for spec in specs] == ["change_registration", "essential_principles"]
assert risk_notes == []
def test_select_user_requested_mismatch_is_allowed_with_risk_note():
config = load_template_config()
specs, risk_notes = select_templates(config, ["change_registration"], "首次注册")
assert [spec.code for spec in specs] == ["change_registration"]
assert risk_notes
assert risk_notes[0]["type"] == "template_registration_mismatch"

View File

@@ -0,0 +1,85 @@
import json
import pytest
from openpyxl import load_workbook
from review_agent.application_form_fill.schemas import MergedField, TemplateSpec
from review_agent.application_form_fill.services.traceability_export import save_traceability_exports
from review_agent.models import (
ApplicationFormFillArtifact,
ApplicationFormFillBatch,
Conversation,
ExportedSummaryFile,
FileSummaryBatch,
)
pytestmark = pytest.mark.django_db
def test_traceability_exports_excel_json_and_records(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-TRACE")
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
batch_no="AFF-TRACE",
work_dir=str(tmp_path / "aff" / "AFF-TRACE"),
)
spec = TemplateSpec(
code="registration_certificate",
name="注册证格式",
source_file="template.docx",
output_label="注册证格式",
applies_when={},
file_format="docx",
fields=[{"key": "product_name", "label": "产品名称"}],
)
merged_fields = {
"product_name": MergedField(
"product_name",
"产品名称",
"甲胎蛋白检测试剂盒",
"说明书.txt",
"产品名称:甲胎蛋白检测试剂盒",
0.8,
)
}
conflicts = [
{
"field_key": "storage_condition",
"field_label": "储存条件",
"selected_value": "2-8℃",
"conflict_values": [{"value": "-20℃", "source_file": "产品技术要求.txt"}],
"handling": "说明书优先",
}
]
exports = save_traceability_exports(
batch,
merged_fields,
conflicts,
[spec],
[{"template_label": "注册证格式", "word_status": "success", "pdf_status": "待增强"}],
)
assert {export.export_type for export in exports} == {
ExportedSummaryFile.ExportType.EXCEL,
ExportedSummaryFile.ExportType.JSON,
}
excel_export = next(export for export in exports if export.export_type == ExportedSummaryFile.ExportType.EXCEL)
workbook = load_workbook(excel_export.storage_path)
assert workbook.sheetnames == ["字段追溯", "冲突字段", "低置信度条目", "生成结果"]
assert workbook["字段追溯"]["B2"].value == "产品名称"
assert workbook["冲突字段"]["C2"].value == "-20℃"
json_export = next(export for export in exports if export.export_type == ExportedSummaryFile.ExportType.JSON)
payload = json.loads(open(json_export.storage_path, encoding="utf-8").read())
assert payload["merged_fields"]["product_name"]["value"] == "甲胎蛋白检测试剂盒"
assert ApplicationFormFillArtifact.objects.filter(
batch=batch,
artifact_type=ApplicationFormFillArtifact.ArtifactType.TRACEABILITY,
).exists()

View File

@@ -0,0 +1,45 @@
import pytest
from review_agent.models import Conversation
from review_agent.skill_router import route_message_intent
pytestmark = pytest.mark.django_db
@pytest.mark.parametrize(
"content",
[
"帮我填注册证",
"给我这个内容对应的表格",
"为我该方案生成申报模板",
"请自动填表并生成表格",
"生成安全和性能基本原则清单",
],
)
def test_rule_router_starts_application_form_fill_for_keywords(monkeypatch, django_user_model, content):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
monkeypatch.setattr(
"review_agent.skill_router._route_with_llm",
lambda conversation, content, attachments: (_ for _ in ()).throw(ValueError("fallback")),
)
route = route_message_intent(conversation, content)
assert route.action == "application_form_fill"
assert route.workflow_type == "application_form_fill"
assert route.starts_application_form_fill
def test_rule_router_does_not_misroute_normal_chat(monkeypatch, django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
monkeypatch.setattr(
"review_agent.skill_router._route_with_llm",
lambda conversation, content, attachments: (_ for _ in ()).throw(ValueError("fallback")),
)
route = route_message_intent(conversation, "你好,解释一下法规背景")
assert route.action == "normal_chat"

View File

@@ -0,0 +1,113 @@
import json
import pytest
from django.urls import reverse
from review_agent.application_form_fill.constants import FORM_FILL_NODE_DEFINITIONS
from review_agent.models import (
ApplicationFormFillBatch,
Conversation,
ExportedSummaryFile,
FileSummaryBatch,
WorkflowNodeRun,
)
pytestmark = pytest.mark.django_db
def test_application_form_fill_start_requires_conversation_owner(client, monkeypatch, django_user_model):
owner = django_user_model.objects.create_user(username="owner", password="pass")
other = django_user_model.objects.create_user(username="other", password="pass")
conversation = Conversation.objects.create(user=owner, title="会话")
FileSummaryBatch.objects.create(
conversation=conversation,
user=owner,
batch_no="FS-VIEW",
status=FileSummaryBatch.Status.SUCCESS,
)
monkeypatch.setattr("review_agent.application_form_fill.views.start_application_form_fill_workflow", lambda batch, async_run=True: None)
client.force_login(other)
response = client.post(
reverse("application_form_fill_start"),
data=json.dumps({"conversation_id": conversation.pk}),
content_type="application/json",
)
assert response.status_code == 404
def test_application_form_fill_start_creates_batch(client, monkeypatch, django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-VIEW-OK",
status=FileSummaryBatch.Status.SUCCESS,
)
monkeypatch.setattr("review_agent.application_form_fill.views.start_application_form_fill_workflow", lambda batch, async_run=True: None)
client.force_login(user)
response = client.post(
reverse("application_form_fill_start"),
data=json.dumps({"conversation_id": conversation.pk, "template_codes": ["registration_certificate"]}),
content_type="application/json",
)
assert response.status_code == 200
payload = response.json()
assert payload["workflow_type"] == "application_form_fill"
assert ApplicationFormFillBatch.objects.filter(conversation=conversation).exists()
def test_application_form_fill_status_requires_owner_and_returns_nodes_exports(client, tmp_path, django_user_model):
owner = django_user_model.objects.create_user(username="owner", password="pass")
other = django_user_model.objects.create_user(username="other", password="pass")
conversation = Conversation.objects.create(user=owner, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=owner, batch_no="FS-STATUS")
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=owner,
source_summary_batch=summary,
batch_no="AFF-STATUS",
status=ApplicationFormFillBatch.Status.PARTIAL_SUCCESS,
selected_templates=["registration_certificate"],
conflict_summary=[{"field_key": "product_name"}],
risk_notes=[{"type": "pdf_pending"}],
)
WorkflowNodeRun.objects.create(
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
node_group="form_fill",
node_code=FORM_FILL_NODE_DEFINITIONS[0][0],
node_name=FORM_FILL_NODE_DEFINITIONS[0][1],
status=WorkflowNodeRun.Status.SUCCESS,
progress=100,
)
output = tmp_path / "filled.docx"
output.write_bytes(b"word")
exported = ExportedSummaryFile.objects.create(
batch=summary,
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
export_category="filled_template",
export_type=ExportedSummaryFile.ExportType.WORD,
file_name="filled.docx",
storage_path=str(output),
)
client.force_login(other)
denied = client.get(reverse("application_form_fill_batch_status", args=[batch.pk]))
assert denied.status_code == 404
client.force_login(owner)
allowed = client.get(reverse("application_form_fill_batch_status", args=[batch.pk]))
assert allowed.status_code == 200
payload = allowed.json()
assert payload["batch"]["workflow_type"] == "application_form_fill"
assert payload["batch"]["status"] == ApplicationFormFillBatch.Status.PARTIAL_SUCCESS
assert payload["batch"]["conflict_count"] == 1
assert payload["nodes"][0]["node_code"] == "prepare"
assert payload["exports"][0]["id"] == exported.pk

View File

@@ -0,0 +1,121 @@
import zipfile
import pytest
from docx import Document
from review_agent.application_form_fill.schemas import MergedField, TemplateSpec
from review_agent.application_form_fill.services.word_fill import create_word_export, fill_template
from review_agent.models import (
ApplicationFormFillArtifact,
ApplicationFormFillBatch,
Conversation,
ExportedSummaryFile,
FileSummaryBatch,
)
pytestmark = pytest.mark.django_db
def _spec():
return TemplateSpec(
code="registration_certificate",
name="注册证格式",
source_file="template.docx",
output_label="注册证格式",
applies_when={"registration_type": ["首次注册"]},
file_format="docx",
fields=[
{"key": "product_name", "label": "产品名称", "target": {"type": "table_row", "row_label": "产品名称"}},
{"key": "intended_use", "label": "预期用途", "target": {"type": "table_row", "row_label": "预期用途"}},
],
)
def _template(path):
document = Document()
table = document.add_table(rows=2, cols=2)
table.rows[0].cells[0].text = "产品名称"
table.rows[1].cells[0].text = "预期用途"
document.save(path)
def test_word_fill_writes_table_rows(tmp_path):
template_path = tmp_path / "template.docx"
output_path = tmp_path / "filled.docx"
_template(template_path)
fill_template(
template_path,
output_path,
_spec(),
{
"product_name": MergedField("product_name", "产品名称", "甲胎蛋白检测试剂盒", "说明书.txt", "证据", 0.8),
"intended_use": MergedField("intended_use", "预期用途", "用于体外检测", "说明书.txt", "证据", 0.8),
},
)
document = Document(output_path)
assert document.tables[0].rows[0].cells[1].text == "甲胎蛋白检测试剂盒"
assert document.tables[0].rows[1].cells[1].text == "用于体外检测"
def test_word_fill_highlights_conflict_in_docx_xml(tmp_path):
template_path = tmp_path / "template.docx"
output_path = tmp_path / "filled.docx"
_template(template_path)
fill_template(
template_path,
output_path,
_spec(),
{
"product_name": MergedField(
"product_name",
"产品名称",
"甲胎蛋白检测试剂盒",
"说明书.txt",
"证据",
0.8,
has_conflict=True,
)
},
conflicts=[{"field_key": "product_name"}],
)
with zipfile.ZipFile(output_path) as package:
document_xml = package.read("word/document.xml").decode("utf-8")
assert 'w:fill="FFFF00"' in document_xml
assert 'w:color w:val="FF0000"' in document_xml
def test_create_word_export_records_artifact_and_export(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-WORD")
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
batch_no="AFF-WORD",
product_name="甲胎蛋白检测试剂盒",
work_dir=str(tmp_path / "aff" / "AFF-WORD"),
)
template_path = tmp_path / "template.docx"
_template(template_path)
exported = create_word_export(
batch,
_spec(),
template_path,
{"product_name": MergedField("product_name", "产品名称", "甲胎蛋白检测试剂盒", "说明书.txt", "证据", 0.8)},
)
assert exported.export_type == ExportedSummaryFile.ExportType.WORD
assert exported.workflow_type == "application_form_fill"
assert exported.workflow_batch_id == batch.pk
assert ApplicationFormFillArtifact.objects.filter(
batch=batch,
artifact_type=ApplicationFormFillArtifact.ArtifactType.FILLED_TEMPLATE,
).exists()

View File

@@ -0,0 +1,272 @@
import pytest
from review_agent.application_form_fill.constants import FORM_FILL_NODE_DEFINITIONS
from review_agent.application_form_fill.workflow import (
create_application_form_fill_batch,
find_latest_successful_summary_batch,
start_application_form_fill_workflow,
)
from review_agent.models import (
ApplicationFormFillBatch,
Conversation,
FileAttachment,
FileSummaryBatch,
Message,
WorkflowEvent,
WorkflowNodeRun,
)
from review_agent.services import stream_message
from review_agent.skill_router import SkillRoute
pytestmark = pytest.mark.django_db
@pytest.fixture(autouse=True)
def stub_aff_llm_extract(monkeypatch):
monkeypatch.setattr(
"review_agent.application_form_fill.services.field_extract.generate_completion",
lambda messages, temperature=0.0: '{"fields": [], "checklist_items": []}',
)
def test_find_latest_successful_summary_batch_ignores_failed_batches(django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
success = FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-AFF-OK",
status=FileSummaryBatch.Status.SUCCESS,
)
FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-AFF-FAILED",
status=FileSummaryBatch.Status.FAILED,
)
assert find_latest_successful_summary_batch(conversation) == success
def test_create_application_form_fill_batch_initializes_nodes(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
message = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证")
summary = FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-AFF-OK",
status=FileSummaryBatch.Status.SUCCESS,
)
batch = create_application_form_fill_batch(
conversation=conversation,
user=user,
trigger_message=message,
source_summary_batch=summary,
)
assert batch.status == ApplicationFormFillBatch.Status.PENDING
assert batch.output_types == ["word", "excel", "json"]
assert WorkflowNodeRun.objects.filter(
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
).count() == len(FORM_FILL_NODE_DEFINITIONS)
assert WorkflowEvent.objects.filter(
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
event_type="workflow_created",
).exists()
def test_application_form_fill_executor_runs_nodes_and_skips_pdf(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-AFF-OK",
status=FileSummaryBatch.Status.SUCCESS,
)
trigger = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证")
batch = create_application_form_fill_batch(
conversation=conversation,
user=user,
trigger_message=trigger,
source_summary_batch=summary,
)
start_application_form_fill_workflow(batch, async_run=False)
batch.refresh_from_db()
assert batch.status == ApplicationFormFillBatch.Status.SUCCESS
assert WorkflowNodeRun.objects.get(
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
node_code="pdf_convert",
).status == WorkflowNodeRun.Status.SKIPPED
assert WorkflowEvent.objects.filter(
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
event_type="workflow_completed",
).exists()
def test_application_form_fill_workflow_generates_summary_and_exports(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path
settings.APPLICATION_FORM_FILL_ASYNC = False
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
trigger = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证")
summary = FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-AFF-FULL",
status=FileSummaryBatch.Status.SUCCESS,
)
source = tmp_path / "ifu.txt"
source.write_text("产品名称:甲胎蛋白检测试剂盒\n预期用途:用于体外检测", encoding="utf-8")
from review_agent.models import FileSummaryItem
FileSummaryItem.objects.create(
batch=summary,
file_index=1,
file_name="说明书.txt",
file_type="txt",
relative_path="说明书.txt",
storage_path=str(source),
)
batch = create_application_form_fill_batch(
conversation=conversation,
user=user,
trigger_message=trigger,
source_summary_batch=summary,
)
start_application_form_fill_workflow(batch, async_run=False)
batch.refresh_from_db()
assert batch.status == ApplicationFormFillBatch.Status.SUCCESS
assert batch.product_name == "甲胎蛋白检测试剂盒"
assert batch.selected_templates == ["registration_certificate"]
assert conversation.messages.filter(role=Message.Role.ASSISTANT, content__contains="已生成申报模板自动填表文件").exists()
assert batch.notifications.filter(send_status="success").exists()
def test_application_form_fill_status_becomes_partial_when_notification_fails(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path
settings.APPLICATION_FORM_FILL_MOCK_NOTIFY_FAIL = True
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
trigger = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证")
summary = FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-AFF-PARTIAL",
status=FileSummaryBatch.Status.SUCCESS,
)
batch = create_application_form_fill_batch(
conversation=conversation,
user=user,
trigger_message=trigger,
source_summary_batch=summary,
)
start_application_form_fill_workflow(batch, async_run=False)
batch.refresh_from_db()
assert batch.status == ApplicationFormFillBatch.Status.PARTIAL_SUCCESS
assert batch.notifications.filter(send_status="failed").exists()
def test_stream_message_prompts_for_upload_when_no_summary_or_attachment(monkeypatch, django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
monkeypatch.setattr(
"review_agent.services.route_message_intent",
lambda conversation, content: SkillRoute(
action="application_form_fill",
workflow_type="application_form_fill",
confidence=0.9,
),
)
frames = list(stream_message(conversation, "帮我填注册证"))
joined = "".join(frames)
assert "请先在当前对话右侧上传需要填表的产品资料或压缩包" in joined
assert not ApplicationFormFillBatch.objects.exists()
def test_stream_message_starts_application_form_fill_workflow(monkeypatch, settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path
settings.APPLICATION_FORM_FILL_ASYNC = False
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-AFF-OK",
status=FileSummaryBatch.Status.SUCCESS,
)
monkeypatch.setattr(
"review_agent.services.route_message_intent",
lambda conversation, content: SkillRoute(
action="application_form_fill",
workflow_type="application_form_fill",
confidence=0.9,
),
)
frames = list(stream_message(conversation, "帮我填注册证"))
joined = "".join(frames)
assert "workflow_started" in joined
assert '"workflow_type": "application_form_fill"' in joined
assert "已启动申报文件自动填表工作流" in joined
assert ApplicationFormFillBatch.objects.filter(conversation=conversation).exists()
def test_stream_message_auto_runs_summary_before_application_form_fill(
monkeypatch, settings, tmp_path, django_user_model
):
settings.MEDIA_ROOT = tmp_path
settings.APPLICATION_FORM_FILL_ASYNC = False
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
attachment_path = tmp_path / "application.txt"
attachment_path.write_text("产品名称:甲胎蛋白检测试剂盒", encoding="utf-8")
FileAttachment.objects.create(
conversation=conversation,
user=user,
original_name="application.txt",
storage_path=str(attachment_path),
file_size=attachment_path.stat().st_size,
is_active=True,
)
monkeypatch.setattr(
"review_agent.services.route_message_intent",
lambda conversation, content: SkillRoute(
action="application_form_fill",
workflow_type="application_form_fill",
confidence=0.9,
),
)
def finish_summary(batch, async_run=True):
batch.status = FileSummaryBatch.Status.SUCCESS
batch.save(update_fields=["status"])
monkeypatch.setattr("review_agent.services.start_file_summary_workflow", finish_summary)
frames = list(stream_message(conversation, "为我该方案生成申报模板"))
joined = "".join(frames)
assert '"workflow_type": "file_summary"' in joined
assert '"workflow_type": "application_form_fill"' in joined
assert "汇总完成后继续自动填表" in joined
assert FileSummaryBatch.objects.filter(conversation=conversation, status=FileSummaryBatch.Status.SUCCESS).exists()
assert ApplicationFormFillBatch.objects.filter(conversation=conversation).exists()

View File

@@ -4,6 +4,7 @@ import json
import pytest
from review_agent.models import (
ApplicationFormFillBatch,
Conversation,
ExportedSummaryFile,
FileAttachment,
@@ -109,6 +110,54 @@ def test_export_download_requires_batch_owner(client, tmp_path, django_user_mode
assert allowed["Content-Type"].startswith("text/markdown")
def test_export_download_checks_application_form_fill_batch_owner(client, tmp_path, django_user_model):
owner = django_user_model.objects.create_user(username="owner", password="pass")
other = django_user_model.objects.create_user(username="other", password="pass")
owner_conversation = Conversation.objects.create(user=owner, title="自动填表")
other_conversation = Conversation.objects.create(user=other, title="其他对话")
owner_summary = FileSummaryBatch.objects.create(
conversation=owner_conversation,
user=owner,
batch_no="FS-AFF-OWNER",
status=FileSummaryBatch.Status.SUCCESS,
)
other_summary = FileSummaryBatch.objects.create(
conversation=other_conversation,
user=other,
batch_no="FS-AFF-OTHER",
status=FileSummaryBatch.Status.SUCCESS,
)
form_batch = ApplicationFormFillBatch.objects.create(
conversation=owner_conversation,
user=owner,
source_summary_batch=owner_summary,
batch_no="AFF-DL",
)
report_path = tmp_path / "filled.docx"
report_path.write_bytes(b"word-content")
exported = ExportedSummaryFile.objects.create(
batch=other_summary,
workflow_type="application_form_fill",
workflow_batch_id=form_batch.pk,
export_category="filled_template",
export_type=ExportedSummaryFile.ExportType.WORD,
file_name="filled.docx",
storage_path=str(report_path),
)
client.force_login(other)
denied = client.get(reverse("file_summary_export_download", args=[exported.pk]))
assert denied.status_code == 404
client.force_login(owner)
allowed = client.get(reverse("file_summary_export_download", args=[exported.pk]))
assert allowed.status_code == 200
assert allowed["Content-Type"].startswith(
"application/vnd.openxmlformats-officedocument.wordprocessingml.document"
)
assert b"".join(allowed.streaming_content) == b"word-content"
def test_conversation_messages_returns_incremental_messages(client, django_user_model):
owner = django_user_model.objects.create_user(username="owner", password="pass")
other = django_user_model.objects.create_user(username="other", password="pass")