# File: DEMO-AGENT/tests/test_agent_core.py
# Repository viewer metadata: 594 lines, 19 KiB, Python.
#
# Viewer warning: this file contains Unicode characters that might be
# confused with other characters (likely the full-width punctuation used in
# the test strings below). If this is intentional, the warning can be safely
# ignored.
from agent_core.orchestrator import build_messages, run_agent
from agent_core.rag.ingest import _split_text, ingest_document
from agent_core.rag.retriever import retrieve
from agent_core.schemas.outputs import SUPPORTED_OUTPUT_TYPES
from django.test import override_settings
def test_run_agent_returns_structured_result_from_llm_output():
    """A JSON reply from the provider is exposed as a structured ``general_answer`` result."""
    from agent_core.llm_provider import LLMResponse

    # Raw provider reply, kept verbatim so the agent parses the same bytes.
    llm_payload = """
{
"answer": "请先隔离异常现场,再通知负责人。",
"confidence": "high",
"references": [
{"source": "sop.md", "excerpt": "异常处理 SOP先隔离现场"}
]
}
"""

    class FakeProvider:
        """Stub provider that always reports success with ``llm_payload``."""

        def generate(self, messages, response_format=None):
            return LLMResponse(content=llm_payload, model_name="demo-model", success=True)

    agent_section = {
        "role": "知识库助手",
        "goal": "基于资料回答问题",
        "instructions": ["仅根据证据回答"],
    }
    scenario_config = {
        "id": "knowledge_qa",
        "name": "知识库问答助手",
        "agent": agent_section,
        "rag": {"enabled": True, "collection": "knowledge_qa", "top_k": 3},
        "tools": ["generate_action_items"],
        "output": {"type": "general_answer"},
    }

    outcome = run_agent(
        scenario_config,
        "如何处理异常?",
        options={"llm_provider": FakeProvider()},
    )

    assert outcome.status == "success"
    assert outcome.answer == "请先隔离异常现场,再通知负责人。"
    assert outcome.structured_output["output_type"] == "general_answer"
    assert outcome.structured_output["confidence"] == "high"
    assert isinstance(outcome.references, list)
    # The single configured tool is expected to be the first recorded call.
    assert outcome.tool_calls[0]["tool_name"] == "generate_action_items"
    assert outcome.model_name == "demo-model"
def test_run_agent_falls_back_when_llm_returns_non_json():
    """A plain-text provider reply is wrapped in a fallback structured output."""
    from agent_core.llm_provider import LLMResponse

    plain_reply = "这是非 JSON 的普通回答"

    class FakeProvider:
        """Stub provider that succeeds but returns text that is not JSON."""

        def generate(self, messages, response_format=None):
            return LLMResponse(content=plain_reply, model_name="demo-model", success=True)

    scenario_config = {
        "id": "document_review",
        "name": "文档审核助手",
        "agent": {
            "role": "审核助手",
            "goal": "总结审核意见",
            "instructions": ["输出重点问题"],
        },
        "rag": {"enabled": False},
        "tools": [],
        "output": {"type": "document_review_report"},
    }

    outcome = run_agent(
        scenario_config,
        "请检查合同风险",
        options={"llm_provider": FakeProvider()},
    )

    assert outcome.status == "success"
    assert outcome.answer == "这是非 JSON 的普通回答"

    structured = outcome.structured_output
    assert structured["output_type"] == "document_review_report"
    # The raw text doubles as the summary and the parse mode flags the fallback.
    assert structured["summary"] == "这是非 JSON 的普通回答"
    assert structured["parse_mode"] == "fallback"
def test_build_messages_contains_role_goal_references_and_tool_results():
    """``build_messages`` yields system, context and user messages carrying the expected text."""
    scenario_config = {
        "name": "质量异常分析助手",
        "agent": {
            "role": "质量管理专家",
            "goal": "生成结构化质量分析报告",
            "instructions": ["必须引用知识库", "缺失信息要说明"],
        },
        "output": {"type": "quality_report"},
    }
    knowledge_refs = [{"source": "sop.md", "content": "先隔离现场"}]
    executed_tools = [
        {
            "tool_name": "query_demo_records",
            "success": True,
            "result": {"records": [{"title": "A线缺陷"}]},
            "error": "",
        }
    ]

    messages = build_messages(
        scenario_config=scenario_config,
        user_input="分析 A 线异常",
        references=knowledge_refs,
        tool_calls=executed_tools,
    )

    # First message: system prompt with role, goal and output type.
    assert messages[0]["role"] == "system"
    system_text = messages[0]["content"]
    assert "质量管理专家" in system_text
    assert "生成结构化质量分析报告" in system_text
    assert "quality_report" in system_text

    # Second message: retrieved reference text and tool results.
    context_text = messages[1]["content"]
    assert "先隔离现场" in context_text
    assert "A线缺陷" in context_text

    # Third message: the user's original question.
    assert "分析 A 线异常" in messages[2]["content"]
