import zipfile from django.test import override_settings from django.urls import reverse from django.core.files.uploadedfile import SimpleUploadedFile from agent_core.results import AgentResult from apps.audit.models import AgentAuditLog from apps.audit.models import NotificationRecord from apps.chat.models import Conversation from apps.chat.services import create_conversation_for_batch from apps.documents.models import ExportedDocument, SubmissionBatch, UploadedDocument def _create_conversation_with_batch(): batch = SubmissionBatch.objects.create( batch_id="SUB-20260604-001", product_name="新型冠状病毒 2019-nCoV 核酸检测试剂盒", workflow_type="registration", conversation_id="conv-001", file_count=2, page_count=12, import_status="completed", ) conversation = Conversation.objects.create( conversation_id="conv-001", title="新型冠状病毒 2019-nCoV 核酸检测试剂盒", product_name=batch.product_name, batch_id=batch.batch_id, task_status="processing", node_results=[ {"label": "资料包导入", "status": "已完成"}, {"label": "目录汇总", "status": "处理中"}, ], ) return batch, conversation def test_chat_post_returns_agent_result_and_audit_log(client, db, monkeypatch): batch, conversation = _create_conversation_with_batch() UploadedDocument.objects.create( batch=batch, scenario_id="document_review", original_name="说明书.md", file_type="md", size=1, status=UploadedDocument.STATUS_INDEXED, ) monkeypatch.setattr( "apps.chat.views.run_agent", lambda *args, **kwargs: AgentResult(answer="模拟回答", status="success"), ) response = client.post( reverse("chat:detail", args=[conversation.conversation_id]), {"message": "如何处理异常?"}, ) assert response.status_code == 200 content = response.content.decode("utf-8") assert "审核智能体" in content assert "模拟回答" in content assert AgentAuditLog.objects.count() == 1 assert AgentAuditLog.objects.get().batch_id == batch.batch_id def test_chat_rejects_empty_message(client, db): _batch, conversation = _create_conversation_with_batch() response = client.post(reverse("chat:detail", args=[conversation.conversation_id]), {"message": ""}) assert response.status_code == 200 assert AgentAuditLog.objects.count() == 0 assert "请输入要咨询的问题" in response.content.decode("utf-8") def test_chat_passes_selected_document_ids_to_agent_core(client, db, monkeypatch): batch, conversation = _create_conversation_with_batch() selected = UploadedDocument.objects.create( batch=batch, scenario_id="document_review", original_name="selected.md", file_type="md", size=1, status=UploadedDocument.STATUS_INDEXED, ) UploadedDocument.objects.create( batch=batch, scenario_id="document_review", original_name="other.md", file_type="md", size=1, status=UploadedDocument.STATUS_INDEXED, ) captured = {} def fake_run_agent(scenario_config, user_input, options=None): captured["options"] = options or {} return AgentResult(answer="ok", status="success") monkeypatch.setattr("apps.chat.views.run_agent", fake_run_agent) response = client.post( reverse("chat:detail", args=[conversation.conversation_id]), {"message": "只查选中文档", "document_ids": [str(selected.id)]}, ) assert response.status_code == 200 assert captured["options"]["document_ids"] == [selected.id] assert captured["options"]["conversation_id"] == conversation.conversation_id assert captured["options"]["batch_id"] == batch.batch_id def test_chat_renders_three_column_workspace_and_node_results(client, db): batch, conversation = _create_conversation_with_batch() UploadedDocument.objects.create( batch=batch, scenario_id="document_review", original_name="说明书.md", file_type="md", size=1, status=UploadedDocument.STATUS_INDEXED, ) response = client.get(reverse("chat:detail", args=[conversation.conversation_id])) content = response.content.decode("utf-8") assert response.status_code == 200 assert "会话历史" in content assert "对话区与节点导航" in content assert "上传区" in content assert "资料包导入 / 已完成" in content assert "目录汇总 / 处理中" in content def test_chat_execution_creates_notification_record_from_agent_result(client, db, monkeypatch): batch, conversation = _create_conversation_with_batch() UploadedDocument.objects.create( batch=batch, scenario_id="document_review", original_name="说明书.md", file_type="md", size=1, status=UploadedDocument.STATUS_INDEXED, ) monkeypatch.setattr( "apps.chat.views.run_agent", lambda *args, **kwargs: AgentResult( answer="执行完成", status="success", notification_payload={ "batch_id": batch.batch_id, "conversation_id": conversation.conversation_id, "product_name": batch.product_name, "notify_reason": "task_completed", "owners": [ { "owner_role": "注册资料负责人", "feishu_user_id": "ou_demo_1", } ], }, ), ) response = client.post( reverse("chat:detail", args=[conversation.conversation_id]), {"message": "执行审核"}, ) assert response.status_code == 200 record = NotificationRecord.objects.get() assert record.notify_reason == "task_completed" assert record.batch_id == batch.batch_id assert record.web_detail_url.endswith(f"/audit/{AgentAuditLog.objects.get().id}/") def test_chat_execution_uses_notification_payload_message_status_and_receipt(client, db, monkeypatch): batch, conversation = _create_conversation_with_batch() UploadedDocument.objects.create( batch=batch, scenario_id="document_review", original_name="说明书.md", file_type="md", size=1, status=UploadedDocument.STATUS_INDEXED, ) monkeypatch.setattr( "apps.chat.views.run_agent", lambda *args, **kwargs: AgentResult( answer="通知已发送", status="success", notification_payload={ "batch_id": batch.batch_id, "conversation_id": conversation.conversation_id, "product_name": batch.product_name, "notify_reason": "task_completed", "message_status": "sent", "web_detail_url": "https://example.com/audit/custom", "receipt": {"message_id": "msg-custom", "status": "sent"}, "owners": [ { "owner_role": "注册资料负责人", "feishu_user_id": "ou_demo_1", } ], }, ), ) response = client.post( reverse("chat:detail", args=[conversation.conversation_id]), {"message": "执行通知任务"}, ) assert response.status_code == 200 record = NotificationRecord.objects.get() assert record.message_status == "sent" assert record.receipt["message_id"] == "msg-custom" def test_chat_execution_creates_failed_notification_record_and_updates_conversation(client, db, monkeypatch): batch, conversation = _create_conversation_with_batch() UploadedDocument.objects.create( batch=batch, scenario_id="document_review", original_name="说明书.md", file_type="md", size=1, status=UploadedDocument.STATUS_INDEXED, ) monkeypatch.setattr( "apps.chat.views.run_agent", lambda *args, **kwargs: AgentResult( answer="执行失败", status="failed", error="规则执行失败", node_results=[ {"code": "package_import", "label": "资料包导入", "status": "已完成"}, {"code": "overview", "label": "目录汇总", "status": "已完成"}, {"code": "risk", "label": "风险预警", "status": "已阻断"}, {"code": "feishu_notify", "label": "飞书通知", "status": "失败"}, ], notification_payload={ "batch_id": batch.batch_id, "conversation_id": conversation.conversation_id, "product_name": batch.product_name, "notify_reason": "task_failed", "owners": [ { "owner_role": "注册申报负责人", "feishu_user_id": "ou_demo_2", } ], }, ), ) response = client.post( reverse("chat:detail", args=[conversation.conversation_id]), {"message": "执行失败任务"}, ) assert response.status_code == 200 record = NotificationRecord.objects.get() conversation.refresh_from_db() assert record.notify_reason == "task_failed" assert record.message_status == "failed" assert record.web_detail_url.endswith(f"/audit/{AgentAuditLog.objects.get().id}/") assert conversation.task_status == "failed" assert conversation.node_results[-1]["label"] == "飞书通知" def test_chat_execution_persists_agent_node_results_to_conversation(client, db, monkeypatch): batch, conversation = _create_conversation_with_batch() UploadedDocument.objects.create( batch=batch, scenario_id="document_review", original_name="说明书.md", file_type="md", size=1, status=UploadedDocument.STATUS_INDEXED, ) monkeypatch.setattr( "apps.chat.views.run_agent", lambda *args, **kwargs: AgentResult( answer="已生成风险结论", status="success", node_results=[ {"code": "package_import", "label": "资料包导入", "status": "已完成"}, {"code": "overview", "label": "目录汇总", "status": "已完成"}, {"code": "completeness", "label": "法规完整性检查", "status": "已完成"}, {"code": "field_extraction", "label": "字段抽取", "status": "已完成"}, {"code": "consistency", "label": "一致性核查", "status": "待复核"}, {"code": "risk", "label": "风险预警", "status": "已阻断", "summary": "存在高风险"}, {"code": "word_export", "label": "Word 回填导出", "status": "待处理"}, {"code": "feishu_notify", "label": "飞书通知", "status": "待处理"}, ], notification_payload={ "batch_id": batch.batch_id, "conversation_id": conversation.conversation_id, "product_name": batch.product_name, "notify_reason": "task_completed", "owners": [], }, ), ) response = client.post( reverse("chat:detail", args=[conversation.conversation_id]), {"message": "执行节点任务"}, ) assert response.status_code == 200 conversation.refresh_from_db() assert len(conversation.node_results) == 8 assert conversation.task_status == "success" assert conversation.latest_summary["answer"] == "已生成风险结论" def test_create_conversation_for_batch_initializes_eight_workflow_nodes(db): conversation = create_conversation_for_batch( "SUB-20260604-001", "新型冠状病毒 2019-nCoV 核酸检测试剂盒", ) labels = [node["label"] for node in conversation.node_results] assert len(labels) == 8 assert labels == [ "资料包导入", "目录汇总", "法规完整性检查", "字段抽取", "一致性核查", "风险预警", "Word 回填导出", "飞书通知", ] def test_chat_page_shows_upload_entry_and_dynamic_context_cards(client, db): batch, conversation = _create_conversation_with_batch() conversation.node_results = [ {"label": "资料包导入", "status": "已完成"}, {"label": "目录汇总", "status": "已完成"}, {"label": "法规完整性检查", "status": "已完成"}, {"label": "字段抽取", "status": "已完成"}, {"label": "一致性核查", "status": "待复核"}, {"label": "风险预警", "status": "已阻断"}, {"label": "Word 回填导出", "status": "待处理"}, {"label": "飞书通知", "status": "待处理"}, ] conversation.save(update_fields=["node_results"]) UploadedDocument.objects.create( batch=batch, scenario_id="document_review", original_name="说明书.md", file_type="md", size=1, status=UploadedDocument.STATUS_INDEXED, ) response = client.get(reverse("chat:detail", args=[conversation.conversation_id])) content = response.content.decode("utf-8") assert response.status_code == 200 assert "继续上传资料" in content assert "最高风险等级" in content assert "是否允许正式导出" in content assert "通知状态" in content assert "飞书通知 / 待处理" in content def test_chat_page_blocks_formal_export_when_word_export_node_is_blocked(client, db): batch, conversation = _create_conversation_with_batch() conversation.node_results = [ {"label": "资料包导入", "status": "已完成"}, {"label": "目录汇总", "status": "已完成"}, {"label": "法规完整性检查", "status": "已完成"}, {"label": "字段抽取", "status": "已完成"}, {"label": "一致性核查", "status": "已完成"}, {"label": "风险预警", "status": "已完成"}, {"label": "Word 回填导出", "status": "已阻断"}, {"label": "飞书通知", "status": "待处理"}, ] conversation.latest_summary = { "structured_output": { "download_url": "/downloads/export.docx", } } conversation.save(update_fields=["node_results", "latest_summary", "updated_at"]) response = client.get(reverse("chat:detail", args=[conversation.conversation_id])) content = response.content.decode("utf-8") assert response.status_code == 200 assert "是否允许正式导出" in content assert ">否<" in content assert "/downloads/export.docx" in content def test_chat_page_uses_structured_formal_export_flag_when_node_status_is_completed(client, db): batch, conversation = _create_conversation_with_batch() conversation.node_results = [ {"label": "资料包导入", "status": "已完成"}, {"label": "目录汇总", "status": "已完成"}, {"label": "法规完整性检查", "status": "已完成"}, {"label": "字段抽取", "status": "已完成"}, {"label": "一致性核查", "status": "已完成"}, {"label": "风险预警", "status": "已完成"}, {"label": "Word 回填导出", "status": "已完成"}, {"label": "飞书通知", "status": "待处理"}, ] conversation.latest_summary = { "structured_output": { "can_export_formally": False, "download_url": "/downloads/review-only.docx", } } conversation.save(update_fields=["node_results", "latest_summary", "updated_at"]) response = client.get(reverse("chat:detail", args=[conversation.conversation_id])) content = response.content.decode("utf-8") assert response.status_code == 200 assert "是否允许正式导出" in content assert ">否<" in content assert "/downloads/review-only.docx" in content def test_chat_upload_keeps_existing_conversation_binding_and_adds_documents(client, db): batch, conversation = _create_conversation_with_batch() existing_document = UploadedDocument.objects.create( batch=batch, scenario_id="document_review", original_name="原说明书.md", file_type="md", size=1, status=UploadedDocument.STATUS_INDEXED, ) existing_count = UploadedDocument.objects.filter(batch=batch).count() upload_file = SimpleUploadedFile( "新增补充资料.txt", "产品名称:新型冠状病毒 2019-nCoV 核酸检测试剂盒".encode("utf-8"), content_type="text/plain", ) response = client.post( reverse("chat:upload-documents", args=[conversation.conversation_id]), {"files": [upload_file]}, follow=True, ) content = response.content.decode("utf-8") batch.refresh_from_db() conversation.refresh_from_db() assert response.status_code == 200 assert SubmissionBatch.objects.count() == 1 assert Conversation.objects.count() == 1 assert conversation.conversation_id == "conv-001" assert batch.conversation_id == conversation.conversation_id assert UploadedDocument.objects.filter(batch=batch).count() == existing_count + 1 assert UploadedDocument.objects.filter(batch=batch, original_name="新增补充资料.txt").exists() assert "新增补充资料.txt" in content assert "已补充到当前资料包" in content @override_settings(MEDIA_URL="/media/") def test_generate_registration_export_creates_real_docx_file(db, tmp_path, settings): from apps.chat.export_service import generate_registration_export settings.MEDIA_ROOT = tmp_path / "uploads" batch, conversation = _create_conversation_with_batch() risk_summary = { "output_type": "registration_risk_report", "summary": "存在高风险项,正式版导出应被阻断,但允许生成草稿。", "highest_risk_level": "high", "pass_status": "blocked", "manual_review_items": ["CH1.11.5 沟通记录待补齐"], "risk_items": [ {"title": "产品名称跨文档不一致", "risk_level": "high"}, ], } report = generate_registration_export( batch=batch, conversation=conversation, upstream_summary=risk_summary, ) export_path = settings.MEDIA_ROOT / report["output_file"]["relative_path"] assert report["output_type"] == "registration_word_export_report" assert report["can_export_formally"] is False assert report["export_status"] == "draft_only" assert report["filled_fields"][0]["field_name"] == "产品名称" assert report["filled_fields"][0]["fill_status"] == "filled" assert report["blocked_fields"][0]["block_reason"] == "待人工复核" assert report["output_file"]["output_version"] == "draft" assert report["output_file"]["generated_at"] assert report["download_url"].startswith("/media/exports/") assert export_path.exists() with zipfile.ZipFile(export_path) as archive: document_xml = archive.read("word/document.xml").decode("utf-8") assert batch.product_name in document_xml assert "产品名称跨文档不一致" in document_xml @override_settings(MEDIA_URL="/media/") def test_chat_export_word_route_persists_real_download_link(client, db, tmp_path, settings): settings.MEDIA_ROOT = tmp_path / "uploads" batch, conversation = _create_conversation_with_batch() conversation.node_results = [ {"label": "资料包导入", "status": "已完成"}, {"label": "目录汇总", "status": "已完成"}, {"label": "法规完整性检查", "status": "已完成"}, {"label": "字段抽取", "status": "已完成"}, {"label": "一致性核查", "status": "已完成"}, {"label": "风险预警", "status": "已阻断"}, {"label": "Word 回填导出", "status": "待处理"}, {"label": "飞书通知", "status": "待处理"}, ] conversation.latest_summary = { "structured_output": { "output_type": "registration_risk_report", "summary": "存在高风险项,允许草稿导出。", "highest_risk_level": "high", "pass_status": "blocked", "manual_review_items": ["CH1.11.5 沟通记录待补齐"], "risk_items": [{"title": "产品名称跨文档不一致", "risk_level": "high"}], } } conversation.save(update_fields=["node_results", "latest_summary", "updated_at"]) response = client.post( reverse("chat:export-word", args=[conversation.conversation_id]), follow=True, ) content = response.content.decode("utf-8") conversation.refresh_from_db() export_report = conversation.latest_summary["structured_output"] export_path = settings.MEDIA_ROOT / export_report["output_file"]["relative_path"] assert response.status_code == 200 assert export_report["output_type"] == "registration_word_export_report" assert export_report["download_url"].startswith("/media/exports/") assert export_path.exists() assert "下载导出文件" in content assert export_report["download_url"] in content assert AgentAuditLog.objects.filter(conversation_id=conversation.conversation_id).count() == 1 assert ExportedDocument.objects.filter(batch=batch, conversation_id=conversation.conversation_id).count() == 1