feat(agent-core): 补齐提示词编排与结构化解析

This commit is contained in:
2026-05-30 00:20:40 +08:00
parent ba3f5fc584
commit df45a89eb1
5 changed files with 421 additions and 29 deletions

View File

@@ -1,5 +1,6 @@
from dataclasses import dataclass from dataclasses import dataclass
import json import json
import os
from urllib.error import URLError from urllib.error import URLError
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
@@ -25,13 +26,22 @@ class MockLLMProvider:
self.model_name = model_name or "mock-model" self.model_name = model_name or "mock-model"
def generate(self, messages: list[dict], response_format: dict | None = None) -> LLMResponse:
    """Return a deterministic JSON reply echoing the latest user message.

    The mock provider exists so that pages and tests can complete the full
    loop without a real model. It therefore answers with stable JSON, which
    lets callers go through the same structured-parsing path as real output.

    Args:
        messages: Chat messages as dicts with ``role`` / ``content`` keys.
        response_format: Accepted for interface parity; not used here.

    Returns:
        A successful ``LLMResponse`` whose ``content`` is a JSON object with
        ``answer``, ``confidence`` and ``references`` keys.
    """
    # Walk backwards so the most recent user turn wins; default to "" when
    # the conversation has no user message at all.
    latest_user_text = next(
        (
            entry.get("content", "")
            for entry in reversed(messages)
            if entry.get("role") == "user"
        ),
        "",
    )
    payload = {
        "answer": f"模拟回答:{latest_user_text}",
        "confidence": "medium",
        "references": [],
    }
    return LLMResponse(
        content=json.dumps(payload, ensure_ascii=False),
        model_name=self.model_name,
        success=True,
    )
@@ -112,7 +122,9 @@ def _post_json(base_url: str, endpoint: str, api_key: str, payload: dict) -> dic
def create_llm_provider(config: dict | None = None): def create_llm_provider(config: dict | None = None):
config = config or {} config = config or {}
provider_name = config.get("LLM_PROVIDER", "mock") provider_name = config.get("LLM_PROVIDER")
if not provider_name:
provider_name = "openai_compatible" if config.get("LLM_API_KEY") else "mock"
model_name = config.get("LLM_MODEL", "mock-model") model_name = config.get("LLM_MODEL", "mock-model")
if provider_name == "mock": if provider_name == "mock":
return MockLLMProvider(model_name=model_name) return MockLLMProvider(model_name=model_name)
@@ -130,3 +142,21 @@ def create_embedding_provider(config: dict | None = None):
base_url=config.get("EMBEDDING_BASE_URL", config.get("LLM_BASE_URL", "https://api.openai.com/v1")), base_url=config.get("EMBEDDING_BASE_URL", config.get("LLM_BASE_URL", "https://api.openai.com/v1")),
model_name=config.get("EMBEDDING_MODEL", "text-embedding-3-small"), model_name=config.get("EMBEDDING_MODEL", "text-embedding-3-small"),
) )
def get_runtime_llm_config(overrides: dict | None = None) -> dict:
"""
从环境变量读取运行时配置。
Agent Core 通过这层读取模型配置,避免直接依赖 Django settings
这样本模块在独立脚本、测试和 Django 中都能复用。
"""
config = {
"LLM_PROVIDER": os.environ.get("LLM_PROVIDER", ""),
"LLM_API_KEY": os.environ.get("LLM_API_KEY", ""),
"LLM_BASE_URL": os.environ.get("LLM_BASE_URL", "https://api.openai.com/v1"),
"LLM_MODEL": os.environ.get("LLM_MODEL", "mock-model"),
}
if overrides:
config.update(overrides)
return config

View File

