移除了原先的工具缓存,工具增加ttl属性,历史文件合并为一个文件

pull/1186/head
Windpicker-owo 2025-08-19 11:30:47 +08:00
parent e51a72c294
commit 8cdb0238f8
3 changed files with 90 additions and 204 deletions

View File

@ -29,31 +29,39 @@ class ToolHistoryManager:
self._initialized = True
self._data_dir = Path("data/tool_history")
self._data_dir.mkdir(parents=True, exist_ok=True)
self._current_file = None
self._history_file = self._data_dir / "tool_history.jsonl"
self._load_history()
self._rotate_file()
def _rotate_file(self):
"""轮换历史记录文件"""
current_time = datetime.now()
filename = f"tool_history_{current_time.strftime('%Y%m%d_%H%M%S')}.jsonl"
self._current_file = self._data_dir / filename
def _save_history(self):
"""保存所有历史记录到文件"""
try:
with self._history_file.open("w", encoding="utf-8") as f:
for record in self._history:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
except Exception as e:
logger.error(f"保存工具调用记录失败: {e}")
def _save_record(self, record: Dict[str, Any]):
"""保存单条记录到文件"""
try:
with self._current_file.open("a", encoding="utf-8") as f:
with self._history_file.open("a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
except Exception as e:
logger.error(f"保存工具调用记录失败: {e}")
def _clean_expired_records(self):
"""清理已过期的记录"""
self._history = [record for record in self._history if record.get("ttl_count", 0) < record.get("ttl", 5)]
self._save_history()
def record_tool_call(self,
tool_name: str,
args: Dict[str, Any],
result: Any,
execution_time: float,
status: str,
chat_id: Optional[str] = None):
chat_id: Optional[str] = None,
ttl: int = 5):
"""记录工具调用
Args:
@ -63,9 +71,10 @@ class ToolHistoryManager:
execution_time: 执行时间
status: 执行状态("completed""error")
chat_id: 聊天ID与ChatManager中的chat_id对应用于标识群聊或私聊会话
ttl: 该记录的生命周期值插入提示词多少次后删除默认为5
"""
# 检查是否启用历史记录
if not global_config.tool.history.enable_history:
# 检查是否启用历史记录且ttl大于0
if not global_config.tool.history.enable_history or ttl <= 0:
return
try:
@ -77,7 +86,9 @@ class ToolHistoryManager:
"result": self._sanitize_result(result),
"execution_time": execution_time,
"status": status,
"chat_id": chat_id
"chat_id": chat_id,
"ttl": ttl,
"ttl_count": 0
}
# 添加到内存中的历史记录
@ -117,24 +128,17 @@ class ToolHistoryManager:
def _load_history(self):
"""加载历史记录文件"""
try:
# 按文件修改时间排序,加载最近的文件
history_files = sorted(
self._data_dir.glob("tool_history_*.jsonl"),
key=lambda x: x.stat().st_mtime,
reverse=True
)
# 最多加载最近3个文件的历史
for file in history_files[:3]:
try:
with file.open("r", encoding="utf-8") as f:
for line in f:
if self._history_file.exists():
self._history = []
with self._history_file.open("r", encoding="utf-8") as f:
for line in f:
try:
record = json.loads(line)
self._history.append(record)
except Exception as e:
logger.error(f"加载历史记录文件 {file} 失败: {e}")
logger.info(f"成功加载了 {len(self._history)} 条历史记录")
if record.get("ttl_count", 0) < record.get("ttl", 5): # 只加载未过期的记录
self._history.append(record)
except json.JSONDecodeError:
continue
logger.info(f"成功加载了 {len(self._history)} 条历史记录")
except Exception as e:
logger.error(f"加载历史记录失败: {e}")
@ -240,24 +244,39 @@ class ToolHistoryManager:
return ""
prompt = "\n工具执行历史:\n"
needs_save = False
updated_history = []
for record in recent_history:
# 提取结果中的name和content
result = record['result']
if isinstance(result, dict):
name = result.get('name', record['tool_name'])
content = result.get('content', str(result))
else:
name = record['tool_name']
content = str(result)
# 增加ttl计数
record["ttl_count"] = record.get("ttl_count", 0) + 1
needs_save = True
# 格式化内容,去除多余空白和换行
content = content.strip().replace('\n', ' ')
# 如果未超过ttl则添加到提示词中
if record["ttl_count"] < record.get("ttl", 5):
# 提取结果中的name和content
result = record['result']
if isinstance(result, dict):
name = result.get('name', record['tool_name'])
content = result.get('content', str(result))
else:
name = record['tool_name']
content = str(result)
# 如果内容太长则截断
if len(content) > 200:
content = content[:200] + "..."
# 格式化内容,去除多余空白和换行
content = content.strip().replace('\n', ' ')
prompt += f"{name}: \n{content}\n\n"
# 如果内容太长则截断
if len(content) > 200:
content = content[:200] + "..."
prompt += f"{name}: \n{content}\n\n"
updated_history.append(record)
# 更新历史记录并保存
if needs_save:
self._history = updated_history
self._save_history()
return prompt
@ -282,6 +301,9 @@ def wrap_tool_executor():
result = await original_execute(self, tool_call, tool_instance)
execution_time = time.time() - start_time
# 获取工具的ttl值
ttl = getattr(tool_instance, 'history_ttl', 5) if tool_instance else 5
# 记录成功的调用
history_manager.record_tool_call(
tool_name=tool_call.func_name,
@ -289,13 +311,17 @@ def wrap_tool_executor():
result=result,
execution_time=execution_time,
status="completed",
chat_id=getattr(self, 'chat_id', None)
chat_id=getattr(self, 'chat_id', None),
ttl=ttl
)
return result
except Exception as e:
execution_time = time.time() - start_time
# 获取工具的ttl值
ttl = getattr(tool_instance, 'history_ttl', 5) if tool_instance else 5
# 记录失败的调用
history_manager.record_tool_call(
tool_name=tool_call.func_name,
@ -303,7 +329,8 @@ def wrap_tool_executor():
result=str(e),
execution_time=execution_time,
status="error",
chat_id=getattr(self, 'chat_id', None)
chat_id=getattr(self, 'chat_id', None),
ttl=ttl
)
raise