def test_rag_ingest_and_retrieve_filters_by_scenario_and_query(tmp_path):
    """Retrieval only returns chunks from the requested scenario that match the query."""
    store_path = tmp_path / "rag_store.json"

    quality_text = "设备点检需要先断电挂牌。质量异常需要记录批次、工位和缺陷现象。"
    ingest_outcome = ingest_document(
        scenario_id="quality_analysis",
        source_file="quality.md",
        text=quality_text,
        collection="quality_analysis",
        store_path=store_path,
    )
    # A second document in another scenario shares the store and must not leak.
    ingest_document(
        scenario_id="risk_audit",
        source_file="risk.md",
        text="报销审核需要检查发票、金额和审批链。",
        collection="risk_audit",
        store_path=store_path,
    )

    hits = retrieve(
        scenario_id="quality_analysis",
        query="质量异常批次",
        collection="quality_analysis",
        top_k=3,
        store_path=store_path,
    )

    assert ingest_outcome.success is True
    assert ingest_outcome.chunks_count >= 1
    assert hits
    assert hits[0]["source"] == "quality.md"
    assert "质量异常" in hits[0]["content"]
    for hit in hits:
        assert hit["scenario_id"] == "quality_analysis"
def test_rag_reingest_replaces_same_document_and_retrieve_filters_document_ids(tmp_path):
    """Re-ingesting a document id replaces its chunks; ``document_ids`` narrows retrieval."""
    store_path = tmp_path / "rag_store.json"

    # (document_id, source_file, text) in ingestion order; id 1 is ingested twice.
    ingestion_plan = [
        (1, "old.md", "旧制度要求人工登记。"),
        (1, "new.md", "新制度要求系统自动登记。"),
        (2, "other.md", "系统自动登记后需要生成审计记录。"),
    ]
    for doc_id, file_name, body in ingestion_plan:
        ingest_document(
            document_id=doc_id,
            scenario_id="knowledge_qa",
            source_file=file_name,
            text=body,
            collection="knowledge_qa",
            store_path=store_path,
        )

    hits = retrieve(
        scenario_id="knowledge_qa",
        query="系统自动登记",
        collection="knowledge_qa",
        top_k=5,
        document_ids=[1],
        store_path=store_path,
    )

    assert hits
    assert {hit["document_id"] for hit in hits} == {1}
    for hit in hits:
        assert hit["source"] == "new.md"
    for hit in hits:
        assert "旧制度" not in hit["content"]
def test_run_agent_uses_retrieved_document_chunks(tmp_path):
    """With RAG enabled, ``run_agent`` reports the ingested document as its first reference."""
    rag_store = tmp_path / "rag_store.json"
    ingest_document(
        scenario_id="knowledge_qa",
        source_file="sop.md",
        text="异常处理 SOP先隔离现场再通知负责人。",
        collection="knowledge_qa",
        store_path=rag_store,
    )

    scenario_config = {
        "id": "knowledge_qa",
        "name": "知识库问答助手",
        "rag": {"enabled": True, "collection": "knowledge_qa", "top_k": 3},
        "tools": [],
        "output": {"type": "general_answer"},
    }
    # No provider is injected; only the store path is overridden.
    run_options = {"rag_store_path": rag_store}

    outcome = run_agent(scenario_config, "异常处理怎么做?", options=run_options)

    assert outcome.references[0]["source"] == "sop.md"
    assert "隔离现场" in outcome.references[0]["content"]
def test_rag_split_text_keeps_overlap_and_non_empty_chunks():
    """Splitting 20 characters with size 8 and overlap 3 yields chunks of 8, 8, 8 and 5."""
    # The input is uniform, so this pins chunk count and lengths only.
    expected_chunks = ["AAAAAAAA", "AAAAAAAA", "AAAAAAAA", "AAAAA"]
    actual_chunks = _split_text("A" * 20, chunk_size=8, overlap=3)
    assert actual_chunks == expected_chunks
def test_retrieve_returns_empty_when_query_has_no_overlap(tmp_path):
    """A query that shares nothing with the stored text returns an empty list."""
    rag_store = tmp_path / "rag_store.json"
    ingest_document(
        scenario_id="knowledge_qa",
        source_file="rules.md",
        text="这里描述的是报销流程和审批链。",
        collection="knowledge_qa",
        store_path=rag_store,
    )

    unrelated_query = "设备点检"
    hits = retrieve(
        scenario_id="knowledge_qa",
        query=unrelated_query,
        collection="knowledge_qa",
        top_k=3,
        store_path=rag_store,
    )

    assert hits == []
