尝试添加工具历史记录

pull/1185/head
Windpicker-owo 2025-08-18 00:52:17 +08:00
parent 794a0d8fd4
commit 17d8bac504
3 changed files with 317 additions and 1 deletions

4
bot.py
View File

@ -24,6 +24,10 @@ initialize_logging()
from src.main import MainSystem #noqa
from src.manager.async_task_manager import async_task_manager #noqa
from src.common.tool_history import wrap_tool_executor #noqa
# 初始化工具历史记录
wrap_tool_executor()

View File

@ -7,11 +7,28 @@ from contextlib import asynccontextmanager
from typing import Dict, Any, Optional, List, Union
from src.common.logger import get_logger
from src.common.tool_history import ToolHistoryManager
install(extra_lines=3)
logger = get_logger("prompt_build")
# 创建工具历史管理器实例
tool_history_manager = ToolHistoryManager()
def get_tool_history_prompt(message_id: Optional[str] = None) -> str:
"""获取工具历史提示词
Args:
message_id: 会话ID, 用于只获取当前会话的历史
Returns:
格式化的工具历史提示词
"""
return tool_history_manager.get_recent_history_prompt(
session_id=message_id
)
class PromptContext:
def __init__(self):
@ -136,8 +153,23 @@ class PromptManager:
return prompt
async def format_prompt(self, name: str, **kwargs) -> str:
# 获取当前提示词
prompt = await self.get_prompt_async(name)
return prompt.format(**kwargs)
# 获取当前会话ID
message_id = self._context._current_context
# 获取工具历史提示词
tool_history = ""
if name in ['action_prompt', 'replyer_prompt', 'planner_prompt', 'tool_executor_prompt']:
tool_history = get_tool_history_prompt(message_id)
# 如果有工具历史,添加到提示词末尾
result = prompt.format(**kwargs)
if tool_history:
result = f"{result}\n\n{tool_history}"
return result
# 全局单例

View File

