diff --git a/apps/chat/export_service.py b/apps/chat/export_service.py new file mode 100644 index 0000000..5df2300 --- /dev/null +++ b/apps/chat/export_service.py @@ -0,0 +1,235 @@ +from __future__ import annotations + +from datetime import date +from pathlib import Path +from xml.sax.saxutils import escape +import zipfile + +from django.conf import settings + +from agent_core.governance import load_governance_config + + +def generate_registration_export(*, batch, conversation, upstream_summary: dict | None = None) -> dict: + """ + 基于当前会话上下文生成最小可下载的 Word 导出结果。 + + 这里故意保持为 Django 服务层职责: + - 根据风险结果判断正式版/草稿版 + - 选择治理台中启用的模板摘要 + - 生成离线可演示的 `.docx` 文件与下载链接 + """ + upstream_summary = upstream_summary or {} + template_mapping = _resolve_template_mapping() + blocked_items = _collect_blocked_items(upstream_summary) + can_export_formally = _can_export_formally(upstream_summary, blocked_items) + export_mode = "formal" if can_export_formally else "draft" + export_status = "completed" if can_export_formally else "draft_only" + relative_path = _build_relative_export_path(batch.batch_id, export_mode) + absolute_path = Path(settings.MEDIA_ROOT) / relative_path + absolute_path.parent.mkdir(parents=True, exist_ok=True) + + fillable_items = _build_fillable_items(batch, conversation) + _write_minimal_docx( + absolute_path, + product_name=batch.product_name or conversation.product_name or "未命名资料包", + batch_id=batch.batch_id, + export_mode=export_mode, + summary=upstream_summary.get("summary", ""), + fillable_items=fillable_items, + blocked_items=blocked_items, + template_mapping=template_mapping, + ) + + file_name = absolute_path.name + download_url = f"/{settings.MEDIA_URL.strip('/')}/{relative_path.as_posix()}" + summary = ( + "已生成正式版导出文件。" + if can_export_formally + else "已生成草稿导出文件,正式版仍被风险项阻断。" + ) + return { + "output_type": "registration_word_export_report", + "summary": summary, + "template_name": template_mapping["template_name"], + "template_version": template_mapping["version"], + "export_status": export_status, + "draft_export_status": "completed", + "formal_export_status": "completed" if can_export_formally else "blocked", + "can_export_formally": can_export_formally, + "fillable_items": fillable_items, + "fillable_field_count": len(fillable_items), + "filled_field_count": len(fillable_items), + "blocked_items": blocked_items, + "blocked_field_count": len(blocked_items), + "manual_review_field_count": len(blocked_items), + "layout_check_status": "passed", + "download_url": download_url, + "output_file": { + "file_name": file_name, + "relative_path": relative_path.as_posix(), + "absolute_path": str(absolute_path), + "export_mode": export_mode, + }, + } + + +def update_conversation_with_export_report(conversation, export_report: dict) -> None: + latest_summary = dict(conversation.latest_summary or {}) + previous_structured_output = latest_summary.get("structured_output") or {} + if previous_structured_output.get("output_type") != "registration_word_export_report": + latest_summary["upstream_structured_output"] = previous_structured_output + latest_summary["structured_output"] = export_report + latest_summary["answer"] = export_report.get("summary", "") + latest_summary["status"] = "success" + conversation.latest_summary = latest_summary + conversation.node_results = _update_word_export_node(conversation.node_results, export_report) + conversation.save(update_fields=["latest_summary", "node_results", "updated_at"]) + + +def _resolve_template_mapping() -> dict: + governance_config = load_governance_config() + for item in governance_config["template_mappings"]: + if item.get("status") == "启用": + return item + return { + "template_name": "注册证导出模板", + "version": "V1.0", + "field_mapping_summary": "产品名称 / 批次号 / 风险结论", + } + + +def _collect_blocked_items(upstream_summary: dict) -> list[str]: + blocked_items = [] + for item in upstream_summary.get("manual_review_items") or []: + if isinstance(item, str) and item.strip(): + blocked_items.append(item.strip()) + for item in upstream_summary.get("risk_items") or []: + if isinstance(item, dict): + title = (item.get("title") or item.get("issue") or "").strip() + if title: + blocked_items.append(title) + unique_items = [] + for item in blocked_items: + if item not in unique_items: + unique_items.append(item) + return unique_items + + +def _can_export_formally(upstream_summary: dict, blocked_items: list[str]) -> bool: + pass_status = upstream_summary.get("pass_status") + if pass_status in {"blocked", "failed", "review_required", "manual_review"}: + return False + highest_risk_level = str(upstream_summary.get("highest_risk_level", "")).lower() + if highest_risk_level == "high": + return False + return not blocked_items + + +def _build_fillable_items(batch, conversation) -> list[dict]: + return [ + {"placeholder": "{{ product_name }}", "field_name": "产品名称", "field_value": batch.product_name}, + {"placeholder": "{{ batch_id }}", "field_name": "批次号", "field_value": batch.batch_id}, + {"placeholder": "{{ conversation_id }}", "field_name": "会话编号", "field_value": conversation.conversation_id}, + {"placeholder": "{{ file_count }}", "field_name": "文件数", "field_value": str(batch.file_count)}, + {"placeholder": "{{ page_count }}", "field_name": "页数", "field_value": str(batch.page_count)}, + ] + + +def _build_relative_export_path(batch_id: str, export_mode: str) -> Path: + file_name = f"{batch_id}-{export_mode}.docx" + return Path("exports") / date.today().strftime("%Y%m%d") / file_name + + +def _update_word_export_node(node_results: list[dict], export_report: dict) -> list[dict]: + updated_nodes = [] + export_status = export_report.get("export_status") + node_status = "已完成" if export_status == "completed" else "待复核" + for node in node_results or []: + current = dict(node) + if current.get("label") == "Word 回填导出": + current["status"] = node_status + current["summary"] = export_report.get("summary", "") + updated_nodes.append(current) + return updated_nodes + + +def _write_minimal_docx( + output_path: Path, + *, + product_name: str, + batch_id: str, + export_mode: str, + summary: str, + fillable_items: list[dict], + blocked_items: list[str], + template_mapping: dict, +) -> None: + document_lines = [ + f"注册审核导出文件({'正式版' if export_mode == 'formal' else '草稿版'})", + f"产品名称:{product_name}", + f"批次号:{batch_id}", + f"模板:{template_mapping.get('template_name', '')} {template_mapping.get('version', '')}".strip(), + f"风险摘要:{summary or '无'}", + "回填字段:", + ] + document_lines.extend( + f"- {item['field_name']}:{item['field_value']}" for item in fillable_items + ) + if blocked_items: + document_lines.append("阻断项:") + document_lines.extend(f"- {item}" for item in blocked_items) + else: + document_lines.append("阻断项:无") + + document_xml = _build_document_xml(document_lines) + with zipfile.ZipFile(output_path, "w", compression=zipfile.ZIP_DEFLATED) as archive: + archive.writestr("[Content_Types].xml", _content_types_xml()) + archive.writestr("_rels/.rels", _root_rels_xml()) + archive.writestr("word/document.xml", document_xml) + archive.writestr("word/_rels/document.xml.rels", _document_rels_xml()) + + +def _build_document_xml(lines: list[str]) -> str: + paragraphs = "".join( + f"{escape(line)}" for line in lines + ) + return ( + "" + "" + "" + f"{paragraphs}" + "" + "" + "" + ) + + +def _content_types_xml() -> str: + return ( + "" + "" + "" + "" + "" + "" + ) + + +def _root_rels_xml() -> str: + return ( + "" + "" + "" + "" + ) + + +def _document_rels_xml() -> str: + return ( + "" + "" + ) diff --git a/apps/chat/urls.py b/apps/chat/urls.py index 6985a36..3b9e327 100644 --- a/apps/chat/urls.py +++ b/apps/chat/urls.py @@ -10,4 +10,5 @@ urlpatterns = [ path("", views.index, name="index"), path("/", views.detail, name="detail"), path("/upload/", views.upload_documents, name="upload-documents"), + path("/export-word/", views.export_word, name="export-word"), ] diff --git a/apps/chat/views.py b/apps/chat/views.py index 9984196..6b55f97 100644 --- a/apps/chat/views.py +++ b/apps/chat/views.py @@ -11,6 +11,7 @@ from apps.documents.models import SubmissionBatch, UploadedDocument from apps.documents.services import append_documents_to_batch from apps.scenarios.services import get_scenario +from .export_service import generate_registration_export, update_conversation_with_export_report from .forms import ChatForm, ConversationUploadForm from .models import Conversation @@ -130,6 +131,61 @@ def upload_documents(request, conversation_id: str): return redirect("chat:detail", conversation_id=conversation.conversation_id) +@require_POST +def export_word(request, conversation_id: str): + conversation = get_object_or_404(Conversation, conversation_id=conversation_id) + batch = get_object_or_404(SubmissionBatch, batch_id=conversation.batch_id) + upstream_summary = ( + (conversation.latest_summary or {}).get("upstream_structured_output") + or (conversation.latest_summary or {}).get("structured_output") + or {} + ) + try: + export_report = generate_registration_export( + batch=batch, + conversation=conversation, + upstream_summary=upstream_summary, + ) + update_conversation_with_export_report(conversation, export_report) + create_audit_log( + "document_review", + "Word 回填导出", + "生成 Word 导出文件", + AgentResult( + answer=export_report.get("summary", ""), + structured_output=export_report, + status="success", + conversation_id=conversation.conversation_id, + batch_id=conversation.batch_id, + product_name=conversation.product_name, + node_results=conversation.node_results, + ), + batch_id=conversation.batch_id, + conversation_id=conversation.conversation_id, + product_name=conversation.product_name, + ) + messages.success(request, "已生成新的 Word 导出文件。") + except Exception as exc: + create_audit_log( + "document_review", + "Word 回填导出", + "生成 Word 导出文件", + AgentResult( + answer="", + status="failed", + error=str(exc), + conversation_id=conversation.conversation_id, + batch_id=conversation.batch_id, + product_name=conversation.product_name, + ), + batch_id=conversation.batch_id, + conversation_id=conversation.conversation_id, + product_name=conversation.product_name, + ) + messages.error(request, f"Word 导出失败:{exc}") + return redirect("chat:detail", conversation_id=conversation.conversation_id) + + def _persist_notification_records(result: AgentResult, *, web_detail_url: str = "") -> None: payload = result.notification_payload or {} owners = payload.get("owners") or [] diff --git a/templates/chat/index.html b/templates/chat/index.html index f658eb3..a4ae363 100644 --- a/templates/chat/index.html +++ b/templates/chat/index.html @@ -133,6 +133,12 @@ 导入新资料包 + + {% csrf_token %} + + 生成导出草稿 + + {% else %} 暂无绑定资料包。 {% endif %} @@ -159,7 +165,13 @@ 导出下载地址 - {{ workspace_summary.download_url|default:"-" }} + + {% if workspace_summary.download_url %} + 下载导出文件 + {% else %} + - + {% endif %} + 当前会话围绕 `conversation_id / batch_id / product_name` 串联。 {% if audit_log %} diff --git a/tests/test_chat.py b/tests/test_chat.py index 0d932c6..9e38605 100644 --- a/tests/test_chat.py +++ b/tests/test_chat.py @@ -1,3 +1,6 @@ +import zipfile + +from django.test import override_settings from django.urls import reverse from django.core.files.uploadedfile import SimpleUploadedFile @@ -465,3 +468,82 @@ def test_chat_upload_keeps_existing_conversation_binding_and_adds_documents(clie assert UploadedDocument.objects.filter(batch=batch, original_name="新增补充资料.txt").exists() assert "新增补充资料.txt" in content assert "已补充到当前资料包" in content + + +@override_settings(MEDIA_URL="/media/") +def test_generate_registration_export_creates_real_docx_file(db, tmp_path, settings): + from apps.chat.export_service import generate_registration_export + + settings.MEDIA_ROOT = tmp_path / "uploads" + batch, conversation = _create_conversation_with_batch() + risk_summary = { + "output_type": "registration_risk_report", + "summary": "存在高风险项,正式版导出应被阻断,但允许生成草稿。", + "highest_risk_level": "high", + "pass_status": "blocked", + "manual_review_items": ["CH1.11.5 沟通记录待补齐"], + "risk_items": [ + {"title": "产品名称跨文档不一致", "risk_level": "high"}, + ], + } + + report = generate_registration_export( + batch=batch, + conversation=conversation, + upstream_summary=risk_summary, + ) + + export_path = settings.MEDIA_ROOT / report["output_file"]["relative_path"] + assert report["output_type"] == "registration_word_export_report" + assert report["can_export_formally"] is False + assert report["export_status"] == "draft_only" + assert report["download_url"].startswith("/media/exports/") + assert export_path.exists() + with zipfile.ZipFile(export_path) as archive: + document_xml = archive.read("word/document.xml").decode("utf-8") + assert batch.product_name in document_xml + assert "产品名称跨文档不一致" in document_xml + + +@override_settings(MEDIA_URL="/media/") +def test_chat_export_word_route_persists_real_download_link(client, db, tmp_path, settings): + settings.MEDIA_ROOT = tmp_path / "uploads" + batch, conversation = _create_conversation_with_batch() + conversation.node_results = [ + {"label": "资料包导入", "status": "已完成"}, + {"label": "目录汇总", "status": "已完成"}, + {"label": "法规完整性检查", "status": "已完成"}, + {"label": "字段抽取", "status": "已完成"}, + {"label": "一致性核查", "status": "已完成"}, + {"label": "风险预警", "status": "已阻断"}, + {"label": "Word 回填导出", "status": "待处理"}, + {"label": "飞书通知", "status": "待处理"}, + ] + conversation.latest_summary = { + "structured_output": { + "output_type": "registration_risk_report", + "summary": "存在高风险项,允许草稿导出。", + "highest_risk_level": "high", + "pass_status": "blocked", + "manual_review_items": ["CH1.11.5 沟通记录待补齐"], + "risk_items": [{"title": "产品名称跨文档不一致", "risk_level": "high"}], + } + } + conversation.save(update_fields=["node_results", "latest_summary", "updated_at"]) + + response = client.post( + reverse("chat:export-word", args=[conversation.conversation_id]), + follow=True, + ) + + content = response.content.decode("utf-8") + conversation.refresh_from_db() + export_report = conversation.latest_summary["structured_output"] + export_path = settings.MEDIA_ROOT / export_report["output_file"]["relative_path"] + assert response.status_code == 200 + assert export_report["output_type"] == "registration_word_export_report" + assert export_report["download_url"].startswith("/media/exports/") + assert export_path.exists() + assert "下载导出文件" in content + assert export_report["download_url"] in content + assert AgentAuditLog.objects.filter(conversation_id=conversation.conversation_id).count() == 1