feat(regulatory): 增加法规核查基础服务

This commit is contained in:
2026-06-07 00:36:18 +08:00
parent 44d31d2a14
commit ec89e62661
11 changed files with 327 additions and 0 deletions

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from dataclasses import asdict, dataclass, field
@dataclass(frozen=True)
class Finding:
    """One issue reported by a regulatory review check.

    The dataclass is frozen, so fields cannot be reassigned after
    construction. The ``evidence`` dict and ``citations`` list are still
    ordinary mutable containers.
    """

    # Identifier of the rule that produced the finding.
    rule_code: str
    # Grouping label chosen by the check that created the finding.
    category: str
    # Severity label chosen by the check or taken from the rule.
    severity: str
    # Short human-readable headline.
    title: str
    # Longer explanation; empty when the check supplies none.
    detail: str = ""
    # Recommended remediation; empty when the rule supplies none.
    suggestion: str = ""
    # Free-form supporting data attached by the check.
    evidence: dict[str, object] = field(default_factory=dict)
    # Free-form citation records; each entry is a dict.
    citations: list[dict[str, object]] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the finding as a plain dict keyed by field name.

        ``dataclasses.asdict`` recurses into nested dicts and lists, so the
        returned containers are copies rather than the instance's own.
        """
        payload: dict[str, object] = asdict(self)
        return payload

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
from review_agent.models import FileSummaryBatch
from review_agent.regulatory_review.schemas import Finding
def run_completeness_check(batch: FileSummaryBatch, rule_set: dict) -> list[Finding]:
    """Report every checked requirement that no file in the batch satisfies.

    Args:
        batch: File summary batch; its ``items`` are loaded ordered by
            ``file_index`` and each item's ``file_name`` and
            ``relative_path`` are matched against the rule keywords.
        rule_set: Mapping whose ``"requirements"`` list holds rule dicts.
            Only rules whose ``"type"`` is ``"required"``, ``"conditional"``
            or ``"recommended"`` are evaluated.

    Returns:
        One ``Finding`` per evaluated rule for which no item matched any of
        the rule's ``"file_keywords"``.

    Raises:
        KeyError: If an unmatched rule lacks ``"code"`` or ``"title"``.
    """
    batch_files = list(batch.items.order_by("file_index"))
    checked_types = {"required", "conditional", "recommended"}
    missing: list[Finding] = []

    for rule in rule_set.get("requirements", []):
        if rule.get("type") not in checked_types:
            continue

        keywords = rule.get("file_keywords", [])
        satisfied = any(
            _matches_item(entry.file_name, entry.relative_path, keywords)
            for entry in batch_files
        )
        if satisfied:
            continue

        # No file matched: record the gap together with what was searched.
        missing.append(
            Finding(
                rule_code=rule["code"],
                category=rule.get("category", "completeness"),
                severity=rule.get("severity", "medium"),
                title=f"缺少{rule['title']}",
                detail=f"当前文件汇总批次未发现{rule['title']}",
                suggestion=rule.get("suggestion", ""),
                evidence={
                    "requirement_type": rule.get("type"),
                    "matched_files": [],
                    "searched_keywords": keywords,
                },
            )
        )

    return missing
def _matches_item(file_name: str, relative_path: str, keywords: list[str]) -> bool:
haystack = f"{file_name} {relative_path}".lower()
return any(str(keyword).lower() in haystack for keyword in keywords)

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
import re
from collections import defaultdict
from review_agent.regulatory_review.schemas import Finding
# Field label -> regex whose first group captures the value written after
# "<label>:" up to the end of that line. Both the ASCII colon and the
# full-width colon are accepted, because Chinese documents normally use the
# full-width form.
FIELDS = {
    "产品名称": r"产品名称[:：]\s*([^\n\r]+)",
    "型号规格": r"型号规格[:：]\s*([^\n\r]+)",
    "预期用途": r"预期用途[:：]\s*([^\n\r]+)",
}
def run_consistency_check(document_texts: dict[str, str]) -> list[Finding]:
    """Flag fields whose extracted value differs between documents.

    For every label in ``FIELDS`` the first regex match in each document is
    taken, whitespace-normalised, and grouped by value. A finding is produced
    when more than one distinct value exists for a label.

    Args:
        document_texts: Mapping of file name to that file's text.

    Returns:
        One high-severity ``Finding`` per inconsistent label; its evidence
        maps each distinct value to the files that contain it.
    """
    issues: list[Finding] = []

    for label, pattern in FIELDS.items():
        files_by_value: dict[str, list[str]] = defaultdict(list)
        for file_name, text in document_texts.items():
            hit = re.search(pattern, text)
            if hit is None:
                # Documents that do not mention the field are not compared.
                continue
            files_by_value[_normalize(hit.group(1))].append(file_name)

        distinct = len(files_by_value)
        if distinct > 1:
            issues.append(
                Finding(
                    rule_code=f"consistency:{label}",
                    category="consistency",
                    severity="high",
                    title=f"{label}在不同文件中不一致",
                    detail=f"发现 {distinct} 个不同的{label}取值。",
                    suggestion=f"请统一各注册资料中的{label}",
                    evidence={"field": label, "values": dict(files_by_value)},
                )
            )

    return issues
def _normalize(value: str) -> str:
return " ".join(value.strip().split())

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
from review_agent.regulatory_review.schemas import Finding
def run_structure_check(document_texts: dict[str, str], rule_set: dict) -> list[Finding]:
    """Report required sections that are absent from the matching documents.

    Rules without ``"required_sections"`` are skipped, as are rules for which
    no document matches the rule's ``"file_keywords"``. The texts of all
    matching documents are joined and each required section name is looked up
    as a case-sensitive substring of that joined text.

    Args:
        document_texts: Mapping of file name to that file's text.
        rule_set: Mapping whose ``"requirements"`` list holds rule dicts.

    Returns:
        One ``Finding`` per (rule, section) pair whose section name was not
        found.

    Raises:
        KeyError: If a rule with a missing section lacks ``"code"`` or
            ``"title"``.
    """
    issues: list[Finding] = []

    for rule in rule_set.get("requirements", []):
        sections = rule.get("required_sections") or []
        if not sections:
            continue

        docs = _matching_documents(document_texts, rule.get("file_keywords", []))
        if not docs:
            # The rule's documents are not present, so there is nothing to inspect.
            continue

        corpus = "\n".join(docs.values())
        absent_sections = [name for name in sections if name not in corpus]
        for section in absent_sections:
            issues.append(
                Finding(
                    rule_code=f"{rule['code']}:{section}",
                    category="structure",
                    severity=rule.get("severity", "medium"),
                    title=f"{rule['title']}缺少{section}章节",
                    detail=f"已匹配{rule['title']}文件,但未发现{section}相关内容。",
                    suggestion=rule.get("suggestion", ""),
                    evidence={"section": section, "files": list(docs)},
                )
            )

    return issues
def _matching_documents(document_texts: dict[str, str], keywords: list[str]) -> dict[str, str]:
if not keywords:
return document_texts
result = {}
for name, text in document_texts.items():
haystack = f"{name}\n{text}".lower()
if any(str(keyword).lower() in haystack for keyword in keywords):
result[name] = text
return result

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
import hashlib
from dataclasses import dataclass
from pathlib import Path
from .rag_index import extract_text_from_path
@dataclass(frozen=True)
class ExtractedText:
    """Immutable result of attempting to extract text from one file."""

    # File the extraction was attempted on.
    path: Path
    # Extracted text; callers pass an empty string when nothing was extracted.
    text: str
    # Outcome label supplied by the caller.
    status: str
    # Hash of the extracted text; empty when none was computed.
    content_hash: str = ""
    # Error description; empty when no error was recorded.
    error_message: str = ""
# Lower-case file suffixes (with the leading dot) accepted for text extraction.
SUPPORTED_EXTENSIONS = {
    ".doc",
    ".docx",
    ".md",
    ".pdf",
    ".pptx",
    ".txt",
    ".xlsx",
}
def extract_text(path: str | Path) -> ExtractedText:
    """Extract text from a file and report the outcome without raising.

    Args:
        path: Location of the file; its suffix is compared
            case-insensitively against ``SUPPORTED_EXTENSIONS``.

    Returns:
        An ``ExtractedText`` whose ``status`` is ``"unsupported"`` when the
        suffix is not accepted, ``"failed"`` (with ``error_message`` set to
        the exception text) when extraction raises, or ``"success"``
        otherwise. ``content_hash`` is the SHA-256 hex digest of the UTF-8
        encoded text, or empty when the text is empty.
    """
    source = Path(path)
    suffix = source.suffix.lower()
    if suffix not in SUPPORTED_EXTENSIONS:
        return ExtractedText(path=source, text="", status="unsupported")

    try:
        extracted = extract_text_from_path(source)
    except Exception as exc:
        # Any extractor error is turned into a "failed" result, not raised.
        return ExtractedText(path=source, text="", status="failed", error_message=str(exc))

    if extracted:
        digest = hashlib.sha256(extracted.encode("utf-8")).hexdigest()
    else:
        digest = ""
    return ExtractedText(path=source, text=extracted, status="success", content_hash=digest)

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
import hashlib
from pathlib import Path
from django.conf import settings
from review_agent.models import RegulatoryArtifact, RegulatoryReviewBatch
def save_artifact(
    batch: RegulatoryReviewBatch,
    *,
    name: str,
    content: str | bytes,
    artifact_type: str,
    metadata: dict | None = None,
) -> RegulatoryArtifact:
    """Write an artifact file for a review batch and record it in the database.

    The file goes into ``batch.work_dir`` when that is set, otherwise into
    ``MEDIA_ROOT/regulatory_review/work/<batch_no>``. The directory is created
    if needed and a file of the same name is overwritten.

    Args:
        batch: Review batch the artifact belongs to.
        name: Requested file name; only its final path component is used.
        content: Payload; ``str`` is written as UTF-8 text, ``bytes`` verbatim.
        artifact_type: Value stored in the record's ``artifact_type`` field.
        metadata: Optional metadata; an empty dict is stored when omitted.

    Returns:
        The created ``RegulatoryArtifact``; its ``content_hash`` is the
        SHA-256 hex digest of the payload bytes (UTF-8 encoded for ``str``).
    """
    if batch.work_dir:
        target_dir = Path(batch.work_dir)
    else:
        target_dir = Path(settings.MEDIA_ROOT) / "regulatory_review" / "work" / batch.batch_no
    target_dir.mkdir(parents=True, exist_ok=True)

    # Keep only the last path component so the file stays in target_dir.
    target = target_dir / Path(name).name

    if isinstance(content, bytes):
        target.write_bytes(content)
        payload = content
    else:
        target.write_text(content, encoding="utf-8")
        payload = content.encode("utf-8")
    checksum = hashlib.sha256(payload).hexdigest()

    return RegulatoryArtifact.objects.create(
        batch=batch,
        artifact_type=artifact_type,
        name=target.name,
        storage_path=str(target),
        content_hash=checksum,
        metadata=metadata or {},
    )