# File: DEMO-AGENT/tests/test_chat.py
# Size: 610 lines, 24 KiB
# Language: Python

import zipfile
from django.test import override_settings
from django.urls import reverse
from django.core.files.uploadedfile import SimpleUploadedFile
from agent_core.results import AgentResult
from apps.audit.models import AgentAuditLog
from apps.audit.models import NotificationRecord
from apps.chat.models import Conversation
from apps.chat.services import create_conversation_for_batch
from apps.documents.models import ExportedDocument, SubmissionBatch, UploadedDocument
def _create_conversation_with_batch():
    """Create one linked ``SubmissionBatch`` / ``Conversation`` pair for tests.

    Both rows carry the conversation id ``conv-001``. The conversation is
    created with ``task_status="processing"`` and two node results.

    Returns:
        A ``(batch, conversation)`` tuple holding the two created rows.
    """
    product_name = "新型冠状病毒 2019-nCoV 核酸检测试剂盒"
    conversation_id = "conv-001"

    batch_fields = {
        "batch_id": "SUB-20260604-001",
        "product_name": product_name,
        "workflow_type": "registration",
        "conversation_id": conversation_id,
        "file_count": 2,
        "page_count": 12,
        "import_status": "completed",
    }
    batch = SubmissionBatch.objects.create(**batch_fields)

    initial_nodes = [
        {"label": "资料包导入", "status": "已完成"},
        {"label": "目录汇总", "status": "处理中"},
    ]
    conversation = Conversation.objects.create(
        conversation_id=conversation_id,
        title=product_name,
        # Read the values back from the batch row so both records agree.
        product_name=batch.product_name,
        batch_id=batch.batch_id,
        task_status="processing",
        node_results=initial_nodes,
    )
    return batch, conversation
def test_chat_post_returns_agent_result_and_audit_log(client, db, monkeypatch):
    """Posting a message renders the stubbed answer and records one audit log."""
    batch, conversation = _create_conversation_with_batch()
    document_fields = dict(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )
    UploadedDocument.objects.create(**document_fields)

    def stub_run_agent(*args, **kwargs):
        # Stand-in for the agent call made by the chat view.
        return AgentResult(answer="模拟回答", status="success")

    monkeypatch.setattr("apps.chat.views.run_agent", stub_run_agent)

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.post(detail_url, {"message": "如何处理异常?"})

    assert response.status_code == 200
    page = response.content.decode("utf-8")
    for fragment in ("审核智能体", "模拟回答"):
        assert fragment in page

    # Exactly one audit row, tied to the batch behind this conversation.
    assert AgentAuditLog.objects.count() == 1
    audit_log = AgentAuditLog.objects.get()
    assert audit_log.batch_id == batch.batch_id
def test_chat_rejects_empty_message(client, db):
    """An empty message returns a 200 page with a prompt and writes no audit log."""
    _batch, conversation = _create_conversation_with_batch()
    detail_url = reverse("chat:detail", args=[conversation.conversation_id])

    response = client.post(detail_url, {"message": ""})

    assert response.status_code == 200
    assert AgentAuditLog.objects.count() == 0
    page = response.content.decode("utf-8")
    assert "请输入要咨询的问题" in page
def test_chat_passes_selected_document_ids_to_agent_core(client, db, monkeypatch):
    """The posted ``document_ids`` plus conversation/batch ids reach ``run_agent``.

    Two documents exist in the batch but only one id is posted; the options
    captured from the stubbed ``run_agent`` must list just that document.
    """
    batch, conversation = _create_conversation_with_batch()
    shared_fields = {
        "batch": batch,
        "scenario_id": "document_review",
        "file_type": "md",
        "size": 1,
        "status": UploadedDocument.STATUS_INDEXED,
    }
    selected = UploadedDocument.objects.create(
        original_name="selected.md", **shared_fields
    )
    UploadedDocument.objects.create(original_name="other.md", **shared_fields)

    captured = {}

    def fake_run_agent(scenario_config, user_input, options=None):
        # Remember what the view handed over so it can be inspected below.
        captured.update(options=options or {})
        return AgentResult(answer="ok", status="success")

    monkeypatch.setattr("apps.chat.views.run_agent", fake_run_agent)

    form_data = {"message": "只查选中文档", "document_ids": [str(selected.id)]}
    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.post(detail_url, form_data)

    assert response.status_code == 200
    assert captured["options"]["document_ids"] == [selected.id]
    assert captured["options"]["conversation_id"] == conversation.conversation_id
    assert captured["options"]["batch_id"] == batch.batch_id
