feat(regulatory): 增加法规规则版本检查

This commit is contained in:
2026-06-07 00:26:19 +08:00
parent f52dcc197d
commit 2a4dd6cfab
9 changed files with 264 additions and 0 deletions

View File

@@ -7,3 +7,4 @@ xlrd>=2.0
olefile>=0.47 olefile>=0.47
py7zr>=0.21 py7zr>=0.21
playwright>=1.60 playwright>=1.60
PyYAML>=6.0

View File

@@ -0,0 +1 @@
"""Management command package for review_agent."""

View File

@@ -0,0 +1 @@
"""Management commands for review_agent."""

View File

@@ -0,0 +1,27 @@
from __future__ import annotations
from django.core.management.base import BaseCommand
from review_agent.regulatory_review.services.rule_loader import check_rule_version
class Command(BaseCommand):
help = "检查 NMPA 法规核查 YAML 规则与数据库版本记录。"
def add_arguments(self, parser):
parser.add_argument(
"--no-create",
action="store_true",
help="缺少数据库记录时只报告 missing不创建记录。",
)
def handle(self, *args, **options):
result = check_rule_version(update_missing=not options["no_create"])
self.stdout.write(
f"{result.code}: {result.status}; yaml_hash={result.current_hash}; "
f"db_hash={result.database_hash or '-'}; path={result.path}"
)
if result.status == "mismatch":
self.stdout.write(
self.style.WARNING("YAML 与数据库记录不一致,请人工确认后更新规则版本记录。")
)

View File

@@ -0,0 +1 @@
"""NMPA regulatory review workflow package."""

View File

@@ -0,0 +1,58 @@
code: nmpa_ivd_registration_v1
name: NMPA IVD 注册资料 Demo 规则
rag_collection: nmpa_ivd_registration_v1
source_material_dir: docs/0.原始材料/关于公布体外诊断试剂注册申报资料要求和批准证明文件格式的公告
requirements:
- code: product_technical_requirements
title: 产品技术要求
type: required
severity: blocking
category: completeness
file_keywords:
- 产品技术要求
suggestion: 请补充产品技术要求并确认版本与注册申请资料一致。
citation_query: 体外诊断试剂 产品技术要求 注册申报资料
- code: instructions_for_use
title: 说明书
type: required
severity: high
category: completeness
file_keywords:
- 说明书
- 使用说明
required_sections:
- 储存条件
- 有效期
- 样本要求
suggestion: 请补充说明书并核对储存条件、有效期和样本要求章节。
citation_query: 体外诊断试剂 说明书 储存条件 有效期 样本要求
- code: registration_test_report
title: 注册检验报告
type: required
severity: blocking
category: completeness
file_keywords:
- 注册检验报告
- 检验报告
suggestion: 请补充注册检验报告并复核报告覆盖的产品型号。
citation_query: 体外诊断试剂 注册检验报告 注册申报资料
- code: clinical_evaluation
title: 临床评价资料
type: conditional
severity: high
category: completeness
file_keywords:
- 临床评价
- 临床试验
suggestion: 请根据适用情形补充临床评价资料或说明豁免依据。
citation_query: 体外诊断试剂 临床评价资料 注册申报
- code: essential_principles_checklist
title: 安全和性能基本原则清单
type: recommended
severity: medium
category: completeness
file_keywords:
- 安全和性能基本原则
- 基本原则清单
suggestion: 建议补充安全和性能基本原则清单,便于审评追溯。
citation_query: 体外诊断试剂 安全和性能基本原则清单

View File

@@ -0,0 +1 @@
"""Services for NMPA regulatory review."""

View File