def test_registration_risk_result_includes_owner_fields_and_notification_payload():
    """Owner roles returned by the LLM are carried into the notification payload."""
    from agent_core.llm_provider import LLMResponse

    # Raw provider reply, kept verbatim; it names one owner with Feishu ids.
    llm_payload = """
{
"summary": "存在高风险项,需人工复核。",
"highest_risk_level": "high",
"pass_status": "blocked",
"owner_roles": [
{
"owner_role": "注册资料负责人",
"owner_name": "张三",
"department": "注册事务部",
"chapter_scope": "CH1",
"risk_scope": "字段冲突",
"feishu_user_id": "ou_demo_1",
"feishu_open_id": "on_demo_1",
"feishu_name": "张三",
"notify_enabled": true
}
],
"notify_reason": "task_completed"
}
"""

    class FakeProvider:
        """Stub provider that always reports success with ``llm_payload``."""

        def generate(self, messages, response_format=None):
            return LLMResponse(content=llm_payload, model_name="demo-model", success=True)

    scenario_config = {
        "id": "document_review",
        "name": "注册审核智能体",
        "agent": {
            "role": "注册审核助手",
            "goal": "输出风险结果",
            "instructions": ["输出结构化风险结果"],
        },
        "rag": {"enabled": False},
        "tools": [],
        "output": {"type": "registration_risk_report"},
    }

    outcome = run_agent(
        scenario_config,
        "请输出风险结果",
        options={"llm_provider": FakeProvider()},
    )
    first_owner = outcome.notification_payload["owners"][0]

    assert outcome.structured_output["output_type"] == "registration_risk_report"
    assert first_owner["owner_role"] == "注册资料负责人"
    assert first_owner["feishu_user_id"] == "ou_demo_1"
    assert first_owner["feishu_open_id"] == "on_demo_1"
    assert outcome.notification_payload["notify_reason"] == "task_completed"
def test_supported_output_types_include_word_export_and_feishu_notification():
    """Both the Word export and the Feishu notification report types are registered."""
    required_types = (
        "registration_word_export_report",
        "feishu_notification_report",
    )
    for output_type in required_types:
        assert output_type in SUPPORTED_OUTPUT_TYPES
def test_registration_risk_report_builds_eight_business_nodes():
    """A blocked risk report yields eight labelled nodes with the expected tail statuses."""
    from agent_core.llm_provider import LLMResponse

    # Raw provider reply, kept verbatim; the report is blocked and names no owners.
    llm_payload = """
{
"summary": "存在高风险项,需人工复核。",
"highest_risk_level": "high",
"pass_status": "blocked",
"owner_roles": [],
"notify_reason": "task_completed"
}
"""

    class FakeProvider:
        """Stub provider that always reports success with ``llm_payload``."""

        def generate(self, messages, response_format=None):
            return LLMResponse(content=llm_payload, model_name="demo-model", success=True)

    scenario_config = {
        "id": "document_review",
        "name": "注册审核智能体",
        "agent": {
            "role": "注册审核助手",
            "goal": "输出风险结果",
            "instructions": ["输出结构化风险结果"],
        },
        "rag": {"enabled": False},
        "tools": [],
        "output": {"type": "registration_risk_report"},
    }

    outcome = run_agent(
        scenario_config,
        "请输出风险结果",
        options={"llm_provider": FakeProvider()},
    )

    expected_labels = [
        "资料包导入",
        "目录汇总",
        "法规完整性检查",
        "字段抽取",
        "一致性核查",
        "风险预警",
        "Word 回填导出",
        "飞书通知",
    ]
    assert len(outcome.node_results) == 8
    assert [node["label"] for node in outcome.node_results] == expected_labels

    # Risk warning node is blocked; the export and notification nodes stay pending.
    expected_tail_statuses = ((5, "已阻断"), (6, "待处理"), (7, "待处理"))
    for position, status in expected_tail_statuses:
        assert outcome.node_results[position]["status"] == status