def test_chat_renders_three_column_workspace_and_node_results(client, db):
    """The detail page contains the workspace headings and both node status lines."""
    batch, conversation = _create_conversation_with_batch()
    document_fields = dict(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )
    UploadedDocument.objects.create(**document_fields)

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.get(detail_url)
    page = response.content.decode("utf-8")

    assert response.status_code == 200
    expected_fragments = (
        "会话历史",
        "对话区与节点导航",
        "上传区",
        "资料包导入 / 已完成",
        "目录汇总 / 处理中",
    )
    for fragment in expected_fragments:
        assert fragment in page
def test_chat_execution_creates_notification_record_from_agent_result(client, db, monkeypatch):
    """A successful run with a notification payload stores one NotificationRecord.

    The record's ``web_detail_url`` must end with the ``/audit/<id>/`` path of
    the audit log written for the same request.
    """
    batch, conversation = _create_conversation_with_batch()
    UploadedDocument.objects.create(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )

    def stub_run_agent(*args, **kwargs):
        # Build the payload on each call, as the agent result would carry it.
        owner = {
            "owner_role": "注册资料负责人",
            "feishu_user_id": "ou_demo_1",
        }
        payload = {
            "batch_id": batch.batch_id,
            "conversation_id": conversation.conversation_id,
            "product_name": batch.product_name,
            "notify_reason": "task_completed",
            "owners": [owner],
        }
        return AgentResult(
            answer="执行完成",
            status="success",
            notification_payload=payload,
        )

    monkeypatch.setattr("apps.chat.views.run_agent", stub_run_agent)

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.post(detail_url, {"message": "执行审核"})

    assert response.status_code == 200
    record = NotificationRecord.objects.get()
    assert record.notify_reason == "task_completed"
    assert record.batch_id == batch.batch_id
    audit_log = AgentAuditLog.objects.get()
    assert record.web_detail_url.endswith(f"/audit/{audit_log.id}/")
def test_chat_execution_uses_notification_payload_message_status_and_receipt(client, db, monkeypatch):
    """``message_status`` and ``receipt`` from the payload end up on the record."""
    batch, conversation = _create_conversation_with_batch()
    UploadedDocument.objects.create(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )

    def stub_run_agent(*args, **kwargs):
        # The payload already carries a delivery status, URL and receipt.
        owner = {
            "owner_role": "注册资料负责人",
            "feishu_user_id": "ou_demo_1",
        }
        payload = {
            "batch_id": batch.batch_id,
            "conversation_id": conversation.conversation_id,
            "product_name": batch.product_name,
            "notify_reason": "task_completed",
            "message_status": "sent",
            "web_detail_url": "https://example.com/audit/custom",
            "receipt": {"message_id": "msg-custom", "status": "sent"},
            "owners": [owner],
        }
        return AgentResult(
            answer="通知已发送",
            status="success",
            notification_payload=payload,
        )

    monkeypatch.setattr("apps.chat.views.run_agent", stub_run_agent)

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.post(detail_url, {"message": "执行通知任务"})

    assert response.status_code == 200
    record = NotificationRecord.objects.get()
    assert record.message_status == "sent"
    assert record.receipt["message_id"] == "msg-custom"
