feat(regulatory): 支持按附件4章节核查

This commit is contained in:
2026-06-07 11:17:57 +08:00
parent f46d9c5be6
commit b8d711729d
2 changed files with 121 additions and 2 deletions

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import json import json
import logging import logging
import re
from pathlib import Path from pathlib import Path
from threading import Thread from threading import Thread
from uuid import uuid4 from uuid import uuid4
@@ -48,6 +49,16 @@ NODE_DEFINITIONS = [
logger = logging.getLogger("review_agent.regulatory_review.workflow") logger = logging.getLogger("review_agent.regulatory_review.workflow")
ATTACHMENT4_CHAPTER_LABELS = {
"1": "第1章 监管信息",
"2": "第2章 综述资料",
"3": "第3章 非临床资料",
"4": "第4章 临床评价资料",
"5": "第5章 产品说明书和标签样稿",
"6": "第6章 质量管理体系文件",
}
class WorkflowPausedForUser(Exception): class WorkflowPausedForUser(Exception):
pass pass
@@ -89,6 +100,7 @@ def create_regulatory_review_batch(
source_summary_batch=source_summary_batch, source_summary_batch=source_summary_batch,
batch_no=batch_no, batch_no=batch_no,
work_dir=str(work_dir), work_dir=str(work_dir),
condition_json=_initial_condition_json(trigger_message),
) )
for code, name, group in NODE_DEFINITIONS: for code, name, group in NODE_DEFINITIONS:
WorkflowNodeRun.objects.create( WorkflowNodeRun.objects.create(
@@ -173,7 +185,7 @@ class RegulatoryWorkflowExecutor:
self._pause_for_condition_confirmation() self._pause_for_condition_confirmation()
return return
if node_code == "rule_scope": if node_code == "rule_scope":
self.rule_set = load_rule_file() self.rule_set = apply_rule_scope(load_rule_file(), self.batch.condition_json.get("rule_scope") or {})
return return
if node_code == "completeness_check": if node_code == "completeness_check":
self.findings.extend(run_completeness_check(self.batch.source_summary_batch, self._rules())) self.findings.extend(run_completeness_check(self.batch.source_summary_batch, self._rules()))
@@ -258,7 +270,7 @@ class RegulatoryWorkflowExecutor:
def _rules(self) -> dict: def _rules(self) -> dict:
if self.rule_set is None: if self.rule_set is None:
self.rule_set = load_rule_file() self.rule_set = apply_rule_scope(load_rule_file(), self.batch.condition_json.get("rule_scope") or {})
return self.rule_set return self.rule_set
def _extract_source_texts(self) -> dict[str, str]: def _extract_source_texts(self) -> dict[str, str]:
@@ -298,3 +310,52 @@ def start_regulatory_review_workflow(batch: RegulatoryReviewBatch, *, async_run:
executor.run() executor.run()
return return
Thread(target=executor.run, daemon=True).start() Thread(target=executor.run, daemon=True).start()
def _initial_condition_json(trigger_message: Message | None) -> dict:
scope = detect_attachment4_chapter_scope(trigger_message.content if trigger_message else "")
return {"rule_scope": scope} if scope else {}
def detect_attachment4_chapter_scope(content: str) -> dict[str, str] | None:
normalized = (content or "").strip()
if not normalized:
return None
chapter = _extract_chapter_number(normalized)
if chapter not in ATTACHMENT4_CHAPTER_LABELS:
return None
return {"attachment4_chapter": chapter, "label": ATTACHMENT4_CHAPTER_LABELS[chapter]}
def apply_rule_scope(rule_set: dict, rule_scope: dict) -> dict:
chapter = str(rule_scope.get("attachment4_chapter") or "")
if chapter not in ATTACHMENT4_CHAPTER_LABELS:
return rule_set
scoped = {**rule_set}
scoped["requirements"] = [
requirement
for requirement in rule_set.get("requirements", [])
if _requirement_in_chapter(requirement, chapter)
]
scoped["active_rule_scope"] = rule_scope
return scoped
def _requirement_in_chapter(requirement: dict, chapter: str) -> bool:
attachment4_code = str(requirement.get("attachment4_code") or "")
return attachment4_code == chapter or attachment4_code.startswith(f"{chapter}.")
def _extract_chapter_number(content: str) -> str:
match = re.search(r"\s*([一二三四五六1-6])\s*[章节张]", content)
if match:
return _normalize_chapter_number(match.group(1))
match = re.search(r"(^|[^\d])([1-6])\s*[章节张]", content)
if match:
return match.group(2)
return ""
def _normalize_chapter_number(value: str) -> str:
chinese = {"": "1", "": "2", "": "3", "": "4", "": "5", "": "6"}
return chinese.get(value, value)

View File

@@ -163,6 +163,64 @@ def test_stream_message_starts_regulatory_workflow(monkeypatch, settings, django
assert RegulatoryReviewBatch.objects.filter(conversation=conversation).exists() assert RegulatoryReviewBatch.objects.filter(conversation=conversation).exists()
def test_stream_message_records_attachment4_chapter_scope(monkeypatch, settings, django_user_model):
settings.REGULATORY_REVIEW_ASYNC = False
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-OK",
status=FileSummaryBatch.Status.SUCCESS,
)
monkeypatch.setattr(
"review_agent.services.route_message_intent",
lambda conversation, content: SkillRoute(
action="regulatory_review",
workflow_type="regulatory_review",
confidence=0.9,
),
)
list(stream_message(conversation, "请做第一章 NMPA 法规核查"))
batch = RegulatoryReviewBatch.objects.get(conversation=conversation)
assert batch.condition_json["rule_scope"]["attachment4_chapter"] == "1"
assert batch.condition_json["rule_scope"]["label"] == "第1章 监管信息"
def test_workflow_chapter_scope_only_checks_selected_attachment4_chapter(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path
settings.REGULATORY_REVIEW_ASYNC = False
settings.REGULATORY_RAG_CHROMA_PATH = tmp_path / "missing-rag"
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-OK",
status=FileSummaryBatch.Status.SUCCESS,
)
batch = create_regulatory_review_batch(
conversation=conversation,
user=user,
source_summary_batch=summary,
)
batch.condition_json = {
"confirmed": True,
"confirmed_conditions": {"product_category": "体外诊断试剂"},
"rule_scope": {"attachment4_chapter": "1", "label": "第1章 监管信息"},
}
batch.save(update_fields=["condition_json"])
start_regulatory_review_workflow(batch, async_run=False)
issue_codes = list(RegulatoryIssue.objects.filter(batch=batch).values_list("rule_code", flat=True))
assert issue_codes
assert all(code.startswith("attachment4_1") for code in issue_codes)
assert not any(code.startswith("attachment4_2") for code in issue_codes)
def test_workflow_generates_issues_exports_and_assistant_summary(settings, tmp_path, django_user_model): def test_workflow_generates_issues_exports_and_assistant_summary(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path settings.MEDIA_ROOT = tmp_path
settings.REGULATORY_REVIEW_ASYNC = False settings.REGULATORY_REVIEW_ASYNC = False