View File

@ -29,6 +29,9 @@ class BaseTool(ABC):
available_for_llm: bool = False
"""是否可供LLM使用"""
history_ttl: int = 5
"""工具调用历史记录的TTL值默认为5。设为0表示不记录历史"""
def __init__(self, plugin_config: Optional[dict] = None):
self.plugin_config = plugin_config or {} # 直接存储插件配置字典

View File

@ -40,13 +40,11 @@ class ToolExecutor:
可以直接输入聊天消息内容自动判断并执行相应的工具返回结构化的工具执行结果
"""
def __init__(self, chat_id: str, enable_cache: bool = True, cache_ttl: int = 3):
def __init__(self, chat_id: str):
"""初始化工具执行器
Args:
executor_id: 执行器标识符用于日志记录
enable_cache: 是否启用缓存机制
cache_ttl: 缓存生存时间周期数
chat_id: 聊天标识符用于日志记录
"""
self.chat_id = chat_id
self.chat_stream = get_chat_manager().get_stream(self.chat_id)
@ -54,12 +52,7 @@ class ToolExecutor:
self.llm_model = LLMRequest(model_set=model_config.model_task_config.tool_use, request_type="tool_executor")
# 缓存配置
self.enable_cache = enable_cache
self.cache_ttl = cache_ttl
self.tool_cache = {} # 格式: {cache_key: {"result": result, "ttl": ttl, "timestamp": timestamp}}
logger.info(f"{self.log_prefix}工具执行器初始化完成,缓存{'启用' if enable_cache else '禁用'}TTL={cache_ttl}")
logger.info(f"{self.log_prefix}工具执行器初始化完成")
async def execute_from_chat_message(
self, target_message: str, chat_history: str, sender: str, return_details: bool = False
@ -77,18 +70,7 @@ class ToolExecutor:
如果return_details为True: Tuple[List[Dict], List[str], str] - (结果列表, 使用的工具, 提示词)
"""
# 首先检查缓存
cache_key = self._generate_cache_key(target_message, chat_history, sender)
if cached_result := self._get_from_cache(cache_key):
logger.info(f"{self.log_prefix}使用缓存结果,跳过工具执行")
if not return_details:
return cached_result, [], ""
# 从缓存结果中提取工具名称
used_tools = [result.get("tool_name", "unknown") for result in cached_result]
return cached_result, used_tools, ""
# 缓存未命中,执行工具调用
# 执行工具调用
# 获取可用工具
tools = self._get_tool_definitions()
@ -117,10 +99,6 @@ class ToolExecutor:
# 执行工具调用
tool_results, used_tools = await self.execute_tool_calls(tool_calls)
# 缓存结果
if tool_results:
self._set_cache(cache_key, tool_results)
if used_tools:
logger.info(f"{self.log_prefix}工具执行完成,共执行{len(used_tools)}个工具: {used_tools}")
@ -244,72 +222,8 @@ class ToolExecutor:
logger.error(f"执行工具调用时发生错误: {str(e)}")
raise e
def _generate_cache_key(self, target_message: str, chat_history: str, sender: str) -> str:
"""生成缓存键
Args:
target_message: 目标消息内容
chat_history: 聊天历史
sender: 发送者
Returns:
str: 缓存键
"""
import hashlib
# 使用消息内容和群聊状态生成唯一缓存键
content = f"{target_message}_{chat_history}_{sender}"
return hashlib.md5(content.encode()).hexdigest()
def _get_from_cache(self, cache_key: str) -> Optional[List[Dict]]:
"""从缓存获取结果
Args:
cache_key: 缓存键
Returns:
Optional[List[Dict]]: 缓存的结果如果不存在或过期则返回None
"""
if not self.enable_cache or cache_key not in self.tool_cache:
return None
cache_item = self.tool_cache[cache_key]
if cache_item["ttl"] <= 0:
# 缓存过期,删除
del self.tool_cache[cache_key]
logger.debug(f"{self.log_prefix}缓存过期,删除缓存键: {cache_key}")
return None
# 减少TTL
cache_item["ttl"] -= 1
logger.debug(f"{self.log_prefix}使用缓存结果剩余TTL: {cache_item['ttl']}")
return cache_item["result"]
def _set_cache(self, cache_key: str, result: List[Dict]):
"""设置缓存
Args:
cache_key: 缓存键
result: 要缓存的结果
"""
if not self.enable_cache:
return
self.tool_cache[cache_key] = {"result": result, "ttl": self.cache_ttl, "timestamp": time.time()}
logger.debug(f"{self.log_prefix}设置缓存TTL: {self.cache_ttl}")
def _cleanup_expired_cache(self):
"""清理过期的缓存"""
if not self.enable_cache:
return
expired_keys = []
expired_keys.extend(cache_key for cache_key, cache_item in self.tool_cache.items() if cache_item["ttl"] <= 0)
for key in expired_keys:
del self.tool_cache[key]
if expired_keys:
logger.debug(f"{self.log_prefix}清理了{len(expired_keys)}个过期缓存")
async def execute_specific_tool_simple(self, tool_name: str, tool_args: Dict) -> Optional[Dict]:
"""直接执行指定工具
@ -349,86 +263,28 @@ class ToolExecutor:
return None
def clear_cache(self):
"""清空所有缓存"""
if self.enable_cache:
cache_count = len(self.tool_cache)
self.tool_cache.clear()
logger.info(f"{self.log_prefix}清空了{cache_count}个缓存项")
def get_cache_status(self) -> Dict:
"""获取缓存状态信息
Returns:
Dict: 包含缓存统计信息的字典
"""
if not self.enable_cache:
return {"enabled": False, "cache_count": 0}
# 清理过期缓存
self._cleanup_expired_cache()
total_count = len(self.tool_cache)
ttl_distribution = {}
for cache_item in self.tool_cache.values():
ttl = cache_item["ttl"]
ttl_distribution[ttl] = ttl_distribution.get(ttl, 0) + 1
return {
"enabled": True,
"cache_count": total_count,
"cache_ttl": self.cache_ttl,
"ttl_distribution": ttl_distribution,
}
def set_cache_config(self, enable_cache: Optional[bool] = None, cache_ttl: int = -1):
"""动态修改缓存配置
Args:
enable_cache: 是否启用缓存
cache_ttl: 缓存TTL
"""
if enable_cache is not None:
self.enable_cache = enable_cache
logger.info(f"{self.log_prefix}缓存状态修改为: {'启用' if enable_cache else '禁用'}")
if cache_ttl > 0:
self.cache_ttl = cache_ttl
logger.info(f"{self.log_prefix}缓存TTL修改为: {cache_ttl}")
"""
ToolExecutor使用示例
# 1. 基础使用 - 从聊天消息执行工具启用缓存默认TTL=3
executor = ToolExecutor(executor_id="my_executor")
# 1. 基础使用 - 从聊天消息执行工具
executor = ToolExecutor(chat_id="my_executor")
results, _, _ = await executor.execute_from_chat_message(
talking_message_str="今天天气怎么样?现在几点了?",
is_group_chat=False
target_message="今天天气怎么样?现在几点了?",
chat_history="",
sender="用户"
)
# 2. 禁用缓存的执行器
no_cache_executor = ToolExecutor(executor_id="no_cache", enable_cache=False)
# 3. 自定义缓存TTL
long_cache_executor = ToolExecutor(executor_id="long_cache", cache_ttl=10)
# 4. 获取详细信息
# 2. 获取详细信息
results, used_tools, prompt = await executor.execute_from_chat_message(
talking_message_str="帮我查询Python相关知识",
is_group_chat=False,
target_message="帮我查询Python相关知识",
chat_history="",
sender="用户",
return_details=True
)
# 5. 直接执行特定工具
# 3. 直接执行特定工具
result = await executor.execute_specific_tool_simple(
tool_name="get_knowledge",
tool_args={"query": "机器学习"}
)
# 6. 缓存管理
cache_status = executor.get_cache_status() # 查看缓存状态
executor.clear_cache() # 清空缓存
executor.set_cache_config(cache_ttl=5) # 动态修改缓存配置
"""