diff --git a/agent_core/llm_provider.py b/agent_core/llm_provider.py index 1bb568d..a662b9e 100644 --- a/agent_core/llm_provider.py +++ b/agent_core/llm_provider.py @@ -1,5 +1,6 @@ from dataclasses import dataclass import json +import os from urllib.error import URLError from urllib.request import Request, urlopen @@ -25,13 +26,22 @@ class MockLLMProvider: self.model_name = model_name or "mock-model" def generate(self, messages: list[dict], response_format: dict | None = None) -> LLMResponse: + # Mock Provider 的职责是让页面和测试在未接入真实模型时也能闭环。 + # 因此这里直接返回稳定 JSON,方便后续统一走结构化解析逻辑。 user_content = "" for message in reversed(messages): if message.get("role") == "user": user_content = message.get("content", "") break return LLMResponse( - content=f"模拟模型回答:{user_content}", + content=json.dumps( + { + "answer": f"模拟回答:{user_content}", + "confidence": "medium", + "references": [], + }, + ensure_ascii=False, + ), model_name=self.model_name, success=True, ) @@ -112,7 +122,9 @@ def _post_json(base_url: str, endpoint: str, api_key: str, payload: dict) -> dic def create_llm_provider(config: dict | None = None): config = config or {} - provider_name = config.get("LLM_PROVIDER", "mock") + provider_name = config.get("LLM_PROVIDER") + if not provider_name: + provider_name = "openai_compatible" if config.get("LLM_API_KEY") else "mock" model_name = config.get("LLM_MODEL", "mock-model") if provider_name == "mock": return MockLLMProvider(model_name=model_name) @@ -130,3 +142,21 @@ def create_embedding_provider(config: dict | None = None): base_url=config.get("EMBEDDING_BASE_URL", config.get("LLM_BASE_URL", "https://api.openai.com/v1")), model_name=config.get("EMBEDDING_MODEL", "text-embedding-3-small"), ) + + +def get_runtime_llm_config(overrides: dict | None = None) -> dict: + """ + 从环境变量读取运行时配置。 + + Agent Core 通过这层读取模型配置,避免直接依赖 Django settings, + 这样本模块在独立脚本、测试和 Django 中都能复用。 + """ + config = { + "LLM_PROVIDER": os.environ.get("LLM_PROVIDER", ""), + "LLM_API_KEY": os.environ.get("LLM_API_KEY", ""), + "LLM_BASE_URL": os.environ.get("LLM_BASE_URL", "https://api.openai.com/v1"), + "LLM_MODEL": os.environ.get("LLM_MODEL", "mock-model"), + } + if overrides: + config.update(overrides) + return config diff --git a/agent_core/orchestrator.py b/agent_core/orchestrator.py index a8e6c23..287cdb8 100644 --- a/agent_core/orchestrator.py +++ b/agent_core/orchestrator.py @@ -1,40 +1,153 @@ +import json import time +from .llm_provider import create_llm_provider, get_runtime_llm_config from .results import AgentResult -from .structured_output import build_mock_structured_output +from .structured_output import ( + build_response_schema_hint, + extract_answer_from_structured_output, + parse_structured_output, +) from .tool_registry import run_declared_tools from .rag.retriever import retrieve def run_agent(scenario_config: dict, user_input: str, options: dict | None = None) -> AgentResult: + """ + 执行当前场景的最小 Agent 闭环。 + + 处理顺序保持和设计文档一致: + 1. 读取场景配置 + 2. 执行 RAG 检索 + 3. 执行声明式工具 + 4. 构造 Prompt 并调用 LLM + 5. 解析结构化结果 + 6. 统一返回 AgentResult + """ started_at = time.perf_counter() options = options or {} output_type = scenario_config.get("output", {}).get("type", "general_answer") - references = [] - rag_config = scenario_config.get("rag", {}) - if rag_config.get("enabled"): - references = retrieve( - scenario_id=scenario_config.get("id", ""), - query=user_input, - collection=rag_config.get("collection", scenario_config.get("id", "")), - top_k=rag_config.get("top_k", 5), - document_ids=options.get("document_ids"), - store_path=options.get("rag_store_path"), - ) - + references = _collect_references(scenario_config=scenario_config, user_input=user_input, options=options) tool_calls = run_declared_tools(scenario_config.get("tools", []), user_input) - structured_output = build_mock_structured_output(output_type, user_input, references) - answer = f"已根据「{scenario_config.get('name', '当前场景')}」生成模拟回答:{user_input}" + messages = build_messages( + scenario_config=scenario_config, + user_input=user_input, + references=references, + tool_calls=tool_calls, + ) + + provider = options.get("llm_provider") or create_llm_provider( + get_runtime_llm_config(options.get("llm_config")) + ) + llm_response = provider.generate( + messages, + response_format=build_response_schema_hint(output_type), + ) latency_ms = int((time.perf_counter() - started_at) * 1000) + if not llm_response.success: + return AgentResult( + answer="模型调用失败,请检查配置或稍后重试。", + structured_output={}, + references=references, + tool_calls=tool_calls, + raw_output="", + model_name=llm_response.model_name or "unknown-model", + latency_ms=latency_ms, + status="failed", + error=str(llm_response.error or "未知模型错误"), + ) + + structured_output, _ = parse_structured_output(llm_response.content, output_type) + answer = extract_answer_from_structured_output(structured_output, llm_response.content) return AgentResult( answer=answer, structured_output=structured_output, references=references, tool_calls=tool_calls, - raw_output=answer, - model_name=options.get("model_name", "mock-model"), + raw_output=llm_response.content, + model_name=llm_response.model_name or "unknown-model", latency_ms=latency_ms, status="success", ) + + +def build_messages( + scenario_config: dict, + user_input: str, + references: list[dict], + tool_calls: list[dict], +) -> list[dict]: + """将场景配置、检索结果和工具结果整合为最小可解释 Prompt。""" + agent_config = scenario_config.get("agent", {}) + system_message = "\n".join( + [ + f"你当前扮演的角色:{agent_config.get('role', '通用业务助手')}", + f"当前任务目标:{agent_config.get('goal', '根据输入生成结构化结果')}", + "执行要求:", + _format_instructions(agent_config.get("instructions", [])), + f"输出类型:{scenario_config.get('output', {}).get('type', 'general_answer')}", + "请优先输出 JSON 对象,字段必须贴近约定输出结构。", + ] + ) + context_message = "\n".join( + [ + f"当前场景:{scenario_config.get('name', '未命名场景')}", + _format_references(references), + _format_tool_calls(tool_calls), + ] + ) + return [ + {"role": "system", "content": system_message}, + {"role": "assistant", "content": context_message}, + {"role": "user", "content": user_input}, + ] + + +def _collect_references(scenario_config: dict, user_input: str, options: dict) -> list[dict]: + """按场景配置执行检索,并保持无 RAG 场景也能正常返回空列表。""" + rag_config = scenario_config.get("rag", {}) + if not rag_config.get("enabled"): + return [] + return retrieve( + scenario_id=scenario_config.get("id", ""), + query=user_input, + collection=rag_config.get("collection", scenario_config.get("id", "")), + top_k=rag_config.get("top_k", 5), + document_ids=options.get("document_ids"), + store_path=options.get("rag_store_path"), + ) + + +def _format_instructions(instructions: list[str]) -> str: + if not instructions: + return "1. 结合知识库和工具结果回答。\n2. 信息不足时明确说明。" + return "\n".join(f"{index}. {item}" for index, item in enumerate(instructions, start=1)) + + +def _format_references(references: list[dict]) -> str: + if not references: + return "知识库引用:当前没有检索到可用片段。" + lines = ["知识库引用:"] + for index, reference in enumerate(references, start=1): + lines.append( + f"{index}. 来源={reference.get('source', '未知来源')} 内容={reference.get('content', '')}" + ) + return "\n".join(lines) + + +def _format_tool_calls(tool_calls: list[dict]) -> str: + if not tool_calls: + return "工具结果:当前场景未声明工具或无需调用工具。" + lines = ["工具结果:"] + for index, tool_call in enumerate(tool_calls, start=1): + if tool_call.get("success"): + lines.append( + f"{index}. 工具={tool_call.get('tool_name')} 结果={json.dumps(tool_call.get('result', {}), ensure_ascii=False)}" + ) + else: + lines.append( + f"{index}. 工具={tool_call.get('tool_name')} 失败={tool_call.get('error', '未知错误')}" + ) + return "\n".join(lines) diff --git a/agent_core/structured_output.py b/agent_core/structured_output.py index 1214a0d..98a16c3 100644 --- a/agent_core/structured_output.py +++ b/agent_core/structured_output.py @@ -1,8 +1,142 @@ -def build_mock_structured_output(output_type: str, user_input: str, references: list) -> dict: +import json + +from .schemas.outputs import SUPPORTED_OUTPUT_TYPES + + +# 按输出类型声明页面和审计日志真正需要消费的结构化字段。 +# 这里不追求复杂 schema 框架,优先保证字段稳定、可读、易讲解。 +OUTPUT_FIELD_TEMPLATES = { + "general_answer": { + "answer": "", + "confidence": "medium", + "references": [], + }, + "document_review_report": { + "summary": "", + "issues": [], + "risk_level": "medium", + "suggestions": [], + "missing_items": [], + "references": [], + }, + "ticket_response": { + "reply": "", + "category": "general", + "priority": "medium", + "suggested_action": "", + "need_human_review": False, + }, + "quality_report": { + "summary": "", + "possible_causes": [], + "evidence": [], + "risk_level": "medium", + "suggested_actions": [], + "references": [], + }, + "risk_audit_report": { + "summary": "", + "risk_points": [], + "risk_level": "medium", + "suggestions": [], + "references": [], + }, +} + + +def build_response_schema_hint(output_type: str) -> dict: + """返回给 LLM 的结构化提示,帮助模型尽量输出稳定 JSON。""" + normalized_output_type = normalize_output_type(output_type) return { - "output_type": output_type, - "summary": f"模拟结构化输出:{user_input}", - "references_count": len(references), - "risk_level": "low", - "suggested_actions": ["补充真实 LLM Provider 后替换模拟结果"], + "output_type": normalized_output_type, + "fields": list(OUTPUT_FIELD_TEMPLATES[normalized_output_type].keys()), } + + +def normalize_output_type(output_type: str) -> str: + """对外部配置做轻量归一化,避免拼写差异导致解析分支混乱。""" + if output_type in SUPPORTED_OUTPUT_TYPES: + return output_type + return "general_answer" + + +def parse_structured_output(raw_content: str, output_type: str) -> tuple[dict, str]: + """ + 优先将模型输出解析为 JSON。 + + 返回值: + - structured_output: 页面和审计日志可直接消费的标准结构 + - parse_mode: `json` 表示成功解析,`fallback` 表示降级处理 + """ + normalized_output_type = normalize_output_type(output_type) + parsed = _try_parse_json_object(raw_content) + if parsed is None: + return build_fallback_structured_output( + output_type=normalized_output_type, + raw_content=raw_content, + ), "fallback" + + template = { + "output_type": normalized_output_type, + "parse_mode": "json", + } + template.update(OUTPUT_FIELD_TEMPLATES[normalized_output_type]) + template.update(parsed) + return template, "json" + + +def build_fallback_structured_output(output_type: str, raw_content: str) -> dict: + """当模型没有输出合法 JSON 时,仍然构造一个稳定的展示结构。""" + normalized_output_type = normalize_output_type(output_type) + structured_output = { + "output_type": normalized_output_type, + "parse_mode": "fallback", + } + structured_output.update(OUTPUT_FIELD_TEMPLATES[normalized_output_type]) + + if normalized_output_type == "general_answer": + structured_output["answer"] = raw_content + return structured_output + if normalized_output_type == "document_review_report": + structured_output["summary"] = raw_content + return structured_output + if normalized_output_type == "ticket_response": + structured_output["reply"] = raw_content + return structured_output + if normalized_output_type == "quality_report": + structured_output["summary"] = raw_content + return structured_output + + structured_output["summary"] = raw_content + return structured_output + + +def extract_answer_from_structured_output(structured_output: dict, raw_content: str) -> str: + """从结构化结果里提取页面主回答,保证不同输出类型有统一入口。""" + for field_name in ("answer", "reply", "summary"): + value = structured_output.get(field_name) + if isinstance(value, str) and value.strip(): + return value.strip() + return raw_content.strip() + + +def _try_parse_json_object(raw_content: str) -> dict | None: + """支持纯 JSON 或被 Markdown 代码块包裹的 JSON。""" + content = raw_content.strip() + if not content: + return None + candidates = [content] + if content.startswith("```"): + stripped = content.strip("`").strip() + if stripped.lower().startswith("json"): + stripped = stripped[4:].strip() + candidates.append(stripped) + + for candidate in candidates: + try: + parsed = json.loads(candidate) + except json.JSONDecodeError: + continue + if isinstance(parsed, dict): + return parsed + return None diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..85c9ccf --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,14 @@ +import pytest + + +@pytest.fixture(autouse=True) +def force_mock_llm_provider_for_tests(monkeypatch): + """ + 测试环境固定使用 mock Provider。 + + 当前项目会从根目录 `.env` 自动读取真实模型配置,这对本地运行很有帮助, + 但单元测试和页面回归测试不应该依赖外部网络或真实密钥状态。 + 因此这里统一覆盖为 mock,保证测试稳定、可重复。 + """ + monkeypatch.setenv("LLM_PROVIDER", "mock") + monkeypatch.setenv("LLM_MODEL", "mock-model") diff --git a/tests/test_agent_core.py b/tests/test_agent_core.py index 579a089..277df9f 100644 --- a/tests/test_agent_core.py +++ b/tests/test_agent_core.py @@ -1,24 +1,125 @@ -from agent_core.orchestrator import run_agent +from agent_core.orchestrator import build_messages, run_agent from agent_core.rag.ingest import ingest_document from agent_core.rag.retriever import retrieve -def test_run_agent_returns_structured_mock_result(): +def test_run_agent_returns_structured_result_from_llm_output(): scenario = { "id": "knowledge_qa", "name": "知识库问答助手", + "agent": { + "role": "知识库助手", + "goal": "基于资料回答问题", + "instructions": ["仅根据证据回答"], + }, "rag": {"enabled": True, "collection": "knowledge_qa", "top_k": 3}, "tools": ["generate_action_items"], "output": {"type": "general_answer"}, } + provider_response = """ + { + "answer": "请先隔离异常现场,再通知负责人。", + "confidence": "high", + "references": [ + {"source": "sop.md", "excerpt": "异常处理 SOP:先隔离现场"} + ] + } + """ - result = run_agent(scenario, "如何处理异常?") + class FakeProvider: + def generate(self, messages, response_format=None): + from agent_core.llm_provider import LLMResponse + + return LLMResponse( + content=provider_response, + model_name="demo-model", + success=True, + ) + + result = run_agent( + scenario, + "如何处理异常?", + options={"llm_provider": FakeProvider()}, + ) assert result.status == "success" - assert result.answer + assert result.answer == "请先隔离异常现场,再通知负责人。" assert result.structured_output["output_type"] == "general_answer" + assert result.structured_output["confidence"] == "high" assert isinstance(result.references, list) assert result.tool_calls[0]["tool_name"] == "generate_action_items" + assert result.model_name == "demo-model" + + +def test_run_agent_falls_back_when_llm_returns_non_json(): + scenario = { + "id": "document_review", + "name": "文档审核助手", + "agent": { + "role": "审核助手", + "goal": "总结审核意见", + "instructions": ["输出重点问题"], + }, + "rag": {"enabled": False}, + "tools": [], + "output": {"type": "document_review_report"}, + } + + class FakeProvider: + def generate(self, messages, response_format=None): + from agent_core.llm_provider import LLMResponse + + return LLMResponse( + content="这是非 JSON 的普通回答", + model_name="demo-model", + success=True, + ) + + result = run_agent( + scenario, + "请检查合同风险", + options={"llm_provider": FakeProvider()}, + ) + + assert result.status == "success" + assert result.answer == "这是非 JSON 的普通回答" + assert result.structured_output["output_type"] == "document_review_report" + assert result.structured_output["summary"] == "这是非 JSON 的普通回答" + assert result.structured_output["parse_mode"] == "fallback" + + +def test_build_messages_contains_role_goal_references_and_tool_results(): + scenario = { + "name": "质量异常分析助手", + "agent": { + "role": "质量管理专家", + "goal": "生成结构化质量分析报告", + "instructions": ["必须引用知识库", "缺失信息要说明"], + }, + "output": {"type": "quality_report"}, + } + + messages = build_messages( + scenario_config=scenario, + user_input="分析 A 线异常", + references=[{"source": "sop.md", "content": "先隔离现场"}], + tool_calls=[ + { + "tool_name": "query_demo_records", + "success": True, + "result": {"records": [{"title": "A线缺陷"}]}, + "error": "", + } + ], + ) + + assert messages[0]["role"] == "system" + assert "质量管理专家" in messages[0]["content"] + assert "生成结构化质量分析报告" in messages[0]["content"] + assert "quality_report" in messages[0]["content"] + assert "先隔离现场" in messages[1]["content"] + assert "A线缺陷" in messages[1]["content"] + assert "分析 A 线异常" in messages[2]["content"] def test_rag_ingest_and_retrieve_filters_by_scenario_and_query(tmp_path):