@override_settings(GOVERNANCE_CONFIG_PATH="")
def test_registration_risk_payload_falls_back_to_governance_owner_mappings(tmp_path):
    """Owners come from the governance config when the LLM reply lists none.

    The decorator blanks ``GOVERNANCE_CONFIG_PATH`` for the whole test; the
    inner ``override_settings`` context then points it at a temporary YAML
    file only while the agent runs.
    """
    # Build the YAML with explicit nesting so that every owner field belongs
    # to the single ``owner_mappings`` list item. A flat layout would parse
    # all fields after ``owner_role`` as top-level keys, leaving the owner
    # without the ``feishu_user_id`` asserted below.
    governance_yaml = "\n".join(
        [
            "owner_mappings:",
            "  - owner_role: 法规负责人",
            "    owner_name: 赵六",
            "    department: 法规事务部",
            "    chapter_scope: CH1",
            "    risk_scope: 高风险缺失",
            "    feishu_user_id: ou_governance_1",
            "    feishu_open_id: on_governance_1",
            "    feishu_name: 赵六",
            "    notify_enabled: 是",
        ]
    )
    config_path = tmp_path / "governance.yaml"
    config_path.write_text(governance_yaml, encoding="utf-8")

    scenario = {
        "id": "document_review",
        "name": "注册审核智能体",
        "agent": {
            "role": "注册审核助手",
            "goal": "输出风险结果",
            "instructions": ["输出结构化风险结果"],
        },
        "rag": {"enabled": False},
        "tools": [],
        "output": {"type": "registration_risk_report"},
    }
    # Raw provider reply, kept verbatim; ``owner_roles`` is deliberately empty.
    provider_response = """
{
"summary": "存在高风险项,需人工复核。",
"highest_risk_level": "high",
"pass_status": "blocked",
"owner_roles": [],
"notify_reason": "task_completed"
}
"""

    class FakeProvider:
        """Stub provider that always reports success with ``provider_response``."""

        def generate(self, messages, response_format=None):
            from agent_core.llm_provider import LLMResponse

            return LLMResponse(content=provider_response, model_name="demo-model", success=True)

    with override_settings(GOVERNANCE_CONFIG_PATH=config_path):
        result = run_agent(scenario, "请输出风险结果", options={"llm_provider": FakeProvider()})

    owner = result.notification_payload["owners"][0]
    assert owner["owner_role"] == "法规负责人"
    assert owner["feishu_user_id"] == "ou_governance_1"
    assert result.notification_payload["notify_reason"] == "task_completed"
@override_settings(GOVERNANCE_CONFIG_PATH="")
def test_failed_agent_result_uses_governance_owner_mappings_for_failed_notification(tmp_path):
    """A failed provider call still notifies the owners listed in the governance config.

    The decorator blanks ``GOVERNANCE_CONFIG_PATH`` for the whole test; the
    inner ``override_settings`` context then points it at a temporary YAML
    file only while the agent runs.
    """
    # Build the YAML with explicit nesting so that every owner field belongs
    # to the single ``owner_mappings`` list item. A flat layout would parse
    # all fields after ``owner_role`` as top-level keys, leaving the owner
    # without the ``owner_name`` / ``feishu_open_id`` asserted below.
    governance_yaml = "\n".join(
        [
            "owner_mappings:",
            "  - owner_role: 注册资料负责人",
            "    owner_name: 孙七",
            "    department: 注册事务部",
            "    chapter_scope: CH2",
            "    risk_scope: 执行异常",
            "    feishu_user_id: ou_failed_1",
            "    feishu_open_id: on_failed_1",
            "    feishu_name: 孙七",
            "    notify_enabled: 是",
        ]
    )
    config_path = tmp_path / "governance.yaml"
    config_path.write_text(governance_yaml, encoding="utf-8")

    scenario = {
        "id": "document_review",
        "name": "注册审核智能体",
        "agent": {
            "role": "注册审核助手",
            "goal": "输出风险结果",
            "instructions": ["输出结构化风险结果"],
        },
        "rag": {"enabled": False},
        "tools": [],
        "output": {"type": "registration_risk_report"},
    }

    class FailedProvider:
        """Stub provider that always reports a failed call with an error message."""

        def generate(self, messages, response_format=None):
            from agent_core.llm_provider import LLMResponse

            return LLMResponse(
                content="",
                model_name="demo-model",
                success=False,
                error="provider down",
            )

    with override_settings(GOVERNANCE_CONFIG_PATH=config_path):
        result = run_agent(scenario, "请输出风险结果", options={"llm_provider": FailedProvider()})

    owner = result.notification_payload["owners"][0]
    assert result.status == "failed"
    assert result.notification_payload["notify_reason"] == "task_failed"
    assert owner["owner_name"] == "孙七"
    assert owner["feishu_open_id"] == "on_failed_1"