def test_chat_execution_creates_failed_notification_record_and_updates_conversation(client, db, monkeypatch):
    """A failed agent run stores a failed notification and updates the conversation.

    Checks the notification reason/status, the audit-log detail URL suffix, and
    that the conversation's ``task_status`` and last node result were saved.
    """
    batch, conversation = _create_conversation_with_batch()
    UploadedDocument.objects.create(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )

    def stub_run_agent(*args, **kwargs):
        node_rows = (
            ("package_import", "资料包导入", "已完成"),
            ("overview", "目录汇总", "已完成"),
            ("risk", "风险预警", "已阻断"),
            ("feishu_notify", "飞书通知", "失败"),
        )
        nodes = [
            {"code": code, "label": label, "status": status}
            for code, label, status in node_rows
        ]
        payload = {
            "batch_id": batch.batch_id,
            "conversation_id": conversation.conversation_id,
            "product_name": batch.product_name,
            "notify_reason": "task_failed",
            "owners": [
                {
                    "owner_role": "注册申报负责人",
                    "feishu_user_id": "ou_demo_2",
                }
            ],
        }
        return AgentResult(
            answer="执行失败",
            status="failed",
            error="规则执行失败",
            node_results=nodes,
            notification_payload=payload,
        )

    monkeypatch.setattr("apps.chat.views.run_agent", stub_run_agent)

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.post(detail_url, {"message": "执行失败任务"})

    assert response.status_code == 200
    record = NotificationRecord.objects.get()
    # Reload so the assertions see what the view saved, not the stale instance.
    conversation.refresh_from_db()
    assert record.notify_reason == "task_failed"
    assert record.message_status == "failed"
    audit_log = AgentAuditLog.objects.get()
    assert record.web_detail_url.endswith(f"/audit/{audit_log.id}/")
    assert conversation.task_status == "failed"
    last_node = conversation.node_results[-1]
    assert last_node["label"] == "飞书通知"
def test_chat_execution_persists_agent_node_results_to_conversation(client, db, monkeypatch):
    """Node results, status and answer from the agent are saved on the conversation."""
    batch, conversation = _create_conversation_with_batch()
    UploadedDocument.objects.create(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )

    def stub_run_agent(*args, **kwargs):
        nodes = [
            {"code": "package_import", "label": "资料包导入", "status": "已完成"},
            {"code": "overview", "label": "目录汇总", "status": "已完成"},
            {"code": "completeness", "label": "法规完整性检查", "status": "已完成"},
            {"code": "field_extraction", "label": "字段抽取", "status": "已完成"},
            {"code": "consistency", "label": "一致性核查", "status": "待复核"},
            {"code": "risk", "label": "风险预警", "status": "已阻断", "summary": "存在高风险"},
            {"code": "word_export", "label": "Word 回填导出", "status": "待处理"},
            {"code": "feishu_notify", "label": "飞书通知", "status": "待处理"},
        ]
        payload = {
            "batch_id": batch.batch_id,
            "conversation_id": conversation.conversation_id,
            "product_name": batch.product_name,
            "notify_reason": "task_completed",
            "owners": [],
        }
        return AgentResult(
            answer="已生成风险结论",
            status="success",
            node_results=nodes,
            notification_payload=payload,
        )

    monkeypatch.setattr("apps.chat.views.run_agent", stub_run_agent)

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.post(detail_url, {"message": "执行节点任务"})

    assert response.status_code == 200
    # Reload to read the values written during the request.
    conversation.refresh_from_db()
    assert len(conversation.node_results) == 8
    assert conversation.task_status == "success"
    assert conversation.latest_summary["answer"] == "已生成风险结论"
def test_create_conversation_for_batch_initializes_eight_workflow_nodes(db):
    """A conversation created for a batch starts with the eight node labels in order."""
    expected_labels = [
        "资料包导入",
        "目录汇总",
        "法规完整性检查",
        "字段抽取",
        "一致性核查",
        "风险预警",
        "Word 回填导出",
        "飞书通知",
    ]

    conversation = create_conversation_for_batch(
        "SUB-20260604-001",
        "新型冠状病毒 2019-nCoV 核酸检测试剂盒",
    )

    labels = []
    for node in conversation.node_results:
        labels.append(node["label"])

    assert len(labels) == 8
    assert labels == expected_labels