@@ -1,40 +1,153 @@
import json
import time import time
from .llm_provider import create_llm_provider, get_runtime_llm_config
from .results import AgentResult from .results import AgentResult
from .structured_output import build_mock_structured_output from .structured_output import (
build_response_schema_hint,
extract_answer_from_structured_output,
parse_structured_output,
)
from .tool_registry import run_declared_tools from .tool_registry import run_declared_tools
from .rag.retriever import retrieve from .rag.retriever import retrieve
def run_agent(scenario_config: dict, user_input: str, options: dict | None = None) -> AgentResult:
    """Run the minimal agent loop for one scenario.

    The processing order follows the design document:
      1. read the scenario configuration
      2. run RAG retrieval
      3. run the declared tools
      4. build the prompt and call the LLM
      5. parse the structured result
      6. return a unified ``AgentResult``

    Args:
        scenario_config: Scenario definition (``output``, ``rag``, ``tools``,
            ``agent`` sections are read here or by the helpers).
        user_input: The raw user question.
        options: Optional per-call settings. ``llm_provider`` injects a
            provider instance; ``llm_config`` overrides the environment
            configuration used to create one.

    Returns:
        An ``AgentResult`` with status ``"success"``, or ``"failed"`` when the
        provider reports an unsuccessful call.
    """
    started_at = time.perf_counter()
    options = options or {}
    output_type = scenario_config.get("output", {}).get("type", "general_answer")

    references = _collect_references(
        scenario_config=scenario_config,
        user_input=user_input,
        options=options,
    )
    tool_calls = run_declared_tools(scenario_config.get("tools", []), user_input)
    messages = build_messages(
        scenario_config=scenario_config,
        user_input=user_input,
        references=references,
        tool_calls=tool_calls,
    )

    # An injected provider (tests, callers) takes precedence over one built
    # from the runtime configuration.
    provider = options.get("llm_provider") or create_llm_provider(
        get_runtime_llm_config(options.get("llm_config"))
    )
    llm_response = provider.generate(
        messages,
        response_format=build_response_schema_hint(output_type),
    )
    latency_ms = int((time.perf_counter() - started_at) * 1000)

    # Fields that are identical for the failure and the success result.
    shared_fields = {
        "references": references,
        "tool_calls": tool_calls,
        "model_name": llm_response.model_name or "unknown-model",
        "latency_ms": latency_ms,
    }

    if not llm_response.success:
        return AgentResult(
            answer="模型调用失败,请检查配置或稍后重试。",
            structured_output={},
            raw_output="",
            status="failed",
            error=str(llm_response.error or "未知模型错误"),
            **shared_fields,
        )

    structured_output, _ = parse_structured_output(llm_response.content, output_type)
    answer = extract_answer_from_structured_output(structured_output, llm_response.content)
    return AgentResult(
        answer=answer,
        structured_output=structured_output,
        raw_output=llm_response.content,
        status="success",
        **shared_fields,
    )
def build_messages(
    scenario_config: dict,
    user_input: str,
    references: list[dict],
    tool_calls: list[dict],
) -> list[dict]:
    """Combine scenario config, retrieval hits and tool results into a minimal, explainable prompt.

    Returns:
        Three chat messages in order: a ``system`` message (role, goal,
        instructions, output type), an ``assistant`` message carrying the
        scenario name plus formatted references and tool results, and the
        ``user`` message with the raw input.
    """
    agent_config = scenario_config.get("agent", {})
    output_type = scenario_config.get("output", {}).get("type", "general_answer")

    system_lines = [
        f"你当前扮演的角色:{agent_config.get('role', '通用业务助手')}",
        f"当前任务目标:{agent_config.get('goal', '根据输入生成结构化结果')}",
        "执行要求:",
        _format_instructions(agent_config.get("instructions", [])),
        f"输出类型:{output_type}",
        "请优先输出 JSON 对象,字段必须贴近约定输出结构。",
    ]
    context_lines = [
        f"当前场景:{scenario_config.get('name', '未命名场景')}",
        _format_references(references),
        _format_tool_calls(tool_calls),
    ]

    role_and_text = (
        ("system", "\n".join(system_lines)),
        ("assistant", "\n".join(context_lines)),
        ("user", user_input),
    )
    return [{"role": role, "content": text} for role, text in role_and_text]
