from __future__ import annotations

import json
import logging
from pathlib import Path

from django.conf import settings
from openpyxl import Workbook

from review_agent.models import ExportedSummaryFile, RegulatoryIssue, RegulatoryReviewBatch

logger = logging.getLogger(__name__)

# Display label for each severity key; iteration order is the row order of the
# risk summary tables.
SEVERITY_LABELS = {
    "blocking": "阻断项",
    "high": "高风险",
    "medium": "中风险",
    "low": "低风险",
    "info": "提示",
}

# Value stored in ``ExportedSummaryFile.workflow_type`` for every export created here.
_WORKFLOW_TYPE = "regulatory_review"

# Maximum number of issue rows listed in the assistant summary table.
_ASSISTANT_SUMMARY_ISSUE_LIMIT = 8


def export_review_results(batch: RegulatoryReviewBatch) -> list[ExportedSummaryFile]:
    """Write the Markdown, Excel and JSON exports for *batch* and record each one.

    Files go to ``<root>/exports``, where ``<root>`` is ``batch.work_dir`` when
    it is set and ``MEDIA_ROOT/regulatory_review/work/<batch_no>`` otherwise.
    The directory is created if it does not exist.

    Returns:
        The created export records, in the order ``[markdown, excel, json]``.
    """
    if batch.work_dir:
        root = Path(batch.work_dir)
    else:
        root = Path(settings.MEDIA_ROOT) / "regulatory_review" / "work" / batch.batch_no
    export_dir = root / "exports"
    export_dir.mkdir(parents=True, exist_ok=True)

    markdown = _create_export(
        batch,
        export_dir / f"{batch.batch_no}-regulatory-review.md",
        ExportedSummaryFile.ExportType.MARKDOWN,
        "markdown_report",
        build_markdown_report(batch),
    )
    excel = _create_excel_export(batch, export_dir / f"{batch.batch_no}-regulatory-issues.xlsx")
    result_json = _create_export(
        batch,
        export_dir / f"{batch.batch_no}-regulatory-result.json",
        ExportedSummaryFile.ExportType.JSON,
        "result_package",
        json.dumps(build_result_payload(batch), ensure_ascii=False, indent=2),
    )
    return [markdown, excel, result_json]


def build_markdown_report(batch: RegulatoryReviewBatch) -> str:
    """Render the full Markdown report for *batch*.

    The report contains the batch number, an optional "regenerated from"
    section (when ``batch.condition_json`` has a ``regenerated_from`` entry),
    a per-severity count table, the issue table ordered by id and, when any
    review records can be loaded, a table summarising each record.

    Returns:
        The report text with lines joined by ``"\\n"``.
    """
    lines = [
        "# NMPA 注册资料法规核查报告",
        "",
        f"批次号:{batch.batch_no}",
    ]

    regenerated_from = (batch.condition_json or {}).get("regenerated_from")
    if regenerated_from:
        lines.extend(
            [
                "",
                "## 复核来源",
                "",
                f"- 来源法规核查批次:{regenerated_from.get('batch_no')}",
                f"- 来源文件汇总批次:{regenerated_from.get('file_summary_batch_no')}",
            ]
        )

    lines.extend(["", "## 风险汇总", "", "| 风险等级 | 数量 |", "| --- | --- |"])
    summary = batch.risk_summary or {}
    for severity, label in SEVERITY_LABELS.items():
        lines.append(f"| {label} | {summary.get(severity, 0)} |")

    lines.extend(["", "## 问题清单", "", "| 等级 | 问题 | 状态 | 建议 |", "| --- | --- | --- | --- |"])
    lines.extend(_issue_table_row(issue) for issue in batch.issues.order_by("id"))

    review_records = _review_records(batch)
    if review_records:
        lines.extend(["", "## 复核记录", "", "| 补充批次 | 问题数 | 通过数 | 未通过数 |", "| --- | --- | --- | --- |"])
        for record in review_records:
            # A record may carry ``"items": null``; treat it like a missing key.
            items = record.get("items") or []
            passed = sum(1 for item in items if item.get("status") == RegulatoryIssue.Status.REVIEW_PASSED)
            failed = sum(1 for item in items if item.get("status") == RegulatoryIssue.Status.REVIEW_FAILED)
            batch_no = _md_cell(record.get("file_summary_batch_no"))
            lines.append(f"| {batch_no} | {len(items)} | {passed} | {failed} |")

    return "\n".join(lines)


def build_result_payload(batch: RegulatoryReviewBatch) -> dict[str, object]:
    """Build the dictionary that is serialised into the JSON result package.

    ``risk_summary`` is passed through as stored on the batch. Issues are
    listed in id order, and ``review_records`` holds whatever
    ``_review_records`` could load.
    """
    issues = [
        {
            "severity": issue.severity,
            "category": issue.category,
            "rule_code": issue.rule_code,
            "title": issue.title,
            "detail": issue.detail,
            "suggestion": issue.suggestion,
            "status": issue.status,
            "evidence": issue.evidence,
            "citations": issue.citations,
        }
        for issue in batch.issues.order_by("id")
    ]
    return {
        "batch_no": batch.batch_no,
        "source_summary_batch": batch.source_summary_batch.batch_no,
        "regenerated_from": (batch.condition_json or {}).get("regenerated_from"),
        "risk_summary": batch.risk_summary,
        "issues": issues,
        "review_records": _review_records(batch),
    }