@@ -0,0 +1,106 @@
from __future__ import annotations
import hashlib
from dataclasses import dataclass
from pathlib import Path
import yaml
from django.conf import settings
from review_agent.models import RegulatoryRuleVersion
DEFAULT_RULE_CODE = "nmpa_ivd_registration_v1"
DEFAULT_RULE_PATH = (
Path(settings.BASE_DIR)
/ "review_agent"
/ "regulatory_review"
/ "rules"
/ "nmpa_ivd_registration_v1.yaml"
)
@dataclass(frozen=True)
class RuleVersionCheck:
status: str
code: str
path: Path
current_hash: str
database_hash: str = ""
record: RegulatoryRuleVersion | None = None
def compute_file_sha256(path: str | Path) -> str:
file_path = Path(path)
digest = hashlib.sha256()
with file_path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def load_rule_file(path: str | Path | None = None) -> dict:
rule_path = Path(path) if path else DEFAULT_RULE_PATH
with rule_path.open("r", encoding="utf-8") as handle:
payload = yaml.safe_load(handle) or {}
if payload.get("code") != DEFAULT_RULE_CODE:
raise ValueError(f"规则 code 必须为 {DEFAULT_RULE_CODE}")
if not isinstance(payload.get("requirements"), list) or not payload["requirements"]:
raise ValueError("规则文件必须包含 requirements 列表。")
return payload
def check_rule_version(
*,
path: str | Path | None = None,
update_missing: bool = True,
) -> RuleVersionCheck:
rule_path = Path(path) if path else DEFAULT_RULE_PATH
rule_set = load_rule_file(rule_path)
current_hash = compute_file_sha256(rule_path)
record = RegulatoryRuleVersion.objects.filter(code=rule_set["code"]).first()
yaml_path = str(rule_path.relative_to(settings.BASE_DIR))
if record is None:
if not update_missing:
return RuleVersionCheck(
status="missing",
code=rule_set["code"],
path=rule_path,
current_hash=current_hash,
)
record = RegulatoryRuleVersion.objects.create(
code=rule_set["code"],
name=rule_set.get("name") or rule_set["code"],
yaml_path=yaml_path,
yaml_hash=current_hash,
rag_collection=rule_set.get("rag_collection", ""),
status=RegulatoryRuleVersion.Status.ACTIVE,
)
return RuleVersionCheck(
status="created",
code=record.code,
path=rule_path,
current_hash=current_hash,
database_hash=record.yaml_hash,
record=record,
)
if record.yaml_hash != current_hash:
return RuleVersionCheck(
status="mismatch",
code=record.code,
path=rule_path,
current_hash=current_hash,
database_hash=record.yaml_hash,
record=record,
)
return RuleVersionCheck(
status="ok",
code=record.code,
path=rule_path,
current_hash=current_hash,
database_hash=record.yaml_hash,
record=record,
)

View File

@@ -0,0 +1,68 @@
from pathlib import Path
import pytest
from django.core.management import call_command
from review_agent.models import RegulatoryRuleVersion
from review_agent.regulatory_review.services.rule_loader import (
DEFAULT_RULE_CODE,
check_rule_version,
compute_file_sha256,
load_rule_file,
)
pytestmark = pytest.mark.django_db
def test_load_rule_file_reads_demo_requirements():
rule_set = load_rule_file()
codes = {item["code"] for item in rule_set["requirements"]}
assert rule_set["code"] == DEFAULT_RULE_CODE
assert "product_technical_requirements" in codes
assert "instructions_for_use" in codes
assert "registration_test_report" in codes
assert "clinical_evaluation" in codes
assert "essential_principles_checklist" in codes
def test_compute_file_sha256_changes_when_file_changes(tmp_path):
path = tmp_path / "rule.yaml"
path.write_text("code: demo\n", encoding="utf-8")
first = compute_file_sha256(path)
path.write_text("code: demo2\n", encoding="utf-8")
assert compute_file_sha256(path) != first
def test_check_rule_version_creates_missing_db_record():
result = check_rule_version(update_missing=True)
record = RegulatoryRuleVersion.objects.get(code=DEFAULT_RULE_CODE)
assert result.status == "created"
assert result.current_hash == record.yaml_hash
assert record.rag_collection == "nmpa_ivd_registration_v1"
def test_check_rule_version_reports_hash_mismatch_without_overwriting():
created = check_rule_version(update_missing=True)
record = RegulatoryRuleVersion.objects.get(code=DEFAULT_RULE_CODE)
record.yaml_hash = "stale"
record.save(update_fields=["yaml_hash"])
result = check_rule_version(update_missing=False)
record.refresh_from_db()
assert result.status == "mismatch"
assert result.database_hash == "stale"
assert result.current_hash == created.current_hash
assert record.yaml_hash == "stale"
def test_regulatory_rules_check_command_reports_status(capsys):
call_command("regulatory_rules_check")
captured = capsys.readouterr()
assert DEFAULT_RULE_CODE in captured.out
assert "created" in captured.out or "ok" in captured.out