# Files: DEMO-AGENT/tests/test_chat.py (328 lines, 12 KiB, Python)

from django.urls import reverse
from agent_core.results import AgentResult
from apps.audit.models import AgentAuditLog
from apps.audit.models import NotificationRecord
from apps.chat.models import Conversation
from apps.chat.services import create_conversation_for_batch
from apps.documents.models import SubmissionBatch, UploadedDocument
def _create_conversation_with_batch():
    """Insert one SubmissionBatch and one Conversation that reference each other.

    Both rows use the fixed identifiers ``SUB-20260604-001`` and ``conv-001``;
    the conversation is created with task status ``processing`` and two
    node results.

    Returns:
        tuple: ``(batch, conversation)`` as returned by ``objects.create``.
    """
    batch_fields = {
        "batch_id": "SUB-20260604-001",
        "product_name": "新型冠状病毒 2019-nCoV 核酸检测试剂盒",
        "workflow_type": "registration",
        "conversation_id": "conv-001",
        "file_count": 2,
        "page_count": 12,
        "import_status": "completed",
    }
    batch = SubmissionBatch.objects.create(**batch_fields)

    initial_nodes = [
        {"label": label, "status": status}
        for label, status in (("资料包导入", "已完成"), ("目录汇总", "处理中"))
    ]
    conversation = Conversation.objects.create(
        conversation_id="conv-001",
        title="新型冠状病毒 2019-nCoV 核酸检测试剂盒",
        product_name=batch.product_name,
        batch_id=batch.batch_id,
        task_status="processing",
        node_results=initial_nodes,
    )
    return batch, conversation
def test_chat_post_returns_agent_result_and_audit_log(client, db, monkeypatch):
    """POSTing a message renders the stubbed answer and writes one audit log."""
    batch, conversation = _create_conversation_with_batch()
    document_fields = {
        "batch": batch,
        "scenario_id": "document_review",
        "original_name": "说明书.md",
        "file_type": "md",
        "size": 1,
        "status": UploadedDocument.STATUS_INDEXED,
    }
    UploadedDocument.objects.create(**document_fields)

    def stub_run_agent(*args, **kwargs):
        # Stand-in for the real agent: always answers successfully.
        return AgentResult(answer="模拟回答", status="success")

    monkeypatch.setattr("apps.chat.views.run_agent", stub_run_agent)

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.post(detail_url, {"message": "如何处理异常?"})

    assert response.status_code == 200
    page = response.content.decode("utf-8")
    assert "审核智能体" in page
    assert "模拟回答" in page
    assert AgentAuditLog.objects.count() == 1
    audit_entry = AgentAuditLog.objects.get()
    assert audit_entry.batch_id == batch.batch_id
def test_chat_rejects_empty_message(client, db):
    """An empty message yields a 200 page with a prompt and no audit log."""
    _batch, conversation = _create_conversation_with_batch()
    detail_url = reverse("chat:detail", args=[conversation.conversation_id])

    response = client.post(detail_url, {"message": ""})

    assert response.status_code == 200
    assert AgentAuditLog.objects.count() == 0
    page = response.content.decode("utf-8")
    assert "请输入要咨询的问题" in page
def test_chat_passes_selected_document_ids_to_agent_core(client, db, monkeypatch):
    """Only the posted document id, plus conversation and batch ids, reach the agent options."""
    batch, conversation = _create_conversation_with_batch()
    shared_fields = {
        "batch": batch,
        "scenario_id": "document_review",
        "file_type": "md",
        "size": 1,
        "status": UploadedDocument.STATUS_INDEXED,
    }
    selected = UploadedDocument.objects.create(original_name="selected.md", **shared_fields)
    # A second document in the same batch that must NOT be forwarded.
    UploadedDocument.objects.create(original_name="other.md", **shared_fields)

    recorded = {}

    def fake_run_agent(scenario_config, user_input, options=None):
        # Remember the options of the most recent call for the assertions below.
        recorded["options"] = options or {}
        return AgentResult(answer="ok", status="success")

    monkeypatch.setattr("apps.chat.views.run_agent", fake_run_agent)

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    form_data = {"message": "只查选中文档", "document_ids": [str(selected.id)]}
    response = client.post(detail_url, form_data)

    assert response.status_code == 200
    assert recorded["options"]["document_ids"] == [selected.id]
    assert recorded["options"]["conversation_id"] == conversation.conversation_id
    assert recorded["options"]["batch_id"] == batch.batch_id