def build_assistant_summary(batch: RegulatoryReviewBatch, exports: list[ExportedSummaryFile]) -> str:
    """Render the short Markdown summary shown together with the download links.

    Only severities with a non-zero count are listed, and the issue table is
    capped at ``_ASSISTANT_SUMMARY_ISSUE_LIMIT`` rows. A download link is
    emitted for each export type present in *exports*; missing types are
    omitted.
    """
    export_by_type = {export.export_type: export for export in exports}
    lines = [
        "已完成 NMPA 注册资料法规核查。",
        "",
        "| 风险等级 | 数量 |",
        "| --- | --- |",
    ]

    summary = batch.risk_summary or {}
    for severity, label in SEVERITY_LABELS.items():
        count = summary.get(severity, 0)
        if count:
            lines.append(f"| {label} | {count} |")

    lines.extend(["", "| 等级 | 问题 | 状态 | 建议 |", "| --- | --- | --- | --- |"])
    lines.extend(
        _issue_table_row(issue) for issue in batch.issues.order_by("id")[:_ASSISTANT_SUMMARY_ISSUE_LIMIT]
    )

    lines.extend(
        [
            "",
            _download_link("下载 Markdown 核查报告", export_by_type.get(ExportedSummaryFile.ExportType.MARKDOWN)),
            _download_link("下载 Excel 缺失清单", export_by_type.get(ExportedSummaryFile.ExportType.EXCEL)),
            _download_link("下载 JSON 结果包", export_by_type.get(ExportedSummaryFile.ExportType.JSON)),
        ]
    )
    # ``_download_link`` returns None for an export type that was not supplied.
    return "\n".join(line for line in lines if line is not None)


def _md_cell(value: object) -> str:
    """Return ``str(value)`` made safe for use inside a Markdown table cell.

    A literal ``|`` would start a new column and a line break would end the
    row, so pipes are backslash-escaped and multi-line text is folded onto a
    single line. Text containing neither is returned unchanged.
    """
    text = str(value)
    if "\n" in text or "\r" in text:
        text = " ".join(part.strip() for part in text.splitlines() if part.strip())
    return text.replace("|", "\\|")


def _issue_table_row(issue: RegulatoryIssue) -> str:
    """Format one issue as a ``| severity | title | status | suggestion |`` row."""
    cells = (
        SEVERITY_LABELS.get(issue.severity, issue.severity),
        issue.title,
        issue.status,
        issue.suggestion or "-",
    )
    return "| " + " | ".join(_md_cell(cell) for cell in cells) + " |"


def _download_link(label: str, exported: ExportedSummaryFile | None) -> str | None:
    """Return a Markdown download link for *exported*, or None when it is missing."""
    if not exported:
        return None
    return f"[{label}](/api/review-agent/file-summary/exports/{exported.pk}/download/)"


def _register_export(
    batch: RegulatoryReviewBatch,
    path: Path,
    export_type: str,
    category: str,
) -> ExportedSummaryFile:
    """Create the ``ExportedSummaryFile`` row describing a file already written to *path*."""
    return ExportedSummaryFile.objects.create(
        batch=batch.source_summary_batch,
        workflow_type=_WORKFLOW_TYPE,
        workflow_batch_id=batch.pk,
        export_category=category,
        export_type=export_type,
        file_name=path.name,
        storage_path=str(path),
    )


def _create_export(
    batch: RegulatoryReviewBatch,
    path: Path,
    export_type: str,
    category: str,
    content: str,
) -> ExportedSummaryFile:
    """Write *content* to *path* as UTF-8 text and register the export."""
    path.write_text(content, encoding="utf-8")
    return _register_export(batch, path, export_type, category)


def _create_excel_export(batch: RegulatoryReviewBatch, path: Path) -> ExportedSummaryFile:
    """Write the issue checklist workbook to *path* and register the export.

    The sheet has one header row followed by one row per issue in id order.
    The last column joins the ``source`` value of every citation with ``"; "``.
    """
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "法规问题清单"
    sheet.append(["等级", "类别", "规则", "问题", "状态", "建议", "法规依据"])
    for issue in batch.issues.order_by("id"):
        # ``citations`` may be empty or None; either way the cell is just blank.
        citations = issue.citations or []
        sheet.append(
            [
                SEVERITY_LABELS.get(issue.severity, issue.severity),
                issue.category,
                issue.rule_code,
                issue.title,
                issue.status,
                issue.suggestion,
                "; ".join(str(item.get("source", "")) for item in citations),
            ]
        )
    workbook.save(path)
    return _register_export(batch, path, ExportedSummaryFile.ExportType.EXCEL, "issue_checklist")


def _review_records(batch: RegulatoryReviewBatch) -> list[dict[str, object]]:
    """Load the JSON review records stored as artifacts of *batch*.

    Artifacts whose metadata has ``artifact == "review_record"`` are read in
    ``(created_at, id)`` order. Loading is best-effort: a file that cannot be
    read, is not valid UTF-8 or JSON, or does not contain a JSON object is
    skipped with a warning instead of failing the whole export.
    """
    records: list[dict[str, object]] = []
    artifacts = batch.artifacts.filter(metadata__artifact="review_record").order_by("created_at", "id")
    for artifact in artifacts:
        try:
            payload = json.loads(Path(artifact.storage_path).read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            logger.warning("Skipping unreadable review record: %s", artifact.storage_path, exc_info=True)
            continue
        if not isinstance(payload, dict):
            # Callers use ``record.get(...)``, so anything but an object is unusable.
            logger.warning("Skipping review record that is not a JSON object: %s", artifact.storage_path)
            continue
        records.append(payload)
    return records