From 2a4dd6cfabb89bd0df407096ddc0d6e5665542ae Mon Sep 17 00:00:00 2001 From: bruce Date: Sun, 7 Jun 2026 00:26:19 +0800 Subject: [PATCH] =?UTF-8?q?feat(regulatory):=20=E5=A2=9E=E5=8A=A0=E6=B3=95?= =?UTF-8?q?=E8=A7=84=E8=A7=84=E5=88=99=E7=89=88=E6=9C=AC=E6=A3=80=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- requirements.txt | 1 + review_agent/management/__init__.py | 1 + review_agent/management/commands/__init__.py | 1 + .../commands/regulatory_rules_check.py | 27 +++++ review_agent/regulatory_review/__init__.py | 1 + .../rules/nmpa_ivd_registration_v1.yaml | 58 ++++++++++ .../regulatory_review/services/__init__.py | 1 + .../regulatory_review/services/rule_loader.py | 106 ++++++++++++++++++ tests/test_regulatory_rule_loader.py | 68 +++++++++++ 9 files changed, 264 insertions(+) create mode 100644 review_agent/management/__init__.py create mode 100644 review_agent/management/commands/__init__.py create mode 100644 review_agent/management/commands/regulatory_rules_check.py create mode 100644 review_agent/regulatory_review/__init__.py create mode 100644 review_agent/regulatory_review/rules/nmpa_ivd_registration_v1.yaml create mode 100644 review_agent/regulatory_review/services/__init__.py create mode 100644 review_agent/regulatory_review/services/rule_loader.py create mode 100644 tests/test_regulatory_rule_loader.py diff --git a/requirements.txt b/requirements.txt index f257506..a04423d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ xlrd>=2.0 olefile>=0.47 py7zr>=0.21 playwright>=1.60 +PyYAML>=6.0 diff --git a/review_agent/management/__init__.py b/review_agent/management/__init__.py new file mode 100644 index 0000000..bd9bed7 --- /dev/null +++ b/review_agent/management/__init__.py @@ -0,0 +1 @@ +"""Management command package for review_agent.""" diff --git a/review_agent/management/commands/__init__.py b/review_agent/management/commands/__init__.py new file mode 100644 index 0000000..823f3f6 --- /dev/null +++ b/review_agent/management/commands/__init__.py @@ -0,0 +1 @@ +"""Management commands for review_agent.""" diff --git a/review_agent/management/commands/regulatory_rules_check.py b/review_agent/management/commands/regulatory_rules_check.py new file mode 100644 index 0000000..17e83af --- /dev/null +++ b/review_agent/management/commands/regulatory_rules_check.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from django.core.management.base import BaseCommand + +from review_agent.regulatory_review.services.rule_loader import check_rule_version + + +class Command(BaseCommand): + help = "检查 NMPA 法规核查 YAML 规则与数据库版本记录。" + + def add_arguments(self, parser): + parser.add_argument( + "--no-create", + action="store_true", + help="缺少数据库记录时只报告 missing,不创建记录。", + ) + + def handle(self, *args, **options): + result = check_rule_version(update_missing=not options["no_create"]) + self.stdout.write( + f"{result.code}: {result.status}; yaml_hash={result.current_hash}; " + f"db_hash={result.database_hash or '-'}; path={result.path}" + ) + if result.status == "mismatch": + self.stdout.write( + self.style.WARNING("YAML 与数据库记录不一致,请人工确认后更新规则版本记录。") + ) diff --git a/review_agent/regulatory_review/__init__.py b/review_agent/regulatory_review/__init__.py new file mode 100644 index 0000000..a47f031 --- /dev/null +++ b/review_agent/regulatory_review/__init__.py @@ -0,0 +1 @@ +"""NMPA regulatory review workflow package.""" diff --git a/review_agent/regulatory_review/rules/nmpa_ivd_registration_v1.yaml b/review_agent/regulatory_review/rules/nmpa_ivd_registration_v1.yaml new file mode 100644 index 0000000..19cc16b --- /dev/null +++ b/review_agent/regulatory_review/rules/nmpa_ivd_registration_v1.yaml @@ -0,0 +1,58 @@ +code: nmpa_ivd_registration_v1 +name: NMPA IVD 注册资料 Demo 规则 +rag_collection: nmpa_ivd_registration_v1 +source_material_dir: docs/0.原始材料/关于公布体外诊断试剂注册申报资料要求和批准证明文件格式的公告 +requirements: + - code: product_technical_requirements + title: 产品技术要求 + type: required + severity: blocking + category: completeness + file_keywords: + - 产品技术要求 + suggestion: 请补充产品技术要求并确认版本与注册申请资料一致。 + citation_query: 体外诊断试剂 产品技术要求 注册申报资料 + - code: instructions_for_use + title: 说明书 + type: required + severity: high + category: completeness + file_keywords: + - 说明书 + - 使用说明 + required_sections: + - 储存条件 + - 有效期 + - 样本要求 + suggestion: 请补充说明书并核对储存条件、有效期和样本要求章节。 + citation_query: 体外诊断试剂 说明书 储存条件 有效期 样本要求 + - code: registration_test_report + title: 注册检验报告 + type: required + severity: blocking + category: completeness + file_keywords: + - 注册检验报告 + - 检验报告 + suggestion: 请补充注册检验报告并复核报告覆盖的产品型号。 + citation_query: 体外诊断试剂 注册检验报告 注册申报资料 + - code: clinical_evaluation + title: 临床评价资料 + type: conditional + severity: high + category: completeness + file_keywords: + - 临床评价 + - 临床试验 + suggestion: 请根据适用情形补充临床评价资料或说明豁免依据。 + citation_query: 体外诊断试剂 临床评价资料 注册申报 + - code: essential_principles_checklist + title: 安全和性能基本原则清单 + type: recommended + severity: medium + category: completeness + file_keywords: + - 安全和性能基本原则 + - 基本原则清单 + suggestion: 建议补充安全和性能基本原则清单,便于审评追溯。 + citation_query: 体外诊断试剂 安全和性能基本原则清单 diff --git a/review_agent/regulatory_review/services/__init__.py b/review_agent/regulatory_review/services/__init__.py new file mode 100644 index 0000000..8c2d48e --- /dev/null +++ b/review_agent/regulatory_review/services/__init__.py @@ -0,0 +1 @@ +"""Services for NMPA regulatory review.""" diff --git a/review_agent/regulatory_review/services/rule_loader.py b/review_agent/regulatory_review/services/rule_loader.py new file mode 100644 index 0000000..bbd671f --- /dev/null +++ b/review_agent/regulatory_review/services/rule_loader.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +import hashlib +from dataclasses import dataclass +from pathlib import Path + +import yaml +from django.conf import settings + +from review_agent.models import RegulatoryRuleVersion + + +DEFAULT_RULE_CODE = "nmpa_ivd_registration_v1" +DEFAULT_RULE_PATH = ( + Path(settings.BASE_DIR) + / "review_agent" + / "regulatory_review" + / "rules" + / "nmpa_ivd_registration_v1.yaml" +) + + +@dataclass(frozen=True) +class RuleVersionCheck: + status: str + code: str + path: Path + current_hash: str + database_hash: str = "" + record: RegulatoryRuleVersion | None = None + + +def compute_file_sha256(path: str | Path) -> str: + file_path = Path(path) + digest = hashlib.sha256() + with file_path.open("rb") as handle: + for chunk in iter(lambda: handle.read(1024 * 1024), b""): + digest.update(chunk) + return digest.hexdigest() + + +def load_rule_file(path: str | Path | None = None) -> dict: + rule_path = Path(path) if path else DEFAULT_RULE_PATH + with rule_path.open("r", encoding="utf-8") as handle: + payload = yaml.safe_load(handle) or {} + if payload.get("code") != DEFAULT_RULE_CODE: + raise ValueError(f"规则 code 必须为 {DEFAULT_RULE_CODE}") + if not isinstance(payload.get("requirements"), list) or not payload["requirements"]: + raise ValueError("规则文件必须包含 requirements 列表。") + return payload + + +def check_rule_version( + *, + path: str | Path | None = None, + update_missing: bool = True, +) -> RuleVersionCheck: + rule_path = Path(path) if path else DEFAULT_RULE_PATH + rule_set = load_rule_file(rule_path) + current_hash = compute_file_sha256(rule_path) + record = RegulatoryRuleVersion.objects.filter(code=rule_set["code"]).first() + yaml_path = str(rule_path.relative_to(settings.BASE_DIR)) + + if record is None: + if not update_missing: + return RuleVersionCheck( + status="missing", + code=rule_set["code"], + path=rule_path, + current_hash=current_hash, + ) + record = RegulatoryRuleVersion.objects.create( + code=rule_set["code"], + name=rule_set.get("name") or rule_set["code"], + yaml_path=yaml_path, + yaml_hash=current_hash, + rag_collection=rule_set.get("rag_collection", ""), + status=RegulatoryRuleVersion.Status.ACTIVE, + ) + return RuleVersionCheck( + status="created", + code=record.code, + path=rule_path, + current_hash=current_hash, + database_hash=record.yaml_hash, + record=record, + ) + + if record.yaml_hash != current_hash: + return RuleVersionCheck( + status="mismatch", + code=record.code, + path=rule_path, + current_hash=current_hash, + database_hash=record.yaml_hash, + record=record, + ) + + return RuleVersionCheck( + status="ok", + code=record.code, + path=rule_path, + current_hash=current_hash, + database_hash=record.yaml_hash, + record=record, + ) diff --git a/tests/test_regulatory_rule_loader.py b/tests/test_regulatory_rule_loader.py new file mode 100644 index 0000000..e74dc88 --- /dev/null +++ b/tests/test_regulatory_rule_loader.py @@ -0,0 +1,68 @@ +from pathlib import Path + +import pytest +from django.core.management import call_command + +from review_agent.models import RegulatoryRuleVersion +from review_agent.regulatory_review.services.rule_loader import ( + DEFAULT_RULE_CODE, + check_rule_version, + compute_file_sha256, + load_rule_file, +) + + +pytestmark = pytest.mark.django_db + + +def test_load_rule_file_reads_demo_requirements(): + rule_set = load_rule_file() + + codes = {item["code"] for item in rule_set["requirements"]} + assert rule_set["code"] == DEFAULT_RULE_CODE + assert "product_technical_requirements" in codes + assert "instructions_for_use" in codes + assert "registration_test_report" in codes + assert "clinical_evaluation" in codes + assert "essential_principles_checklist" in codes + + +def test_compute_file_sha256_changes_when_file_changes(tmp_path): + path = tmp_path / "rule.yaml" + path.write_text("code: demo\n", encoding="utf-8") + first = compute_file_sha256(path) + path.write_text("code: demo2\n", encoding="utf-8") + + assert compute_file_sha256(path) != first + + +def test_check_rule_version_creates_missing_db_record(): + result = check_rule_version(update_missing=True) + + record = RegulatoryRuleVersion.objects.get(code=DEFAULT_RULE_CODE) + assert result.status == "created" + assert result.current_hash == record.yaml_hash + assert record.rag_collection == "nmpa_ivd_registration_v1" + + +def test_check_rule_version_reports_hash_mismatch_without_overwriting(): + created = check_rule_version(update_missing=True) + record = RegulatoryRuleVersion.objects.get(code=DEFAULT_RULE_CODE) + record.yaml_hash = "stale" + record.save(update_fields=["yaml_hash"]) + + result = check_rule_version(update_missing=False) + record.refresh_from_db() + + assert result.status == "mismatch" + assert result.database_hash == "stale" + assert result.current_hash == created.current_hash + assert record.yaml_hash == "stale" + + +def test_regulatory_rules_check_command_reports_status(capsys): + call_command("regulatory_rules_check") + + captured = capsys.readouterr() + assert DEFAULT_RULE_CODE in captured.out + assert "created" in captured.out or "ok" in captured.out