Files

226 lines
8.6 KiB
Python

from __future__ import annotations
import json
from pathlib import Path
from django.conf import settings
from openpyxl import Workbook
from review_agent.models import ExportedSummaryFile, RegulatoryIssue, RegulatoryReviewBatch
# Display labels for each issue severity key. The insertion order here is the
# row order of the risk-summary tables built by the report functions below.
SEVERITY_LABELS = {
    "blocking": "阻断项",
    "high": "高风险",
    "medium": "中风险",
    "low": "低风险",
    "info": "提示",
}
def export_review_results(batch: RegulatoryReviewBatch) -> list[ExportedSummaryFile]:
    """Write the Markdown, Excel and JSON exports for ``batch`` and register them.

    Files go to ``<root>/exports``, where ``<root>`` is ``batch.work_dir`` when
    it is set and ``MEDIA_ROOT/regulatory_review/work/<batch_no>`` otherwise.
    The directory is created when missing.

    Returns:
        The created export records, in the order Markdown, Excel, JSON.
    """
    if batch.work_dir:
        base_dir = Path(batch.work_dir)
    else:
        base_dir = Path(settings.MEDIA_ROOT) / "regulatory_review" / "work" / batch.batch_no
    target_dir = base_dir / "exports"
    target_dir.mkdir(parents=True, exist_ok=True)

    markdown_export = _create_export(
        batch=batch,
        path=target_dir / f"{batch.batch_no}-regulatory-review.md",
        export_type=ExportedSummaryFile.ExportType.MARKDOWN,
        category="markdown_report",
        content=build_markdown_report(batch),
    )

    excel_export = _create_excel_export(
        batch,
        target_dir / f"{batch.batch_no}-regulatory-issues.xlsx",
    )

    json_export = _create_export(
        batch=batch,
        path=target_dir / f"{batch.batch_no}-regulatory-result.json",
        export_type=ExportedSummaryFile.ExportType.JSON,
        category="result_package",
        content=json.dumps(build_result_payload(batch), ensure_ascii=False, indent=2),
    )

    return [markdown_export, excel_export, json_export]
def build_markdown_report(batch: RegulatoryReviewBatch) -> str:
    """Build the full Markdown review report for ``batch``.

    Sections, in order: the title and batch number; a source section (only when
    ``batch.condition_json`` carries ``regenerated_from``); the risk summary
    with one row per ``SEVERITY_LABELS`` entry (missing counts shown as 0); the
    issue list ordered by id; then review records and notification records,
    each emitted only when at least one record exists.

    Every text cell goes through ``_markdown_cell`` so that pipes or line
    breaks in titles, suggestions, targets or payload titles cannot break the
    table layout.

    Returns:
        The report lines joined with ``"\\n"`` (no trailing newline).
    """
    lines = [
        "# NMPA 注册资料法规核查报告",
        "",
        f"批次号:{batch.batch_no}",
    ]

    regenerated_from = (batch.condition_json or {}).get("regenerated_from")
    if regenerated_from:
        lines.extend(
            [
                "",
                "## 复核来源",
                "",
                f"- 来源法规核查批次:{regenerated_from.get('batch_no')}",
                f"- 来源文件汇总批次:{regenerated_from.get('file_summary_batch_no')}",
            ]
        )

    lines.extend(["", "## 风险汇总", "", "| 风险等级 | 数量 |", "| --- | --- |"])
    summary = batch.risk_summary or {}
    lines.extend(
        f"| {label} | {summary.get(severity, 0)} |"
        for severity, label in SEVERITY_LABELS.items()
    )

    lines.extend(["", "## 问题清单", "", "| 等级 | 问题 | 状态 | 建议 |", "| --- | --- | --- | --- |"])
    for issue in batch.issues.order_by("id"):
        # Unknown severities fall back to the raw severity value.
        severity_label = SEVERITY_LABELS.get(issue.severity, issue.severity)
        lines.append(
            f"| {_markdown_cell(severity_label)} | {_markdown_cell(issue.title)} "
            f"| {_markdown_cell(issue.status)} | {_markdown_cell(issue.suggestion or '-')} |"
        )

    # Review records are parsed from JSON files, so their shape is not
    # guaranteed; anything that is not an object cannot be tabulated.
    review_records = [record for record in _review_records(batch) if isinstance(record, dict)]
    if review_records:
        lines.extend(["", "## 复核记录", "", "| 补充批次 | 问题数 | 通过数 | 未通过数 |", "| --- | --- | --- | --- |"])
        for record in review_records:
            items = record.get("items") or []  # tolerate an explicit null
            statuses = [item.get("status") for item in items if isinstance(item, dict)]
            passed = statuses.count(RegulatoryIssue.Status.REVIEW_PASSED)
            failed = statuses.count(RegulatoryIssue.Status.REVIEW_FAILED)
            lines.append(
                f"| {_markdown_cell(record.get('file_summary_batch_no'))} "
                f"| {len(items)} | {passed} | {failed} |"
            )

    notifications = _notification_records(batch)
    if notifications:
        lines.extend(["", "## 通知记录", "", "| 渠道 | 对象 | 状态 | 问题 |", "| --- | --- | --- | --- |"])
        for record in notifications:
            # A payload that is not a dict has no title to show.
            payload = record["payload"] if isinstance(record["payload"], dict) else {}
            lines.append(
                f"| {_markdown_cell(record['channel'])} | {_markdown_cell(record['target'] or '-')} "
                f"| {_markdown_cell(record['status'])} | {_markdown_cell(payload.get('title', '-'))} |"
            )

    return "\n".join(lines)