def _collect_references(scenario_config: dict, user_input: str, options: dict) -> list[dict]:
"""按场景配置执行检索,并保持无 RAG 场景也能正常返回空列表。"""
rag_config = scenario_config.get("rag", {})
if not rag_config.get("enabled"):
return []
return retrieve(
scenario_id=scenario_config.get("id", ""),
query=user_input,
collection=rag_config.get("collection", scenario_config.get("id", "")),
top_k=rag_config.get("top_k", 5),
document_ids=options.get("document_ids"),
store_path=options.get("rag_store_path"),
)
def _format_instructions(instructions: list[str]) -> str:
if not instructions:
return "1. 结合知识库和工具结果回答。\n2. 信息不足时明确说明。"
return "\n".join(f"{index}. {item}" for index, item in enumerate(instructions, start=1))
def _format_references(references: list[dict]) -> str:
if not references:
return "知识库引用:当前没有检索到可用片段。"
lines = ["知识库引用:"]
for index, reference in enumerate(references, start=1):
lines.append(
f"{index}. 来源={reference.get('source', '未知来源')} 内容={reference.get('content', '')}"
)
return "\n".join(lines)
def _format_tool_calls(tool_calls: list[dict]) -> str:
if not tool_calls:
return "工具结果:当前场景未声明工具或无需调用工具。"
lines = ["工具结果:"]
for index, tool_call in enumerate(tool_calls, start=1):
if tool_call.get("success"):
lines.append(
f"{index}. 工具={tool_call.get('tool_name')} 结果={json.dumps(tool_call.get('result', {}), ensure_ascii=False)}"
)
else:
lines.append(
f"{index}. 工具={tool_call.get('tool_name')} 失败={tool_call.get('error', '未知错误')}"
)
return "\n".join(lines)

View File

@@ -1,8 +1,142 @@
def build_mock_structured_output(output_type: str, user_input: str, references: list) -> dict: import json
from .schemas.outputs import SUPPORTED_OUTPUT_TYPES
# Declares, per output type, the structured fields that the page and the
# audit log actually consume.
# This deliberately avoids a heavyweight schema framework; the priority is
# fields that are stable, readable and easy to explain.
# Each inner dict maps a field name to the default value used when the model
# output does not provide that field.
OUTPUT_FIELD_TEMPLATES = {
    "general_answer": {
        "answer": "",
        "confidence": "medium",
        "references": [],
    },
    "document_review_report": {
        "summary": "",
        "issues": [],
        "risk_level": "medium",
        "suggestions": [],
        "missing_items": [],
        "references": [],
    },
    "ticket_response": {
        "reply": "",
        "category": "general",
        "priority": "medium",
        "suggested_action": "",
        "need_human_review": False,
    },
    "quality_report": {
        "summary": "",
        "possible_causes": [],
        "evidence": [],
        "risk_level": "medium",
        "suggested_actions": [],
        "references": [],
    },
    "risk_audit_report": {
        "summary": "",
        "risk_points": [],
        "risk_level": "medium",
        "suggestions": [],
        "references": [],
    },
}
def build_response_schema_hint(output_type: str) -> dict:
    """Build the structure hint passed to the LLM to encourage stable JSON output.

    Returns:
        A dict with the normalized ``output_type`` and ``fields``, the list of
        field names declared for that type in ``OUTPUT_FIELD_TEMPLATES``.
    """
    resolved_type = normalize_output_type(output_type)
    field_names = list(OUTPUT_FIELD_TEMPLATES[resolved_type])
    return {"output_type": resolved_type, "fields": field_names}
def normalize_output_type(output_type: str) -> str:
    """Lightly normalize externally configured output types.

    Any value that is not in ``SUPPORTED_OUTPUT_TYPES`` collapses to
    ``"general_answer"`` so that spelling differences cannot scatter the
    parsing branches.
    """
    return output_type if output_type in SUPPORTED_OUTPUT_TYPES else "general_answer"