def test_chat_page_shows_upload_entry_and_dynamic_context_cards(client, db):
    """The detail page contains the upload entry, context card titles and a node status."""
    batch, conversation = _create_conversation_with_batch()
    node_rows = (
        ("资料包导入", "已完成"),
        ("目录汇总", "已完成"),
        ("法规完整性检查", "已完成"),
        ("字段抽取", "已完成"),
        ("一致性核查", "待复核"),
        ("风险预警", "已阻断"),
        ("Word 回填导出", "待处理"),
        ("飞书通知", "待处理"),
    )
    conversation.node_results = [
        {"label": label, "status": status} for label, status in node_rows
    ]
    conversation.save(update_fields=["node_results"])
    UploadedDocument.objects.create(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.get(detail_url)
    page = response.content.decode("utf-8")

    assert response.status_code == 200
    expected_fragments = (
        "继续上传资料",
        "最高风险等级",
        "是否允许正式导出",
        "通知状态",
        "飞书通知 / 待处理",
    )
    for fragment in expected_fragments:
        assert fragment in page
def test_chat_page_blocks_formal_export_when_word_export_node_is_blocked(client, db):
    """With the Word export node blocked, the page shows ">否<" and still lists the download URL."""
    _batch, conversation = _create_conversation_with_batch()
    node_rows = (
        ("资料包导入", "已完成"),
        ("目录汇总", "已完成"),
        ("法规完整性检查", "已完成"),
        ("字段抽取", "已完成"),
        ("一致性核查", "已完成"),
        ("风险预警", "已完成"),
        ("Word 回填导出", "已阻断"),
        ("飞书通知", "待处理"),
    )
    conversation.node_results = [
        {"label": label, "status": status} for label, status in node_rows
    ]
    structured_output = {"download_url": "/downloads/export.docx"}
    conversation.latest_summary = {"structured_output": structured_output}
    conversation.save(update_fields=["node_results", "latest_summary", "updated_at"])

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.get(detail_url)
    page = response.content.decode("utf-8")

    assert response.status_code == 200
    for fragment in ("是否允许正式导出", ">否<", "/downloads/export.docx"):
        assert fragment in page
def test_chat_page_uses_structured_formal_export_flag_when_node_status_is_completed(client, db):
    """``can_export_formally=False`` yields ">否<" even when the export node is completed."""
    _batch, conversation = _create_conversation_with_batch()
    node_rows = (
        ("资料包导入", "已完成"),
        ("目录汇总", "已完成"),
        ("法规完整性检查", "已完成"),
        ("字段抽取", "已完成"),
        ("一致性核查", "已完成"),
        ("风险预警", "已完成"),
        ("Word 回填导出", "已完成"),
        ("飞书通知", "待处理"),
    )
    conversation.node_results = [
        {"label": label, "status": status} for label, status in node_rows
    ]
    structured_output = {
        "can_export_formally": False,
        "download_url": "/downloads/review-only.docx",
    }
    conversation.latest_summary = {"structured_output": structured_output}
    conversation.save(update_fields=["node_results", "latest_summary", "updated_at"])

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.get(detail_url)
    page = response.content.decode("utf-8")

    assert response.status_code == 200
    for fragment in ("是否允许正式导出", ">否<", "/downloads/review-only.docx"):
        assert fragment in page
def test_chat_page_shows_word_export_field_table_and_governance_entries(client, db):
    """A stored Word export report is rendered with its fields, blocked items and links.

    The page must contain the export card title, the filled-field table with
    its values, the blocked-item section, and the two maintenance entries.
    """
    _batch, conversation = _create_conversation_with_batch()
    node_rows = (
        ("资料包导入", "已完成"),
        ("目录汇总", "已完成"),
        ("法规完整性检查", "已完成"),
        ("字段抽取", "已完成"),
        ("一致性核查", "已完成"),
        ("风险预警", "已阻断"),
        ("Word 回填导出", "待复核"),
        ("飞书通知", "待处理"),
    )
    conversation.node_results = [
        {"label": label, "status": status} for label, status in node_rows
    ]

    filled_field = {
        "placeholder": "{{ product_name }}",
        "field_name": "产品名称",
        "field_value": "新型冠状病毒 2019-nCoV 核酸检测试剂盒",
        "source": "资料包主信息",
        "fill_status": "filled",
        "required": True,
    }
    blocked_field = {
        "field_name": "产品名称跨文档不一致",
        "block_reason": "待人工复核",
        "risk_source": "registration_risk_report",
    }
    export_report = {
        "output_type": "registration_word_export_report",
        "template_name": "注册证导出模板",
        "template_version": "V1.0",
        "export_status": "draft_only",
        "filled_fields": [filled_field],
        "blocked_fields": [blocked_field],
        "download_url": "/media/exports/20260604/SUB-20260604-001-draft.docx",
    }
    conversation.latest_summary = {"structured_output": export_report}
    conversation.save(update_fields=["node_results", "latest_summary", "updated_at"])

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.get(detail_url)
    page = response.content.decode("utf-8")

    assert response.status_code == 200
    expected_fragments = (
        "Word 导出能力卡",
        "回填字段表",
        "产品名称",
        "资料包主信息",
        "拦截项区",
        "产品名称跨文档不一致",
        "维护 Word 模板",
        "维护字段映射",
    )
    for fragment in expected_fragments:
        assert fragment in page
def test_chat_upload_keeps_existing_conversation_binding_and_adds_documents(
    client, db, tmp_path, settings
):
    """Uploading into an existing conversation adds one document and rebinds nothing.

    After the upload there must still be exactly one batch and one
    conversation, bound to each other as before, with one extra
    ``UploadedDocument`` whose name is shown on the resulting page.
    """
    # NOTE(review): the upload view presumably stores the file under
    # MEDIA_ROOT - confirm. Pointing it at tmp_path mirrors the export tests
    # in this module and keeps test uploads out of the real media directory.
    settings.MEDIA_ROOT = tmp_path / "uploads"

    batch, conversation = _create_conversation_with_batch()
    # Pre-existing document; only its presence in the batch matters here.
    UploadedDocument.objects.create(
        batch=batch,
        scenario_id="document_review",
        original_name="原说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )
    existing_count = UploadedDocument.objects.filter(batch=batch).count()

    upload_file = SimpleUploadedFile(
        "新增补充资料.txt",
        "产品名称:新型冠状病毒 2019-nCoV 核酸检测试剂盒".encode("utf-8"),
        content_type="text/plain",
    )
    response = client.post(
        reverse("chat:upload-documents", args=[conversation.conversation_id]),
        {"files": [upload_file]},
        follow=True,
    )
    content = response.content.decode("utf-8")
    # Reload both rows so the assertions see what the view saved.
    batch.refresh_from_db()
    conversation.refresh_from_db()

    assert response.status_code == 200
    # No new batch or conversation was created by the upload.
    assert SubmissionBatch.objects.count() == 1
    assert Conversation.objects.count() == 1
    assert conversation.conversation_id == "conv-001"
    assert batch.conversation_id == conversation.conversation_id
    # Exactly one document was added to the existing batch.
    batch_documents = UploadedDocument.objects.filter(batch=batch)
    assert batch_documents.count() == existing_count + 1
    assert batch_documents.filter(original_name="新增补充资料.txt").exists()
    assert "新增补充资料.txt" in content
    assert "已补充到当前资料包" in content
@override_settings(MEDIA_URL="/media/")
def test_generate_registration_export_creates_real_docx_file(db, tmp_path, settings):
    """``generate_registration_export`` writes a draft .docx and returns its report.

    With a blocked, high-risk upstream summary the report must be draft-only,
    and the generated archive's ``word/document.xml`` must mention both the
    product name and the risk item title.
    """
    from apps.chat.export_service import generate_registration_export

    # Send generated files to a per-test directory.
    settings.MEDIA_ROOT = tmp_path / "uploads"
    batch, conversation = _create_conversation_with_batch()
    risk_item = {"title": "产品名称跨文档不一致", "risk_level": "high"}
    risk_summary = {
        "output_type": "registration_risk_report",
        "summary": "存在高风险项,正式版导出应被阻断,但允许生成草稿。",
        "highest_risk_level": "high",
        "pass_status": "blocked",
        "manual_review_items": ["CH1.11.5 沟通记录待补齐"],
        "risk_items": [risk_item],
    }

    report = generate_registration_export(
        batch=batch,
        conversation=conversation,
        upstream_summary=risk_summary,
    )
    export_path = settings.MEDIA_ROOT / report["output_file"]["relative_path"]

    assert report["output_type"] == "registration_word_export_report"
    assert report["can_export_formally"] is False
    assert report["export_status"] == "draft_only"

    first_filled = report["filled_fields"][0]
    assert first_filled["field_name"] == "产品名称"
    assert first_filled["fill_status"] == "filled"
    assert report["blocked_fields"][0]["block_reason"] == "待人工复核"

    output_file = report["output_file"]
    assert output_file["output_version"] == "draft"
    assert output_file["generated_at"]
    assert report["download_url"].startswith("/media/exports/")
    assert export_path.exists()

    # A .docx is a zip archive; read the main document part from it.
    with zipfile.ZipFile(export_path) as archive:
        document_xml = archive.read("word/document.xml").decode("utf-8")
    for fragment in (batch.product_name, "产品名称跨文档不一致"):
        assert fragment in document_xml
@override_settings(MEDIA_URL="/media/")
def test_chat_export_word_route_persists_real_download_link(client, db, tmp_path, settings):
    """Posting to the export route stores an export report with a working download link.

    The conversation's ``structured_output`` must become a Word export report
    whose file exists on disk and whose URL is shown on the page; one audit
    log and one ``ExportedDocument`` row are expected for the conversation.
    """
    # Send generated files to a per-test directory.
    settings.MEDIA_ROOT = tmp_path / "uploads"
    batch, conversation = _create_conversation_with_batch()
    node_rows = (
        ("资料包导入", "已完成"),
        ("目录汇总", "已完成"),
        ("法规完整性检查", "已完成"),
        ("字段抽取", "已完成"),
        ("一致性核查", "已完成"),
        ("风险预警", "已阻断"),
        ("Word 回填导出", "待处理"),
        ("飞书通知", "待处理"),
    )
    conversation.node_results = [
        {"label": label, "status": status} for label, status in node_rows
    ]
    risk_report = {
        "output_type": "registration_risk_report",
        "summary": "存在高风险项,允许草稿导出。",
        "highest_risk_level": "high",
        "pass_status": "blocked",
        "manual_review_items": ["CH1.11.5 沟通记录待补齐"],
        "risk_items": [{"title": "产品名称跨文档不一致", "risk_level": "high"}],
    }
    conversation.latest_summary = {"structured_output": risk_report}
    conversation.save(update_fields=["node_results", "latest_summary", "updated_at"])

    export_url = reverse("chat:export-word", args=[conversation.conversation_id])
    response = client.post(export_url, follow=True)
    page = response.content.decode("utf-8")

    # Reload to read the report the view stored on the conversation.
    conversation.refresh_from_db()
    export_report = conversation.latest_summary["structured_output"]
    export_path = settings.MEDIA_ROOT / export_report["output_file"]["relative_path"]

    assert response.status_code == 200
    assert export_report["output_type"] == "registration_word_export_report"
    download_url = export_report["download_url"]
    assert download_url.startswith("/media/exports/")
    assert export_path.exists()
    assert "下载导出文件" in page
    assert download_url in page

    conversation_key = conversation.conversation_id
    audit_logs = AgentAuditLog.objects.filter(conversation_id=conversation_key)
    assert audit_logs.count() == 1
    exported = ExportedDocument.objects.filter(
        batch=batch, conversation_id=conversation_key
    )
    assert exported.count() == 1