def _markdown_cell(value: object) -> str:
    """Render ``value`` as text that stays inside a single Markdown table cell."""
    text = str(value)
    # A raw line break would end the table row early.
    for line_break in ("\r\n", "\n", "\r"):
        text = text.replace(line_break, " ")
    # A raw pipe would start a new column.
    return text.replace("|", "\\|")
def build_result_payload(batch: RegulatoryReviewBatch) -> dict[str, object]:
    """Assemble the result package for ``batch`` as a plain dictionary.

    The payload holds the batch number, the source summary batch number, the
    ``regenerated_from`` entry of ``batch.condition_json`` (``None`` when
    absent), the risk summary, every issue ordered by id, the review records
    and the notification records.
    """
    issue_fields = (
        "severity",
        "category",
        "rule_code",
        "title",
        "detail",
        "suggestion",
        "status",
        "evidence",
        "citations",
    )

    payload: dict[str, object] = {}
    payload["batch_no"] = batch.batch_no
    payload["source_summary_batch"] = batch.source_summary_batch.batch_no
    condition = batch.condition_json or {}
    payload["regenerated_from"] = condition.get("regenerated_from")
    payload["risk_summary"] = batch.risk_summary

    serialized_issues = []
    for issue in batch.issues.order_by("id"):
        serialized_issues.append({field: getattr(issue, field) for field in issue_fields})
    payload["issues"] = serialized_issues

    payload["review_records"] = _review_records(batch)
    payload["notifications"] = _notification_records(batch)
    return payload
def build_assistant_summary(batch: RegulatoryReviewBatch, exports: list[ExportedSummaryFile]) -> str:
    """Build the short Markdown summary shown after a review completes.

    The summary contains a risk table (severities with a zero or missing count
    are omitted), a table of at most the first eight issues ordered by id, and
    one download link per export type found in ``exports``.

    Args:
        batch: The reviewed batch whose risk summary and issues are shown.
        exports: Export records; when several share an export type, the last
            one is used for the link.

    Returns:
        The summary lines joined with ``"\\n"``; links for missing export types
        are left out.
    """

    def cell(value: object) -> str:
        # Keep free text inside one table cell: a raw line break ends the row
        # and a raw pipe starts a new column.
        text = str(value)
        for line_break in ("\r\n", "\n", "\r"):
            text = text.replace(line_break, " ")
        return text.replace("|", "\\|")

    export_by_type = {export.export_type: export for export in exports}
    lines = [
        "已完成 NMPA 注册资料法规核查。",
        "",
        "| 风险等级 | 数量 |",
        "| --- | --- |",
    ]

    summary = batch.risk_summary or {}
    for severity, label in SEVERITY_LABELS.items():
        if summary.get(severity, 0):
            lines.append(f"| {label} | {summary[severity]} |")

    lines.extend(["", "| 等级 | 问题 | 状态 | 建议 |", "| --- | --- | --- | --- |"])
    for issue in batch.issues.order_by("id")[:8]:
        # Unknown severities fall back to the raw severity value.
        severity_label = SEVERITY_LABELS.get(issue.severity, issue.severity)
        lines.append(
            f"| {cell(severity_label)} | {cell(issue.title)} "
            f"| {cell(issue.status)} | {cell(issue.suggestion or '-')} |"
        )

    lines.extend(
        [
            "",
            _download_link("下载 Markdown 核查报告", export_by_type.get(ExportedSummaryFile.ExportType.MARKDOWN)),
            _download_link("下载 Excel 缺失清单", export_by_type.get(ExportedSummaryFile.ExportType.EXCEL)),
            _download_link("下载 JSON 结果包", export_by_type.get(ExportedSummaryFile.ExportType.JSON)),
        ]
    )
    # _download_link returns None for an export type that was not produced.
    return "\n".join(line for line in lines if line is not None)