@ -0,0 +1,280 @@
"""工具执行历史记录模块"""
import functools
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union
import json
from pathlib import Path
import asyncio
from .logger import get_logger
logger = get_logger("tool_history")
class ToolHistoryManager:
"""工具执行历史记录管理器"""
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not self._initialized:
self._history: List[Dict[str, Any]] = []
self._initialized = True
self._data_dir = Path("data/tool_history")
self._data_dir.mkdir(parents=True, exist_ok=True)
self._current_file = None
self._load_history()
self._rotate_file()
def _rotate_file(self):
"""轮换历史记录文件"""
current_time = datetime.now()
filename = f"tool_history_{current_time.strftime('%Y%m%d_%H%M%S')}.jsonl"
self._current_file = self._data_dir / filename
def _save_record(self, record: Dict[str, Any]):
"""保存单条记录到文件"""
try:
with self._current_file.open("a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
except Exception as e:
logger.error(f"保存工具调用记录失败: {e}")
def record_tool_call(self,
tool_name: str,
args: Dict[str, Any],
result: Any,
execution_time: float,
status: str,
session_id: Optional[str] = None):
"""记录工具调用"""
try:
# 创建记录
record = {
"tool_name": tool_name,
"timestamp": datetime.now().isoformat(),
"arguments": self._sanitize_args(args),
"result": self._sanitize_result(result),
"execution_time": execution_time,
"status": status,
"session_id": session_id
}
# 添加到内存中的历史记录
self._history.append(record)
# 保存到文件
self._save_record(record)
if status == "completed":
logger.info(f"工具 {tool_name} 调用完成,耗时:{execution_time:.2f}s")
else:
logger.error(f"工具 {tool_name} 调用失败:{result}")
except Exception as e:
logger.error(f"记录工具调用时发生错误: {e}")
def _sanitize_args(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""清理参数中的敏感信息"""
sensitive_keys = ['api_key', 'token', 'password', 'secret']
sanitized = args.copy()
def _sanitize_value(value):
if isinstance(value, dict):
return {k: '***' if k.lower() in sensitive_keys else _sanitize_value(v)
for k, v in value.items()}
return value
return {k: '***' if k.lower() in sensitive_keys else _sanitize_value(v)
for k, v in sanitized.items()}
def _sanitize_result(self, result: Any) -> Any:
"""清理结果中的敏感信息"""
if isinstance(result, dict):
return self._sanitize_args(result)
return result
def _load_history(self):
"""加载历史记录文件"""
try:
# 按文件修改时间排序,加载最近的文件
history_files = sorted(
self._data_dir.glob("tool_history_*.jsonl"),
key=lambda x: x.stat().st_mtime,
reverse=True
)
# 最多加载最近3个文件的历史
for file in history_files[:3]:
try:
with file.open("r", encoding="utf-8") as f:
for line in f:
record = json.loads(line)
self._history.append(record)
except Exception as e:
logger.error(f"加载历史记录文件 {file} 失败: {e}")
logger.info(f"成功加载了 {len(self._history)} 条历史记录")
except Exception as e:
logger.error(f"加载历史记录失败: {e}")
def query_history(self,
tool_names: Optional[List[str]] = None,
start_time: Optional[Union[datetime, str]] = None,
end_time: Optional[Union[datetime, str]] = None,
session_id: Optional[str] = None,
limit: Optional[int] = None,
status: Optional[str] = None) -> List[Dict[str, Any]]:
"""
查询工具调用历史
Args:
tool_names: 工具名称列表为空则查询所有工具
start_time: 开始时间可以是datetime对象或ISO格式字符串
end_time: 结束时间可以是datetime对象或ISO格式字符串
session_id: 会话ID用于筛选特定会话的调用
limit: 返回记录数量限制
status: 执行状态筛选("completed""error")
Returns:
符合条件的历史记录列表
"""
def _parse_time(time_str: Optional[Union[datetime, str]]) -> Optional[datetime]:
if isinstance(time_str, datetime):
return time_str
elif isinstance(time_str, str):
return datetime.fromisoformat(time_str)
return None
filtered_history = self._history
# 按工具名筛选
if tool_names:
filtered_history = [
record for record in filtered_history
if record["tool_name"] in tool_names
]
# 按时间范围筛选
start_dt = _parse_time(start_time)
end_dt = _parse_time(end_time)
if start_dt:
filtered_history = [
record for record in filtered_history
if datetime.fromisoformat(record["timestamp"]) >= start_dt
]
if end_dt:
filtered_history = [
record for record in filtered_history
if datetime.fromisoformat(record["timestamp"]) <= end_dt
]
# 按会话ID筛选
if session_id:
filtered_history = [
record for record in filtered_history
if record.get("session_id") == session_id
]
# 按状态筛选
if status:
filtered_history = [
record for record in filtered_history
if record["status"] == status
]
# 应用数量限制
if limit:
filtered_history = filtered_history[-limit:]
return filtered_history
def get_recent_history_prompt(self,
limit: int = 5,
session_id: Optional[str] = None) -> str:
"""
获取最近工具调用历史的提示词
Args:
limit: 返回的历史记录数量
session_id: 会话ID用于只获取当前会话的历史
Returns:
格式化的历史记录提示词
"""
recent_history = self.query_history(
session_id=session_id,
limit=limit
)
if not recent_history:
return ""
prompt = "\n最近的工具调用历史:\n"
for record in recent_history:
status = "成功" if record["status"] == "completed" else "失败"
timestamp = datetime.fromisoformat(record["timestamp"]).strftime("%H:%M:%S")
prompt += (
f"- [{timestamp}] {record['tool_name']} ({status})\n"
f" 参数: {json.dumps(record['arguments'], ensure_ascii=False)}\n"
f" 结果: {str(record['result'])[:200]}...\n"
)
return prompt
def clear_history(self):
"""清除历史记录"""
self._history.clear()
self._rotate_file()
logger.info("工具调用历史记录已清除")
def wrap_tool_executor():
"""
包装工具执行器以添加历史记录功能
这个函数应该在系统启动时被调用一次
"""
from src.plugin_system.core.tool_use import ToolExecutor
original_execute = ToolExecutor.execute_tool_call
history_manager = ToolHistoryManager()
async def wrapped_execute_tool_call(self, tool_call, tool_instance=None):
start_time = time.time()
try:
result = await original_execute(self, tool_call, tool_instance)
execution_time = time.time() - start_time
# 记录成功的调用
history_manager.record_tool_call(
tool_name=tool_call.func_name,
args=tool_call.arguments,
result=result,
execution_time=execution_time,
status="completed",
session_id=getattr(self, 'session_id', None)
)
return result
except Exception as e:
execution_time = time.time() - start_time
# 记录失败的调用
history_manager.record_tool_call(
tool_name=tool_call.func_name,
args=tool_call.arguments,
result=str(e),
execution_time=execution_time,
status="error",
session_id=getattr(self, 'session_id', None)
)
raise
# 替换原始方法
ToolExecutor.execute_tool_call = wrapped_execute_tool_call