def test_feishu_notification_report_builds_notification_payload_with_receipt_and_node_status():
    """A sent Feishu report exposes status, detail URL and receipt, and marks node 7 as sent."""
    from agent_core.llm_provider import LLMResponse

    # Raw provider reply, kept verbatim; it includes a delivery receipt.
    llm_payload = """
{
"batch_id": "SUB-20260604-003",
"conversation_id": "conv-003",
"notify_reason": "task_completed",
"mentioned_users": ["ou_demo_1"],
"message_status": "sent",
"web_detail_url": "https://example.com/audit/3",
"receipt": {
"message_id": "msg-3",
"status": "sent"
}
}
"""

    class FakeProvider:
        """Stub provider that always reports success with ``llm_payload``."""

        def generate(self, messages, response_format=None):
            return LLMResponse(content=llm_payload, model_name="demo-model", success=True)

    scenario_config = {
        "id": "document_review",
        "name": "注册审核智能体",
        "agent": {
            "role": "注册审核助手",
            "goal": "输出通知结果",
            "instructions": ["输出结构化通知结果"],
        },
        "rag": {"enabled": False},
        "tools": [],
        "output": {"type": "feishu_notification_report"},
    }
    run_options = {
        "llm_provider": FakeProvider(),
        "batch_id": "SUB-20260604-003",
        "conversation_id": "conv-003",
        "product_name": "产品C",
    }

    outcome = run_agent(scenario_config, "请生成通知结果", options=run_options)

    # Index 7 is the last of the business nodes.
    assert outcome.node_results[7]["status"] == "已发送"
    assert outcome.notification_payload["message_status"] == "sent"
    assert outcome.notification_payload["web_detail_url"] == "https://example.com/audit/3"
    assert outcome.notification_payload["receipt"]["message_id"] == "msg-3"
def test_notification_payload_normalizes_unsupported_notify_reason():
    """An unrecognised ``notify_reason`` from the LLM is reported as ``task_completed``."""
    from agent_core.llm_provider import LLMResponse

    # Raw provider reply, kept verbatim; "custom_reason" is the value under test.
    llm_payload = """
{
"batch_id": "SUB-20260604-004",
"conversation_id": "conv-004",
"notify_reason": "custom_reason",
"mentioned_users": ["ou_demo_1"],
"message_status": "sent"
}
"""

    class FakeProvider:
        """Stub provider that always reports success with ``llm_payload``."""

        def generate(self, messages, response_format=None):
            return LLMResponse(content=llm_payload, model_name="demo-model", success=True)

    scenario_config = {
        "id": "document_review",
        "name": "注册审核智能体",
        "agent": {
            "role": "注册审核助手",
            "goal": "输出通知结果",
            "instructions": ["输出结构化通知结果"],
        },
        "rag": {"enabled": False},
        "tools": [],
        "output": {"type": "feishu_notification_report"},
    }
    run_options = {
        "llm_provider": FakeProvider(),
        "batch_id": "SUB-20260604-004",
        "conversation_id": "conv-004",
        "product_name": "产品D",
    }

    outcome = run_agent(scenario_config, "请生成通知结果", options=run_options)

    assert outcome.notification_payload["notify_reason"] == "task_completed"
def test_registration_word_export_report_preserves_formal_export_flag_and_blocked_items():
    """A draft-only export keeps its flag, URL and blocked items, and node 6 awaits review."""
    from agent_core.llm_provider import LLMResponse

    # Raw provider reply, kept verbatim; formal export is disallowed.
    llm_payload = """
{
"summary": "已生成草稿导出包。",
"export_status": "draft_only",
"can_export_formally": false,
"blocked_items": ["风险项未清零", "需人工复核后再导出"],
"download_url": "/downloads/registration-draft.docx"
}
"""

    class FakeProvider:
        """Stub provider that always reports success with ``llm_payload``."""

        def generate(self, messages, response_format=None):
            return LLMResponse(content=llm_payload, model_name="demo-model", success=True)

    scenario_config = {
        "id": "document_review",
        "name": "注册审核智能体",
        "agent": {
            "role": "注册审核助手",
            "goal": "输出导出结果",
            "instructions": ["输出结构化导出结果"],
        },
        "rag": {"enabled": False},
        "tools": [],
        "output": {"type": "registration_word_export_report"},
    }

    outcome = run_agent(
        scenario_config,
        "请输出导出结果",
        options={"llm_provider": FakeProvider()},
    )

    assert outcome.structured_output["can_export_formally"] is False
    assert outcome.structured_output["download_url"] == "/downloads/registration-draft.docx"
    assert outcome.structured_output["blocked_items"] == ["风险项未清零", "需人工复核后再导出"]
    assert outcome.node_results[6]["status"] == "待复核"