def _download_link(label: str, exported: ExportedSummaryFile | None) -> str | None:
if not exported:
return None
return f"[{label}](/api/review-agent/file-summary/exports/{exported.pk}/download/)"
def _create_export(
    batch: RegulatoryReviewBatch,
    path: Path,
    export_type: str,
    category: str,
    content: str,
) -> ExportedSummaryFile:
    """Write ``content`` to ``path`` as UTF-8 and register the file as an export.

    The created record is attached to ``batch.source_summary_batch`` and tagged
    with the ``regulatory_review`` workflow and this batch's primary key.

    Returns:
        The newly created ``ExportedSummaryFile`` record.
    """
    path.write_text(content, encoding="utf-8")

    record_fields = {
        "batch": batch.source_summary_batch,
        "workflow_type": "regulatory_review",
        "workflow_batch_id": batch.pk,
        "export_category": category,
        "export_type": export_type,
        "file_name": path.name,
        "storage_path": str(path),
    }
    return ExportedSummaryFile.objects.create(**record_fields)
def _create_excel_export(batch: RegulatoryReviewBatch, path: Path) -> ExportedSummaryFile:
    """Write the issue checklist workbook to ``path`` and register it as an export.

    The workbook has one sheet with a header row followed by one row per issue
    (ordered by id): severity label, category, rule code, title, status,
    suggestion, citation sources joined with ``"; "``, and a
    ``channel:status`` summary of the notifications whose payload references
    the issue.

    Returns:
        The newly created ``ExportedSummaryFile`` record (category
        ``issue_checklist``, type Excel).
    """
    # Load the notifications once; fetching them inside the issue loop would
    # repeat the same query for every row.
    notifications = [
        record
        for record in batch.notifications.all()
        if isinstance(record.payload, dict)
    ]

    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "法规问题清单"
    sheet.append(["等级", "类别", "规则", "问题", "状态", "建议", "法规依据", "通知记录"])

    for issue in batch.issues.order_by("id"):
        # An issue without citations yields an empty cell instead of failing.
        citation_sources = "; ".join(
            str(item.get("source", "")) for item in (issue.citations or [])
        )
        notification_summary = "; ".join(
            f"{record.channel}:{record.status}"
            for record in notifications
            if record.payload.get("issue_id") == issue.pk
        )
        sheet.append(
            [
                SEVERITY_LABELS.get(issue.severity, issue.severity),
                issue.category,
                issue.rule_code,
                issue.title,
                issue.status,
                issue.suggestion,
                citation_sources,
                notification_summary,
            ]
        )

    workbook.save(path)
    return ExportedSummaryFile.objects.create(
        batch=batch.source_summary_batch,
        workflow_type="regulatory_review",
        workflow_batch_id=batch.pk,
        export_category="issue_checklist",
        export_type=ExportedSummaryFile.ExportType.EXCEL,
        file_name=path.name,
        storage_path=str(path),
    )
def _review_records(batch: RegulatoryReviewBatch) -> list[dict[str, object]]:
records = []
for artifact in batch.artifacts.filter(metadata__artifact="review_record").order_by("created_at", "id"):
try:
records.append(json.loads(Path(artifact.storage_path).read_text(encoding="utf-8")))
except (OSError, json.JSONDecodeError):
continue
return records
def _notification_records(batch: RegulatoryReviewBatch) -> list[dict[str, object]]:
return [
{
"channel": record.channel,
"target": record.target,
"status": record.status,
"payload": record.payload,
"sent_at": record.sent_at.isoformat() if record.sent_at else "",
}
for record in batch.notifications.order_by("created_at", "id")
]
def _notification_summary_for_issue(batch: RegulatoryReviewBatch, issue_id: int) -> str:
records = [
record
for record in batch.notifications.all()
if isinstance(record.payload, dict) and record.payload.get("issue_id") == issue_id
]
return "; ".join(f"{record.channel}:{record.status}" for record in records)