def parse_structured_output(raw_content: str, output_type: str) -> tuple[dict, str]:
    """Parse model output into the standard structure, preferring JSON.

    Args:
        raw_content: Raw text returned by the model.
        output_type: Configured output type; normalized before use.

    Returns:
        A ``(structured_output, parse_mode)`` tuple.
        - ``structured_output``: standard structure that the page and audit
          log can consume directly. Template defaults are filled in first and
          then overlaid with the fields the model actually returned.
        - ``parse_mode``: ``"json"`` when a JSON object was parsed,
          ``"fallback"`` when the content had to be degraded.
    """
    # Function-scope import keeps this block self-contained.
    from copy import deepcopy

    normalized_output_type = normalize_output_type(output_type)
    parsed = _try_parse_json_object(raw_content)
    if parsed is None:
        fallback_output = build_fallback_structured_output(
            output_type=normalized_output_type,
            raw_content=raw_content,
        )
        return fallback_output, "fallback"

    structured_output = {
        "output_type": normalized_output_type,
        "parse_mode": "json",
    }
    # Deep-copy the defaults: the templates contain list values, and sharing
    # them would let a caller that mutates the result corrupt the module-level
    # template for every later call.
    structured_output.update(deepcopy(OUTPUT_FIELD_TEMPLATES[normalized_output_type]))
    structured_output.update(parsed)
    # The two bookkeeping fields are owned by this parser. Re-pin them so a
    # model that echoes (or invents) these keys cannot contradict the
    # parse_mode returned alongside the dict.
    structured_output["output_type"] = normalized_output_type
    structured_output["parse_mode"] = "json"
    return structured_output, "json"
def build_fallback_structured_output(output_type: str, raw_content: str) -> dict:
    """Build a stable display structure when the model did not return valid JSON.

    The raw text is placed into the primary text field of the output type:
    ``answer`` for ``general_answer``, ``reply`` for ``ticket_response`` and
    ``summary`` for every other type.

    Args:
        output_type: Configured output type; normalized before use.
        raw_content: Raw model text to surface to the user.

    Returns:
        A dict with ``output_type``, ``parse_mode="fallback"``, the template
        defaults for the type, and the raw text in the primary field.
    """
    # Function-scope import keeps this block self-contained.
    from copy import deepcopy

    normalized_output_type = normalize_output_type(output_type)
    structured_output = {
        "output_type": normalized_output_type,
        "parse_mode": "fallback",
    }
    # Deep-copy the defaults so the list values inside the module-level
    # template are never shared with (and mutated through) a returned result.
    structured_output.update(deepcopy(OUTPUT_FIELD_TEMPLATES[normalized_output_type]))

    # Only two types use a primary field other than "summary".
    primary_field_by_type = {
        "general_answer": "answer",
        "ticket_response": "reply",
    }
    primary_field = primary_field_by_type.get(normalized_output_type, "summary")
    structured_output[primary_field] = raw_content
    return structured_output
def extract_answer_from_structured_output(structured_output: dict, raw_content: str) -> str:
    """Pick the main answer text out of a structured result.

    Gives every output type a single entry point: the first non-blank string
    among ``answer``, ``reply`` and ``summary`` is returned stripped; if none
    qualifies, the stripped raw content is returned instead.
    """
    candidates = (structured_output.get(key) for key in ("answer", "reply", "summary"))
    usable = (text.strip() for text in candidates if isinstance(text, str) and text.strip())
    first_usable = next(usable, None)
    if first_usable is None:
        return raw_content.strip()
    return first_usable
def _try_parse_json_object(raw_content: str) -> dict | None:
"""支持纯 JSON 或被 Markdown 代码块包裹的 JSON。"""
content = raw_content.strip()
if not content:
return None
candidates = [content]
if content.startswith("```"):
stripped = content.strip("`").strip()
if stripped.lower().startswith("json"):
stripped = stripped[4:].strip()
candidates.append(stripped)
for candidate in candidates:
try:
parsed = json.loads(candidate)
except json.JSONDecodeError:
continue
if isinstance(parsed, dict):
return parsed
return None

14
tests/conftest.py Normal file
View File

@@ -0,0 +1,14 @@
import pytest
@pytest.fixture(autouse=True)
def force_mock_llm_provider_for_tests(monkeypatch):
    """Pin the test environment to the mock provider.

    The project automatically reads real model settings from the root `.env`
    file. That is handy for local runs, but unit tests and page regression
    tests must not depend on external network access or on the state of a
    real API key. Overriding both variables here keeps tests stable and
    repeatable.
    """
    pinned_environment = {
        "LLM_PROVIDER": "mock",
        "LLM_MODEL": "mock-model",
    }
    for variable, value in pinned_environment.items():
        monkeypatch.setenv(variable, value)

View File

