feat: 重构处理历史与通知留痕追踪
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
from agent_core.orchestrator import build_messages, run_agent
|
||||
from agent_core.rag.ingest import _split_text, ingest_document
|
||||
from agent_core.rag.retriever import retrieve
|
||||
from agent_core.schemas.outputs import SUPPORTED_OUTPUT_TYPES
|
||||
|
||||
|
||||
def test_run_agent_returns_structured_result_from_llm_output():
|
||||
@@ -248,3 +249,59 @@ def test_retrieve_returns_empty_when_query_has_no_overlap(tmp_path):
|
||||
)
|
||||
|
||||
assert chunks == []
|
||||
|
||||
|
||||
def test_registration_risk_result_includes_owner_fields_and_notification_payload():
|
||||
scenario = {
|
||||
"id": "document_review",
|
||||
"name": "注册审核智能体",
|
||||
"agent": {
|
||||
"role": "注册审核助手",
|
||||
"goal": "输出风险结果",
|
||||
"instructions": ["输出结构化风险结果"],
|
||||
},
|
||||
"rag": {"enabled": False},
|
||||
"tools": [],
|
||||
"output": {"type": "registration_risk_report"},
|
||||
}
|
||||
provider_response = """
|
||||
{
|
||||
"summary": "存在高风险项,需人工复核。",
|
||||
"highest_risk_level": "high",
|
||||
"pass_status": "blocked",
|
||||
"owner_roles": [
|
||||
{
|
||||
"owner_role": "注册资料负责人",
|
||||
"owner_name": "张三",
|
||||
"department": "注册事务部",
|
||||
"chapter_scope": "CH1",
|
||||
"risk_scope": "字段冲突",
|
||||
"feishu_user_id": "ou_demo_1",
|
||||
"feishu_open_id": "on_demo_1",
|
||||
"feishu_name": "张三",
|
||||
"notify_enabled": true
|
||||
}
|
||||
],
|
||||
"notify_reason": "task_completed"
|
||||
}
|
||||
"""
|
||||
|
||||
class FakeProvider:
|
||||
def generate(self, messages, response_format=None):
|
||||
from agent_core.llm_provider import LLMResponse
|
||||
|
||||
return LLMResponse(content=provider_response, model_name="demo-model", success=True)
|
||||
|
||||
result = run_agent(scenario, "请输出风险结果", options={"llm_provider": FakeProvider()})
|
||||
|
||||
owner = result.notification_payload["owners"][0]
|
||||
assert result.structured_output["output_type"] == "registration_risk_report"
|
||||
assert owner["owner_role"] == "注册资料负责人"
|
||||
assert owner["feishu_user_id"] == "ou_demo_1"
|
||||
assert owner["feishu_open_id"] == "on_demo_1"
|
||||
assert result.notification_payload["notify_reason"] == "task_completed"
|
||||
|
||||
|
||||
def test_supported_output_types_include_word_export_and_feishu_notification():
|
||||
assert "registration_word_export_report" in SUPPORTED_OUTPUT_TYPES
|
||||
assert "feishu_notification_report" in SUPPORTED_OUTPUT_TYPES
|
||||
|
||||
Reference in New Issue
Block a user