def test_chat_renders_three_column_workspace_and_node_results(client, db):
    """The detail page shows the three workspace headings and both node statuses."""
    batch, conversation = _create_conversation_with_batch()
    UploadedDocument.objects.create(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.get(detail_url)
    page = response.content.decode("utf-8")

    assert response.status_code == 200
    expected_fragments = (
        "会话历史",
        "对话区与节点导航",
        "上传区",
        "资料包导入 / 已完成",
        "目录汇总 / 处理中",
    )
    for fragment in expected_fragments:
        assert fragment in page
def test_chat_execution_creates_notification_record_from_agent_result(client, db, monkeypatch):
    """A successful result carrying a notification payload creates one NotificationRecord."""
    batch, conversation = _create_conversation_with_batch()
    UploadedDocument.objects.create(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )

    def stub_run_agent(*args, **kwargs):
        # Build the payload per call, mirroring what the stubbed agent returns.
        owner = {
            "owner_role": "注册资料负责人",
            "feishu_user_id": "ou_demo_1",
        }
        payload = {
            "batch_id": batch.batch_id,
            "conversation_id": conversation.conversation_id,
            "product_name": batch.product_name,
            "notify_reason": "task_completed",
            "owners": [owner],
        }
        return AgentResult(
            answer="执行完成",
            status="success",
            notification_payload=payload,
        )

    monkeypatch.setattr("apps.chat.views.run_agent", stub_run_agent)

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.post(detail_url, {"message": "执行审核"})

    assert response.status_code == 200
    record = NotificationRecord.objects.get()
    assert record.notify_reason == "task_completed"
    assert record.batch_id == batch.batch_id
    # The record's detail link must end with the audit log entry's URL path.
    audit_id = AgentAuditLog.objects.get().id
    assert record.web_detail_url.endswith(f"/audit/{audit_id}/")
def test_chat_execution_creates_failed_notification_record_and_updates_conversation(client, db, monkeypatch):
    """A failed agent result records a failed notification and updates the conversation."""
    batch, conversation = _create_conversation_with_batch()
    UploadedDocument.objects.create(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )

    def stub_run_agent(*args, **kwargs):
        # Stubbed failing run: four nodes, the last one being the Feishu notification.
        nodes = [
            {"code": "package_import", "label": "资料包导入", "status": "已完成"},
            {"code": "overview", "label": "目录汇总", "status": "已完成"},
            {"code": "risk", "label": "风险预警", "status": "已阻断"},
            {"code": "feishu_notify", "label": "飞书通知", "status": "失败"},
        ]
        payload = {
            "batch_id": batch.batch_id,
            "conversation_id": conversation.conversation_id,
            "product_name": batch.product_name,
            "notify_reason": "task_failed",
            "owners": [
                {
                    "owner_role": "注册申报负责人",
                    "feishu_user_id": "ou_demo_2",
                }
            ],
        }
        return AgentResult(
            answer="执行失败",
            status="failed",
            error="规则执行失败",
            node_results=nodes,
            notification_payload=payload,
        )

    monkeypatch.setattr("apps.chat.views.run_agent", stub_run_agent)

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.post(detail_url, {"message": "执行失败任务"})

    assert response.status_code == 200
    record = NotificationRecord.objects.get()
    # Reload so the assertions see what the view persisted, not the stale instance.
    conversation.refresh_from_db()
    assert record.notify_reason == "task_failed"
    assert record.message_status == "failed"
    audit_id = AgentAuditLog.objects.get().id
    assert record.web_detail_url.endswith(f"/audit/{audit_id}/")
    assert conversation.task_status == "failed"
    last_node = conversation.node_results[-1]
    assert last_node["label"] == "飞书通知"
def test_chat_execution_persists_agent_node_results_to_conversation(client, db, monkeypatch):
    """The agent's eight node results, status and answer are stored on the conversation."""
    batch, conversation = _create_conversation_with_batch()
    UploadedDocument.objects.create(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )

    def stub_run_agent(*args, **kwargs):
        # Stubbed successful run reporting all eight workflow nodes.
        nodes = [
            {"code": "package_import", "label": "资料包导入", "status": "已完成"},
            {"code": "overview", "label": "目录汇总", "status": "已完成"},
            {"code": "completeness", "label": "法规完整性检查", "status": "已完成"},
            {"code": "field_extraction", "label": "字段抽取", "status": "已完成"},
            {"code": "consistency", "label": "一致性核查", "status": "待复核"},
            {"code": "risk", "label": "风险预警", "status": "已阻断", "summary": "存在高风险"},
            {"code": "word_export", "label": "Word 回填导出", "status": "待处理"},
            {"code": "feishu_notify", "label": "飞书通知", "status": "待处理"},
        ]
        payload = {
            "batch_id": batch.batch_id,
            "conversation_id": conversation.conversation_id,
            "product_name": batch.product_name,
            "notify_reason": "task_completed",
            "owners": [],
        }
        return AgentResult(
            answer="已生成风险结论",
            status="success",
            node_results=nodes,
            notification_payload=payload,
        )

    monkeypatch.setattr("apps.chat.views.run_agent", stub_run_agent)

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.post(detail_url, {"message": "执行节点任务"})

    assert response.status_code == 200
    # Reload so the assertions see what the view persisted, not the stale instance.
    conversation.refresh_from_db()
    assert len(conversation.node_results) == 8
    assert conversation.task_status == "success"
    assert conversation.latest_summary["answer"] == "已生成风险结论"
def test_create_conversation_for_batch_initializes_eight_workflow_nodes(db):
    """create_conversation_for_batch yields eight node results with the expected labels, in order."""
    expected_labels = [
        "资料包导入",
        "目录汇总",
        "法规完整性检查",
        "字段抽取",
        "一致性核查",
        "风险预警",
        "Word 回填导出",
        "飞书通知",
    ]

    conversation = create_conversation_for_batch(
        "SUB-20260604-001",
        "新型冠状病毒 2019-nCoV 核酸检测试剂盒",
    )

    labels = []
    for node in conversation.node_results:
        labels.append(node["label"])

    assert len(labels) == 8
    assert labels == expected_labels
def test_chat_page_shows_upload_entry_and_dynamic_context_cards(client, db):
    """With eight node results saved, the page shows the upload entry and context card texts."""
    batch, conversation = _create_conversation_with_batch()

    node_states = (
        ("资料包导入", "已完成"),
        ("目录汇总", "已完成"),
        ("法规完整性检查", "已完成"),
        ("字段抽取", "已完成"),
        ("一致性核查", "待复核"),
        ("风险预警", "已阻断"),
        ("Word 回填导出", "待处理"),
        ("飞书通知", "待处理"),
    )
    conversation.node_results = [
        {"label": label, "status": status} for label, status in node_states
    ]
    # Write only the replaced column.
    conversation.save(update_fields=["node_results"])

    UploadedDocument.objects.create(
        batch=batch,
        scenario_id="document_review",
        original_name="说明书.md",
        file_type="md",
        size=1,
        status=UploadedDocument.STATUS_INDEXED,
    )

    detail_url = reverse("chat:detail", args=[conversation.conversation_id])
    response = client.get(detail_url)
    page = response.content.decode("utf-8")

    assert response.status_code == 200
    expected_fragments = (
        "继续上传资料",
        "最高风险等级",
        "是否允许正式导出",
        "通知状态",
        "飞书通知 / 待处理",
    )
    for fragment in expected_fragments:
        assert fragment in page