@@ -1,24 +1,125 @@
from agent_core.orchestrator import run_agent from agent_core.orchestrator import build_messages, run_agent
from agent_core.rag.ingest import ingest_document from agent_core.rag.ingest import ingest_document
from agent_core.rag.retriever import retrieve from agent_core.rag.retriever import retrieve
def test_run_agent_returns_structured_result_from_llm_output():
    """A JSON reply from the provider is parsed into answer and structured fields."""
    from agent_core.llm_provider import LLMResponse

    scenario = {
        "id": "knowledge_qa",
        "name": "知识库问答助手",
        "agent": {
            "role": "知识库助手",
            "goal": "基于资料回答问题",
            "instructions": ["仅根据证据回答"],
        },
        "rag": {"enabled": True, "collection": "knowledge_qa", "top_k": 3},
        "tools": ["generate_action_items"],
        "output": {"type": "general_answer"},
    }
    provider_response = """
    {
      "answer": "请先隔离异常现场,再通知负责人。",
      "confidence": "high",
      "references": [
        {"source": "sop.md", "excerpt": "异常处理 SOP先隔离现场"}
      ]
    }
    """

    class FakeProvider:
        """Stand-in provider that always returns the canned JSON reply."""

        def generate(self, messages, response_format=None):
            return LLMResponse(
                content=provider_response,
                model_name="demo-model",
                success=True,
            )

    result = run_agent(
        scenario,
        "如何处理异常?",
        options={"llm_provider": FakeProvider()},
    )

    assert result.status == "success"
    assert result.answer == "请先隔离异常现场,再通知负责人。"
    assert result.structured_output["output_type"] == "general_answer"
    assert result.structured_output["confidence"] == "high"
    assert isinstance(result.references, list)
    assert result.tool_calls[0]["tool_name"] == "generate_action_items"
    assert result.model_name == "demo-model"
def test_run_agent_falls_back_when_llm_returns_non_json():
    """Plain-text model output is surfaced through the fallback structure."""
    from agent_core.llm_provider import LLMResponse

    plain_reply = "这是非 JSON 的普通回答"
    scenario = {
        "id": "document_review",
        "name": "文档审核助手",
        "agent": {
            "role": "审核助手",
            "goal": "总结审核意见",
            "instructions": ["输出重点问题"],
        },
        "rag": {"enabled": False},
        "tools": [],
        "output": {"type": "document_review_report"},
    }

    class FakeProvider:
        """Stand-in provider that answers with non-JSON text."""

        def generate(self, messages, response_format=None):
            return LLMResponse(
                content=plain_reply,
                model_name="demo-model",
                success=True,
            )

    result = run_agent(
        scenario,
        "请检查合同风险",
        options={"llm_provider": FakeProvider()},
    )

    assert result.status == "success"
    assert result.answer == "这是非 JSON 的普通回答"
    assert result.structured_output["output_type"] == "document_review_report"
    assert result.structured_output["summary"] == "这是非 JSON 的普通回答"
    assert result.structured_output["parse_mode"] == "fallback"
def test_build_messages_contains_role_goal_references_and_tool_results():
    """The prompt carries role, goal, output type, references, tool results and the user input."""
    scenario = {
        "name": "质量异常分析助手",
        "agent": {
            "role": "质量管理专家",
            "goal": "生成结构化质量分析报告",
            "instructions": ["必须引用知识库", "缺失信息要说明"],
        },
        "output": {"type": "quality_report"},
    }
    successful_tool_call = {
        "tool_name": "query_demo_records",
        "success": True,
        "result": {"records": [{"title": "A线缺陷"}]},
        "error": "",
    }

    system_message, context_message, user_message = build_messages(
        scenario_config=scenario,
        user_input="分析 A 线异常",
        references=[{"source": "sop.md", "content": "先隔离现场"}],
        tool_calls=[successful_tool_call],
    )

    assert system_message["role"] == "system"
    for expected_fragment in ("质量管理专家", "生成结构化质量分析报告", "quality_report"):
        assert expected_fragment in system_message["content"]
    for expected_fragment in ("先隔离现场", "A线缺陷"):
        assert expected_fragment in context_message["content"]
    assert "分析 A 线异常" in user_message["content"]
def test_rag_ingest_and_retrieve_filters_by_scenario_and_query(tmp_path): def test_rag_ingest_and_retrieve_filters_by_scenario_and_query(tmp_path):