From 8d08571cdab292139cf0c292196a5f163d02f53c Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Mon, 18 Aug 2025 00:52:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E6=B7=BB=E5=8A=A0=E5=B7=A5?= =?UTF-8?q?=E5=85=B7=E5=8E=86=E5=8F=B2=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot.py | 4 + src/chat/utils/prompt_builder.py | 34 +++- src/common/tool_history.py | 280 +++++++++++++++++++++++++++++++ 3 files changed, 317 insertions(+), 1 deletion(-) create mode 100644 src/common/tool_history.py diff --git a/bot.py b/bot.py index 5342be7c..ad2f4026 100644 --- a/bot.py +++ b/bot.py @@ -24,6 +24,10 @@ initialize_logging() from src.main import MainSystem #noqa from src.manager.async_task_manager import async_task_manager #noqa +from src.common.tool_history import wrap_tool_executor #noqa + +# 初始化工具历史记录 +wrap_tool_executor() diff --git a/src/chat/utils/prompt_builder.py b/src/chat/utils/prompt_builder.py index 1b107904..03f32f26 100644 --- a/src/chat/utils/prompt_builder.py +++ b/src/chat/utils/prompt_builder.py @@ -7,11 +7,28 @@ from contextlib import asynccontextmanager from typing import Dict, Any, Optional, List, Union from src.common.logger import get_logger +from src.common.tool_history import ToolHistoryManager install(extra_lines=3) logger = get_logger("prompt_build") +# 创建工具历史管理器实例 +tool_history_manager = ToolHistoryManager() + +def get_tool_history_prompt(message_id: Optional[str] = None) -> str: + """获取工具历史提示词 + + Args: + message_id: 会话ID, 用于只获取当前会话的历史 + + Returns: + 格式化的工具历史提示词 + """ + return tool_history_manager.get_recent_history_prompt( + session_id=message_id + ) + class PromptContext: def __init__(self): @@ -136,8 +153,23 @@ class PromptManager: return prompt async def format_prompt(self, name: str, **kwargs) -> str: + # 获取当前提示词 prompt = await self.get_prompt_async(name) - return prompt.format(**kwargs) + + # 获取当前会话ID + message_id = self._context._current_context + + # 获取工具历史提示词 + tool_history = "" + if name in ['action_prompt', 'replyer_prompt', 'planner_prompt', 'tool_executor_prompt']: + tool_history = get_tool_history_prompt(message_id) + + # 如果有工具历史,添加到提示词末尾 + result = prompt.format(**kwargs) + if tool_history: + result = f"{result}\n\n{tool_history}" + + return result # 全局单例 diff --git a/src/common/tool_history.py b/src/common/tool_history.py new file mode 100644 index 00000000..6ee6bb1a --- /dev/null +++ b/src/common/tool_history.py @@ -0,0 +1,280 @@ +"""工具执行历史记录模块""" +import functools +import time +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional, Union +import json +from pathlib import Path +import asyncio + +from .logger import get_logger + +logger = get_logger("tool_history") + +class ToolHistoryManager: + """工具执行历史记录管理器""" + + _instance = None + _initialized = False + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if not self._initialized: + self._history: List[Dict[str, Any]] = [] + self._initialized = True + self._data_dir = Path("data/tool_history") + self._data_dir.mkdir(parents=True, exist_ok=True) + self._current_file = None + self._load_history() + self._rotate_file() + + def _rotate_file(self): + """轮换历史记录文件""" + current_time = datetime.now() + filename = f"tool_history_{current_time.strftime('%Y%m%d_%H%M%S')}.jsonl" + self._current_file = self._data_dir / filename + + def _save_record(self, record: Dict[str, Any]): + """保存单条记录到文件""" + try: + with self._current_file.open("a", encoding="utf-8") as f: + f.write(json.dumps(record, ensure_ascii=False) + "\n") + except Exception as e: + logger.error(f"保存工具调用记录失败: {e}") + + def record_tool_call(self, + tool_name: str, + args: Dict[str, Any], + result: Any, + execution_time: float, + status: str, + session_id: Optional[str] = None): + """记录工具调用""" + try: + # 创建记录 + record = { + "tool_name": tool_name, + "timestamp": datetime.now().isoformat(), + "arguments": self._sanitize_args(args), + "result": self._sanitize_result(result), + "execution_time": execution_time, + "status": status, + "session_id": session_id + } + + # 添加到内存中的历史记录 + self._history.append(record) + + # 保存到文件 + self._save_record(record) + + if status == "completed": + logger.info(f"工具 {tool_name} 调用完成,耗时:{execution_time:.2f}s") + else: + logger.error(f"工具 {tool_name} 调用失败:{result}") + + except Exception as e: + logger.error(f"记录工具调用时发生错误: {e}") + + def _sanitize_args(self, args: Dict[str, Any]) -> Dict[str, Any]: + """清理参数中的敏感信息""" + sensitive_keys = ['api_key', 'token', 'password', 'secret'] + sanitized = args.copy() + + def _sanitize_value(value): + if isinstance(value, dict): + return {k: '***' if k.lower() in sensitive_keys else _sanitize_value(v) + for k, v in value.items()} + return value + + return {k: '***' if k.lower() in sensitive_keys else _sanitize_value(v) + for k, v in sanitized.items()} + + def _sanitize_result(self, result: Any) -> Any: + """清理结果中的敏感信息""" + if isinstance(result, dict): + return self._sanitize_args(result) + return result + + def _load_history(self): + """加载历史记录文件""" + try: + # 按文件修改时间排序,加载最近的文件 + history_files = sorted( + self._data_dir.glob("tool_history_*.jsonl"), + key=lambda x: x.stat().st_mtime, + reverse=True + ) + + # 最多加载最近3个文件的历史 + for file in history_files[:3]: + try: + with file.open("r", encoding="utf-8") as f: + for line in f: + record = json.loads(line) + self._history.append(record) + except Exception as e: + logger.error(f"加载历史记录文件 {file} 失败: {e}") + + logger.info(f"成功加载了 {len(self._history)} 条历史记录") + except Exception as e: + logger.error(f"加载历史记录失败: {e}") + + def query_history(self, + tool_names: Optional[List[str]] = None, + start_time: Optional[Union[datetime, str]] = None, + end_time: Optional[Union[datetime, str]] = None, + session_id: Optional[str] = None, + limit: Optional[int] = None, + status: Optional[str] = None) -> List[Dict[str, Any]]: + """ + 查询工具调用历史 + + Args: + tool_names: 工具名称列表,为空则查询所有工具 + start_time: 开始时间,可以是datetime对象或ISO格式字符串 + end_time: 结束时间,可以是datetime对象或ISO格式字符串 + session_id: 会话ID,用于筛选特定会话的调用 + limit: 返回记录数量限制 + status: 执行状态筛选("completed"或"error") + + Returns: + 符合条件的历史记录列表 + """ + def _parse_time(time_str: Optional[Union[datetime, str]]) -> Optional[datetime]: + if isinstance(time_str, datetime): + return time_str + elif isinstance(time_str, str): + return datetime.fromisoformat(time_str) + return None + + filtered_history = self._history + + # 按工具名筛选 + if tool_names: + filtered_history = [ + record for record in filtered_history + if record["tool_name"] in tool_names + ] + + # 按时间范围筛选 + start_dt = _parse_time(start_time) + end_dt = _parse_time(end_time) + + if start_dt: + filtered_history = [ + record for record in filtered_history + if datetime.fromisoformat(record["timestamp"]) >= start_dt + ] + + if end_dt: + filtered_history = [ + record for record in filtered_history + if datetime.fromisoformat(record["timestamp"]) <= end_dt + ] + + # 按会话ID筛选 + if session_id: + filtered_history = [ + record for record in filtered_history + if record.get("session_id") == session_id + ] + + # 按状态筛选 + if status: + filtered_history = [ + record for record in filtered_history + if record["status"] == status + ] + + # 应用数量限制 + if limit: + filtered_history = filtered_history[-limit:] + + return filtered_history + + def get_recent_history_prompt(self, + limit: int = 5, + session_id: Optional[str] = None) -> str: + """ + 获取最近工具调用历史的提示词 + + Args: + limit: 返回的历史记录数量 + session_id: 会话ID,用于只获取当前会话的历史 + + Returns: + 格式化的历史记录提示词 + """ + recent_history = self.query_history( + session_id=session_id, + limit=limit + ) + + if not recent_history: + return "" + + prompt = "\n最近的工具调用历史:\n" + for record in recent_history: + status = "成功" if record["status"] == "completed" else "失败" + timestamp = datetime.fromisoformat(record["timestamp"]).strftime("%H:%M:%S") + prompt += ( + f"- [{timestamp}] {record['tool_name']} ({status})\n" + f" 参数: {json.dumps(record['arguments'], ensure_ascii=False)}\n" + f" 结果: {str(record['result'])[:200]}...\n" + ) + + return prompt + + def clear_history(self): + """清除历史记录""" + self._history.clear() + self._rotate_file() + logger.info("工具调用历史记录已清除") + +def wrap_tool_executor(): + """ + 包装工具执行器以添加历史记录功能 + 这个函数应该在系统启动时被调用一次 + """ + from src.plugin_system.core.tool_use import ToolExecutor + original_execute = ToolExecutor.execute_tool_call + history_manager = ToolHistoryManager() + + async def wrapped_execute_tool_call(self, tool_call, tool_instance=None): + start_time = time.time() + try: + result = await original_execute(self, tool_call, tool_instance) + execution_time = time.time() - start_time + + # 记录成功的调用 + history_manager.record_tool_call( + tool_name=tool_call.func_name, + args=tool_call.arguments, + result=result, + execution_time=execution_time, + status="completed", + session_id=getattr(self, 'session_id', None) + ) + + return result + + except Exception as e: + execution_time = time.time() - start_time + # 记录失败的调用 + history_manager.record_tool_call( + tool_name=tool_call.func_name, + args=tool_call.arguments, + result=str(e), + execution_time=execution_time, + status="error", + session_id=getattr(self, 'session_id', None) + ) + raise + + # 替换原始方法 + ToolExecutor.execute_tool_call = wrapped_execute_tool_call