# --- review_agent/regulatory_review/schemas.py ---


@dataclass(frozen=True)
class Finding:
    """One issue reported by a regulatory review check.

    The dataclass is frozen, so fields cannot be reassigned after
    construction. ``evidence`` and ``citations`` default to a fresh empty
    dict / list per instance.
    """

    rule_code: str
    category: str
    severity: str
    title: str
    detail: str = ""
    suggestion: str = ""
    evidence: dict[str, object] = field(default_factory=dict)
    citations: list[dict[str, object]] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the finding as a plain dict built by ``dataclasses.asdict``."""
        return asdict(self)


# --- review_agent/regulatory_review/services/completeness_check.py ---


def run_completeness_check(batch: FileSummaryBatch, rule_set: dict) -> list[Finding]:
    """Report each checked requirement that no file in ``batch`` satisfies.

    Only requirements whose ``type`` is ``required``, ``conditional`` or
    ``recommended`` are checked. A requirement is satisfied when any of its
    ``file_keywords`` occurs (case-insensitively) in a batch item's file name
    or relative path.

    Args:
        batch: Object whose ``items.order_by("file_index")`` yields items
            exposing ``file_name`` and ``relative_path``.
        rule_set: Mapping with an optional ``requirements`` list of dicts.

    Returns:
        One ``Finding`` per unsatisfied requirement, in rule order.

    Raises:
        KeyError: If an unsatisfied requirement lacks ``code`` or ``title``.
    """
    items = list(batch.items.order_by("file_index"))
    findings: list[Finding] = []
    for requirement in rule_set.get("requirements", []):
        if requirement.get("type") not in {"required", "conditional", "recommended"}:
            continue
        # ``or []`` also covers an explicit null in the rule file, which
        # ``.get(key, [])`` would pass through as None and crash on iteration.
        keywords = requirement.get("file_keywords") or []
        if any(
            _matches_item(item.file_name, item.relative_path, keywords)
            for item in items
        ):
            continue
        findings.append(
            Finding(
                rule_code=requirement["code"],
                category=requirement.get("category", "completeness"),
                severity=requirement.get("severity", "medium"),
                title=f"缺少{requirement['title']}",
                detail=f"当前文件汇总批次未发现{requirement['title']}。",
                suggestion=requirement.get("suggestion", ""),
                evidence={
                    "requirement_type": requirement.get("type"),
                    "matched_files": [],
                    "searched_keywords": keywords,
                },
            )
        )
    return findings


def _matches_item(file_name: str, relative_path: str, keywords: list[str]) -> bool:
    """Return True if any non-blank keyword occurs in the name or path.

    Matching is a case-insensitive substring test. Blank keywords are
    ignored: an empty string is a substring of everything and would make
    every file satisfy the requirement.
    """
    haystack = f"{file_name} {relative_path}".lower()
    needles = (str(keyword).lower() for keyword in keywords or ())
    return any(needle in haystack for needle in needles if needle)


# --- review_agent/regulatory_review/services/consistency_check.py ---

# Label -> regex capturing the rest of the line after "<label><colon>".
# * The colon class accepts the ASCII colon and the full-width colon
#   (U+FF1A, written as an escape so it survives text normalisation).
#   NOTE(review): the original class listed ":" twice, which looks like a
#   lost full-width colon -- confirm against the committed file.
# * ``[^\S\r\n]*`` skips whitespace but never a line break. A plain ``\s*``
#   would swallow the newline after an empty value and capture the *next*
#   line as this field's value.
FIELDS = {
    "产品名称": r"产品名称[:\uFF1A][^\S\r\n]*([^\n\r]+)",
    "型号规格": r"型号规格[:\uFF1A][^\S\r\n]*([^\n\r]+)",
    "预期用途": r"预期用途[:\uFF1A][^\S\r\n]*([^\n\r]+)",
}


def run_consistency_check(document_texts: dict[str, str]) -> list[Finding]:
    """Report fields whose value differs between documents.

    For every entry in ``FIELDS`` the first match in each document is taken,
    whitespace-normalised, and grouped by value. A field yields one
    high-severity finding when more than one distinct value is seen.
    Documents without the field, or with a blank value, are not compared.

    Args:
        document_texts: Mapping of file name to extracted text.

    Returns:
        One ``Finding`` per inconsistent field, in ``FIELDS`` order. The
        finding's evidence maps each distinct value to the files using it.
    """
    findings: list[Finding] = []
    for label, pattern in FIELDS.items():
        compiled = re.compile(pattern)
        files_by_value: dict[str, list[str]] = defaultdict(list)
        for file_name, text in document_texts.items():
            match = compiled.search(text)
            if match is None:
                continue
            value = _normalize(match.group(1))
            if not value:
                # Whitespace-only value: nothing meaningful to compare.
                continue
            files_by_value[value].append(file_name)
        if len(files_by_value) <= 1:
            continue
        findings.append(
            Finding(
                rule_code=f"consistency:{label}",
                category="consistency",
                severity="high",
                title=f"{label}在不同文件中不一致",
                detail=f"发现 {len(files_by_value)} 个不同的{label}取值。",
                suggestion=f"请统一各注册资料中的{label}。",
                evidence={"field": label, "values": dict(files_by_value)},
            )
        )
    return findings


def _normalize(value: str) -> str:
    """Trim ``value`` and collapse each run of whitespace to one space."""
    return " ".join(value.split())
# --- review_agent/regulatory_review/services/structure_check.py ---


def run_structure_check(document_texts: dict[str, str], rule_set: dict) -> list[Finding]:
    """Report required sections missing from the documents matching a rule.

    For each requirement that lists ``required_sections``, the documents
    selected by its ``file_keywords`` are concatenated and every section name
    is looked up as a plain (case-sensitive) substring. Requirements with no
    matching document are skipped here.

    Args:
        document_texts: Mapping of file name to extracted text.
        rule_set: Mapping with an optional ``requirements`` list of dicts.

    Returns:
        One ``Finding`` per missing section, with ``rule_code`` of the form
        ``"<requirement code>:<section>"``.

    Raises:
        KeyError: If a requirement with a missing section lacks ``code`` or
            ``title``.
    """
    findings: list[Finding] = []
    for requirement in rule_set.get("requirements", []):
        required_sections = requirement.get("required_sections") or []
        if not required_sections:
            continue
        matching_docs = _matching_documents(
            document_texts, requirement.get("file_keywords", [])
        )
        if not matching_docs:
            continue
        combined_text = "\n".join(matching_docs.values())
        missing_sections = [
            section for section in required_sections if section not in combined_text
        ]
        for section in missing_sections:
            findings.append(
                Finding(
                    rule_code=f"{requirement['code']}:{section}",
                    category="structure",
                    severity=requirement.get("severity", "medium"),
                    title=f"{requirement['title']}缺少{section}章节",
                    detail=f"已匹配{requirement['title']}文件,但未发现{section}相关内容。",
                    suggestion=requirement.get("suggestion", ""),
                    evidence={"section": section, "files": list(matching_docs)},
                )
            )
    return findings


def _matching_documents(document_texts: dict[str, str], keywords: list[str]) -> dict[str, str]:
    """Select the documents whose name or text contains any keyword.

    Matching is a case-insensitive substring test over ``"<name>\\n<text>"``.
    Blank keywords are ignored, because an empty string is a substring of
    every document and would defeat the filter. With no usable keyword the
    input mapping itself is returned unfiltered.
    """
    needles = [str(keyword).lower() for keyword in keywords or ()]
    needles = [needle for needle in needles if needle]
    if not needles:
        return document_texts
    selected: dict[str, str] = {}
    for name, text in document_texts.items():
        haystack = f"{name}\n{text}".lower()
        if any(needle in haystack for needle in needles):
            selected[name] = text
    return selected


# --- review_agent/regulatory_review/services/text_extract.py ---


@dataclass(frozen=True)
class ExtractedText:
    """Outcome of one text-extraction attempt.

    ``status`` is set by ``extract_text`` to ``"success"``, ``"failed"`` or
    ``"unsupported"``. ``content_hash`` is the SHA-256 hex digest of the
    UTF-8 encoded text, or ``""`` when there is no text.
    """

    path: Path
    text: str
    status: str
    content_hash: str = ""
    error_message: str = ""


# File suffixes (lower-case, with the dot) that extract_text will attempt.
SUPPORTED_EXTENSIONS = {".txt", ".md", ".pdf", ".docx", ".pptx", ".xlsx", ".doc"}


def extract_text(path: str | Path) -> ExtractedText:
    """Extract text from ``path`` and describe the outcome.

    Never raises for extraction problems: an unsupported suffix yields
    ``status="unsupported"`` and any exception from the extractor yields
    ``status="failed"`` with the exception text in ``error_message``.

    Args:
        path: File to read; its suffix is compared case-insensitively
            against ``SUPPORTED_EXTENSIONS``.

    Returns:
        An ``ExtractedText`` whose ``path`` is ``Path(path)``.
    """
    file_path = Path(path)
    if file_path.suffix.lower() not in SUPPORTED_EXTENSIONS:
        return ExtractedText(path=file_path, text="", status="unsupported")
    try:
        text = extract_text_from_path(file_path)
    except Exception as exc:
        # Deliberately broad: one unreadable file is reported through the
        # result instead of raising to the caller.
        return ExtractedText(
            path=file_path, text="", status="failed", error_message=str(exc)
        )
    content_hash = hashlib.sha256(text.encode("utf-8")).hexdigest() if text else ""
    return ExtractedText(
        path=file_path, text=text, status="success", content_hash=content_hash
    )


# --- review_agent/regulatory_review/storage.py ---


def save_artifact(
    batch: RegulatoryReviewBatch,
    *,
    name: str,
    content: str | bytes,
    artifact_type: str,
    metadata: dict | None = None,
) -> RegulatoryArtifact:
    """Write ``content`` into the batch work directory and record it.

    The file goes to ``batch.work_dir`` when set, otherwise to
    ``MEDIA_ROOT/regulatory_review/work/<batch_no>``; the directory is
    created if needed. Only the final component of ``name`` is used as the
    file name.

    Args:
        batch: Review batch owning the artifact.
        name: Desired file name; any directory part is dropped.
        content: Text (stored UTF-8 encoded) or raw bytes.
        artifact_type: Stored on the record as-is.
        metadata: Optional extra data; ``{}`` is stored when omitted.

    Returns:
        The created ``RegulatoryArtifact`` row, whose ``content_hash`` is the
        SHA-256 hex digest of the bytes written.
    """
    if batch.work_dir:
        root = Path(batch.work_dir)
    else:
        root = Path(settings.MEDIA_ROOT) / "regulatory_review" / "work" / batch.batch_no
    root.mkdir(parents=True, exist_ok=True)
    path = root / Path(name).name
    # Encode once and write bytes, so the digest always describes the stored
    # file. Text-mode writing may translate newlines on some platforms, which
    # would make the file differ from the hashed content.
    payload = content if isinstance(content, bytes) else content.encode("utf-8")
    path.write_bytes(payload)
    digest = hashlib.sha256(payload).hexdigest()
    return RegulatoryArtifact.objects.create(
        batch=batch,
        artifact_type=artifact_type,
        name=path.name,
        storage_path=str(path),
        content_hash=digest,
        metadata=metadata or {},
    )
# --- tests/test_regulatory_completeness.py ---


@pytest.mark.django_db
def test_completeness_check_matches_existing_files_and_reports_missing(django_user_model):
    """Files present in the batch are not flagged; the absent report is."""
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")
    summary_batch = FileSummaryBatch.objects.create(
        conversation=chat,
        user=owner,
        batch_no="FS-CHECK",
        status=FileSummaryBatch.Status.SUCCESS,
    )
    uploaded = [
        ("产品技术要求.docx", "x/product.docx"),
        ("说明书.docx", "x/ifu.docx"),
    ]
    for index, (file_name, storage_path) in enumerate(uploaded, start=1):
        FileSummaryItem.objects.create(
            batch=summary_batch,
            file_index=index,
            file_name=file_name,
            file_type="docx",
            relative_path=file_name,
            storage_path=storage_path,
        )

    findings = run_completeness_check(summary_batch, load_rule_file())

    reported_titles = [finding.title for finding in findings]
    assert "缺少注册检验报告" in reported_titles
    assert "缺少产品技术要求" not in reported_titles
    report_finding = next(
        finding
        for finding in findings
        if finding.rule_code == "registration_test_report"
    )
    assert report_finding.severity == "blocking"
    assert report_finding.category == "completeness"


# --- tests/test_regulatory_consistency.py ---


def test_consistency_check_reports_product_name_mismatch():
    """Only the differing product name produces a consistency finding."""
    shared_tail = "型号规格:20人份/盒\n预期用途:定量检测AFP"
    document_texts = {
        "说明书.docx": "产品名称:甲胎蛋白检测试剂盒\n" + shared_tail,
        "技术要求.docx": "产品名称:乙肝表面抗原检测试剂盒\n" + shared_tail,
    }

    findings = run_consistency_check(document_texts)

    assert len(findings) == 1
    only_finding = findings[0]
    assert only_finding.category == "consistency"
    assert "产品名称" in only_finding.title


# --- tests/test_regulatory_storage.py ---


@pytest.mark.django_db
def test_save_artifact_writes_file_and_records_hash(settings, tmp_path, django_user_model):
    """The artifact is written under MEDIA_ROOT and gets a content hash."""
    settings.MEDIA_ROOT = tmp_path
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")
    summary = FileSummaryBatch.objects.create(
        conversation=chat, user=owner, batch_no="FS-OK"
    )
    review_batch = RegulatoryReviewBatch.objects.create(
        conversation=chat,
        user=owner,
        source_summary_batch=summary,
        batch_no="RR-ART",
    )

    artifact = save_artifact(
        review_batch, name="raw.json", content='{"ok": true}', artifact_type="json"
    )

    expected_file = tmp_path / "regulatory_review" / "work" / "RR-ART" / "raw.json"
    assert artifact.content_hash
    assert artifact.storage_path.endswith("raw.json")
    assert expected_file.exists()


# --- tests/test_regulatory_structure.py ---


def test_structure_check_reports_missing_instruction_sections():
    """A missing section is reported; a present one is not."""
    document_texts = {
        "说明书.docx": "产品名称:甲胎蛋白检测试剂盒\n样本要求:血清样本\n有效期:12个月"
    }

    findings = run_structure_check(document_texts, load_rule_file())

    reported_codes = [finding.rule_code for finding in findings]
    assert "instructions_for_use:储存条件" in reported_codes
    assert not any("样本要求" in finding.title for finding in findings)
# --- tests/test_regulatory_text_extract.py ---


def test_extract_text_reads_plain_text(tmp_path):
    """A UTF-8 .txt file is extracted successfully and hashed."""
    source = tmp_path / "说明书.txt"
    source.write_text("产品名称:甲胎蛋白检测试剂盒\n储存条件:2-8℃", encoding="utf-8")

    extracted = extract_text(source)

    assert "甲胎蛋白" in extracted.text
    assert extracted.status == "success"
    assert extracted.content_hash


def test_extract_text_reports_unsupported_file(tmp_path):
    """A .png file is reported as unsupported with no text."""
    source = tmp_path / "image.png"
    source.write_bytes(b"png")

    extracted = extract_text(source)

    assert extracted.status == "unsupported"
    assert extracted.text == ""