From 441fc0b7422d3eb67300c1eb2ccbce177d444d43 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sat, 6 Dec 2025 18:59:45 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E4=BC=98=E5=8C=96=E8=AE=B0?= =?UTF-8?q?=E5=BF=86=E6=9F=A5=E8=AF=A2=E7=9A=84=E8=B6=85=E6=97=B6=E8=AE=BE?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/heart_flow/heartFC_chat.py | 207 ++------ src/config/official_configs.py | 5 + src/express/expression_learner.py | 71 ++- src/jargon/jargon_miner.py | 19 +- src/memory_system/memory_retrieval.py | 500 ++++++++++++------ src/memory_system/retrieval_tools/__init__.py | 4 +- .../retrieval_tools/found_answer.py | 36 +- template/bot_config_template.toml | 3 +- 8 files changed, 474 insertions(+), 371 deletions(-) diff --git a/src/chat/heart_flow/heartFC_chat.py b/src/chat/heart_flow/heartFC_chat.py index eb101741..4a54ae6f 100644 --- a/src/chat/heart_flow/heartFC_chat.py +++ b/src/chat/heart_flow/heartFC_chat.py @@ -303,90 +303,6 @@ class HeartFChatting: return loop_info, reply_text, cycle_timers - async def _run_planner_without_reply( - self, - available_actions: Dict[str, ActionInfo], - cycle_timers: Dict[str, float], - ) -> List[ActionPlannerInfo]: - """执行planner,但不包含reply动作(用于并行执行场景,提及时使用简化版提示词)""" - try: - with Timer("规划器", cycle_timers): - action_to_use_info = await self.action_planner.plan( - loop_start_time=self.last_read_time, - available_actions=available_actions, - is_mentioned=True, # 标记为提及时,使用简化版提示词 - ) - # 过滤掉reply动作(虽然提及时不应该有reply,但为了安全还是过滤一下) - return [action for action in action_to_use_info if action.action_type != "reply"] - except Exception as e: - logger.error(f"{self.log_prefix} Planner执行失败: {e}") - traceback.print_exc() - return [] - - async def _generate_mentioned_reply( - self, - force_reply_message: "DatabaseMessages", - thinking_id: str, - cycle_timers: Dict[str, float], - available_actions: Dict[str, ActionInfo], - ) -> Dict[str, Any]: - """当被提及时,独立生成回复的任务""" - try: - self.questioned = False - # 重置连续 no_reply 计数 - self.consecutive_no_reply_count = 0 - reason = "" - - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=reason, - action_done=True, - thinking_id=thinking_id, - action_data={}, - action_name="reply", - action_reasoning=reason, - ) - - with Timer("提及回复生成", cycle_timers): - success, llm_response = await generator_api.generate_reply( - chat_stream=self.chat_stream, - reply_message=force_reply_message, - available_actions=available_actions, - chosen_actions=[], # 独立回复,不依赖planner的动作 - reply_reason=reason, - enable_tool=global_config.tool.enable_tool, - request_type="replyer", - from_plugin=False, - reply_time_point=self.last_read_time, - ) - - if not success or not llm_response or not llm_response.reply_set: - logger.warning(f"{self.log_prefix} 提及回复生成失败") - return {"action_type": "reply", "success": False, "result": "提及回复生成失败", "loop_info": None} - - response_set = llm_response.reply_set - selected_expressions = llm_response.selected_expressions - loop_info, reply_text, _ = await self._send_and_store_reply( - response_set=response_set, - action_message=force_reply_message, - cycle_timers=cycle_timers, - thinking_id=thinking_id, - actions=[], # 独立回复,不依赖planner的动作 - selected_expressions=selected_expressions, - ) - self.last_active_time = time.time() - return { - "action_type": "reply", - "success": True, - "result": f"你回复内容{reply_text}", - "loop_info": loop_info, - } - except Exception as e: - logger.error(f"{self.log_prefix} 提及回复生成异常: {e}") - traceback.print_exc() - return {"action_type": "reply", "success": False, "result": f"提及回复生成异常: {e}", "loop_info": None} - async def _observe( self, # interest_value: float = 0.0, recent_messages_list: Optional[List["DatabaseMessages"]] = None, @@ -438,95 +354,50 @@ class HeartFChatting: except Exception as e: logger.error(f"{self.log_prefix} 动作修改失败: {e}") - # 如果被提及,让回复生成和planner并行执行 - if force_reply_message: - logger.info(f"{self.log_prefix} 检测到提及,回复生成与planner并行执行") + # 执行planner + is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() - # 并行执行planner和回复生成 - planner_task = asyncio.create_task( - self._run_planner_without_reply( - available_actions=available_actions, - cycle_timers=cycle_timers, - ) + message_list_before_now = get_raw_msg_before_timestamp_with_chat( + chat_id=self.stream_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.6), + filter_intercept_message_level=1, + ) + chat_content_block, message_id_list = build_readable_messages_with_id( + messages=message_list_before_now, + timestamp_mode="normal_no_YMD", + read_mark=self.action_planner.last_obs_time_mark, + truncate=True, + show_actions=True, + ) + + prompt_info = await self.action_planner.build_planner_prompt( + is_group_chat=is_group_chat, + chat_target_info=chat_target_info, + current_available_actions=available_actions, + chat_content_block=chat_content_block, + message_id_list=message_id_list, + interest=global_config.personality.interest, + ) + continue_flag, modified_message = await events_manager.handle_mai_events( + EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id + ) + if not continue_flag: + return False + if modified_message and modified_message._modify_flags.modify_llm_prompt: + prompt_info = (modified_message.llm_prompt, prompt_info[1]) + + with Timer("规划器", cycle_timers): + action_to_use_info = await self.action_planner.plan( + loop_start_time=self.last_read_time, + available_actions=available_actions, ) - reply_task = asyncio.create_task( - self._generate_mentioned_reply( - force_reply_message=force_reply_message, - thinking_id=thinking_id, - cycle_timers=cycle_timers, - available_actions=available_actions, - ) - ) - - # 等待两个任务完成 - planner_result, reply_result = await asyncio.gather(planner_task, reply_task, return_exceptions=True) - - # 处理planner结果 - if isinstance(planner_result, BaseException): - logger.error(f"{self.log_prefix} Planner执行异常: {planner_result}") - action_to_use_info = [] - else: - action_to_use_info = planner_result - - # 处理回复结果 - if isinstance(reply_result, BaseException): - logger.error(f"{self.log_prefix} 回复生成异常: {reply_result}") - reply_result = { - "action_type": "reply", - "success": False, - "result": "回复生成异常", - "loop_info": None, - } - else: - # 正常流程:只执行planner - is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() - - message_list_before_now = get_raw_msg_before_timestamp_with_chat( - chat_id=self.stream_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.6), - filter_intercept_message_level=1, - ) - chat_content_block, message_id_list = build_readable_messages_with_id( - messages=message_list_before_now, - timestamp_mode="normal_no_YMD", - read_mark=self.action_planner.last_obs_time_mark, - truncate=True, - show_actions=True, - ) - - prompt_info = await self.action_planner.build_planner_prompt( - is_group_chat=is_group_chat, - chat_target_info=chat_target_info, - current_available_actions=available_actions, - chat_content_block=chat_content_block, - message_id_list=message_id_list, - interest=global_config.personality.interest, - ) - continue_flag, modified_message = await events_manager.handle_mai_events( - EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id - ) - if not continue_flag: - return False - if modified_message and modified_message._modify_flags.modify_llm_prompt: - prompt_info = (modified_message.llm_prompt, prompt_info[1]) - - with Timer("规划器", cycle_timers): - action_to_use_info = await self.action_planner.plan( - loop_start_time=self.last_read_time, - available_actions=available_actions, - ) - reply_result = None - - # 只在提及情况下过滤掉planner返回的reply动作(提及时已有独立回复生成) - if force_reply_message: - action_to_use_info = [action for action in action_to_use_info if action.action_type != "reply"] logger.info( f"{self.log_prefix} 决定执行{len(action_to_use_info)}个动作: {' '.join([a.action_type for a in action_to_use_info])}" ) - # 3. 并行执行所有动作(不包括reply,reply已经独立执行) + # 3. 并行执行所有动作 action_tasks = [ asyncio.create_task( self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) @@ -537,10 +408,6 @@ class HeartFChatting: # 并行执行所有任务 results = await asyncio.gather(*action_tasks, return_exceptions=True) - # 如果有独立的回复结果,添加到结果列表中 - if reply_result: - results = list(results) + [reply_result] - # 处理执行结果 reply_loop_info = None reply_text_from_reply = "" diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 3e93b56f..ca0fcd7b 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -260,6 +260,9 @@ class MemoryConfig(ConfigBase): max_agent_iterations: int = 5 """Agent最多迭代轮数(最低为1)""" + agent_timeout_seconds: float = 120.0 + """Agent超时时间(秒)""" + enable_jargon_detection: bool = True """记忆检索过程中是否启用黑话识别""" @@ -270,6 +273,8 @@ class MemoryConfig(ConfigBase): """验证配置值""" if self.max_agent_iterations < 1: raise ValueError(f"max_agent_iterations 必须至少为1,当前值: {self.max_agent_iterations}") + if self.agent_timeout_seconds <= 0: + raise ValueError(f"agent_timeout_seconds 必须大于0,当前值: {self.agent_timeout_seconds}") @dataclass diff --git a/src/express/expression_learner.py b/src/express/expression_learner.py index e39677a9..58b2cd6d 100644 --- a/src/express/expression_learner.py +++ b/src/express/expression_learner.py @@ -319,9 +319,74 @@ class ExpressionLearner: parsed = json.loads(repaired) else: parsed = repaired - except Exception: - logger.error(f"解析表达风格 JSON 失败,原始响应:{response}") - return [] + except Exception as parse_error: + # 如果解析失败,尝试修复中文引号问题 + # 使用状态机方法,在 JSON 字符串值内部将中文引号替换为转义的英文引号 + try: + def fix_chinese_quotes_in_json(text): + """使用状态机修复 JSON 字符串值中的中文引号""" + result = [] + i = 0 + in_string = False + escape_next = False + + while i < len(text): + char = text[i] + + if escape_next: + # 当前字符是转义字符后的字符,直接添加 + result.append(char) + escape_next = False + i += 1 + continue + + if char == '\\': + # 转义字符 + result.append(char) + escape_next = True + i += 1 + continue + + if char == '"' and not escape_next: + # 遇到英文引号,切换字符串状态 + in_string = not in_string + result.append(char) + i += 1 + continue + + if in_string: + # 在字符串值内部,将中文引号替换为转义的英文引号 + if char == '"': # 中文左引号 + result.append('\\"') + elif char == '"': # 中文右引号 + result.append('\\"') + else: + result.append(char) + else: + # 不在字符串内,直接添加 + result.append(char) + + i += 1 + + return ''.join(result) + + fixed_raw = fix_chinese_quotes_in_json(raw) + + # 再次尝试解析 + if fixed_raw.startswith("[") and fixed_raw.endswith("]"): + parsed = json.loads(fixed_raw) + else: + repaired = repair_json(fixed_raw) + if isinstance(repaired, str): + parsed = json.loads(repaired) + else: + parsed = repaired + except Exception as fix_error: + logger.error(f"解析表达风格 JSON 失败,初始错误: {type(parse_error).__name__}: {str(parse_error)}") + logger.error(f"修复中文引号后仍失败,错误: {type(fix_error).__name__}: {str(fix_error)}") + logger.error(f"解析表达风格 JSON 失败,原始响应:{response}") + logger.error(f"处理后的 JSON 字符串(前500字符):{raw[:500]}") + return [] if isinstance(parsed, dict): parsed_list = [parsed] diff --git a/src/jargon/jargon_miner.py b/src/jargon/jargon_miner.py index b7f6c857..7cda4e02 100644 --- a/src/jargon/jargon_miner.py +++ b/src/jargon/jargon_miner.py @@ -48,7 +48,7 @@ def _init_prompt() -> None: - 中文词语的缩写,用几个汉字概括一个词汇或含义,例如:社死、内卷 以 JSON 数组输出,元素为对象(严格按以下结构): -请你提取出可能的黑话,最多10 +请你提取出可能的黑话,最多30个黑话,请尽量提取所有 [ {{"content": "词条", "msg_id": "m12"}}, // msg_id 必须与上方聊天中展示的ID完全一致 {{"content": "词条2", "msg_id": "m15"}} @@ -168,19 +168,24 @@ class JargonMiner: self.chat_id = chat_id self.last_learning_time: float = time.time() # 频率控制,可按需调整 - self.min_messages_for_learning: int = 10 - self.min_learning_interval: float = 20 + self.min_messages_for_learning: int = 30 + self.min_learning_interval: float = 60 self.llm = LLMRequest( model_set=model_config.model_task_config.utils, request_type="jargon.extract", ) + + self.llm_inference = LLMRequest( + model_set=model_config.model_task_config.utils, + request_type="jargon.inference", + ) # 初始化stream_name作为类属性,避免重复提取 chat_manager = get_chat_manager() stream_name = chat_manager.get_stream_name(self.chat_id) self.stream_name = stream_name if stream_name else self.chat_id - self.cache_limit = 100 + self.cache_limit = 50 self.cache: OrderedDict[str, None] = OrderedDict() # 黑话提取锁,防止并发执行 @@ -276,7 +281,7 @@ class JargonMiner: raw_content_list=raw_content_text, ) - response1, _ = await self.llm.generate_response_async(prompt1, temperature=0.3) + response1, _ = await self.llm_inference.generate_response_async(prompt1, temperature=0.3) if not response1: logger.warning(f"jargon {content} 推断1失败:无响应") return @@ -313,7 +318,7 @@ class JargonMiner: content=content, ) - response2, _ = await self.llm.generate_response_async(prompt2, temperature=0.3) + response2, _ = await self.llm_inference.generate_response_async(prompt2, temperature=0.3) if not response2: logger.warning(f"jargon {content} 推断2失败:无响应") return @@ -360,7 +365,7 @@ class JargonMiner: if global_config.debug.show_jargon_prompt: logger.info(f"jargon {content} 比较提示词: {prompt3}") - response3, _ = await self.llm.generate_response_async(prompt3, temperature=0.3) + response3, _ = await self.llm_inference.generate_response_async(prompt3, temperature=0.3) if not response3: logger.warning(f"jargon {content} 比较失败:无响应") return diff --git a/src/memory_system/memory_retrieval.py b/src/memory_system/memory_retrieval.py index 83c9ed47..aa20ce0f 100644 --- a/src/memory_system/memory_retrieval.py +++ b/src/memory_system/memory_retrieval.py @@ -1,6 +1,7 @@ import time import json import asyncio +import re from typing import List, Dict, Any, Optional, Tuple, Set from src.common.logger import get_logger from src.config.config import global_config, model_config @@ -77,20 +78,12 @@ def init_memory_retrieval_prompt(): 问题要说明前因后果和上下文,使其全面且精准 -输出格式示例(需要检索时): +输出格式示例: ```json {{ "questions": ["张三在前几天干了什么"] #问题数组(字符串数组),如果不需要检索记忆则输出空数组[],如果需要检索则只输出包含一个问题的数组 }} ``` - -输出格式示例(不需要检索时): -```json -{{ - "questions": [] -}} -``` - 请只输出JSON对象,不要输出其他内容: """, name="memory_retrieval_question_prompt", @@ -104,17 +97,16 @@ def init_memory_retrieval_prompt(): 已收集的信息: {collected_info} -**执行步骤:** +**工具说明:** - 如果涉及过往事件,或者查询某个过去可能提到过的概念,或者某段时间发生的事件。可以使用聊天记录查询工具查询过往事件 - 如果涉及人物,可以使用人物信息查询工具查询人物信息 - 如果没有可靠信息,且查询时间充足,或者不确定查询类别,也可以使用lpmm知识库查询,作为辅助信息 -- **如果信息不足需要使用tool,说明需要查询什么,并输出为纯文本说明,然后调用相应工具查询(可并行调用多个工具)** -- **如果当前已收集的信息足够回答问题,且能找到明确答案,调用found_answer工具标记已找到答案** **思考** - 你可以对查询思路给出简短的思考:思考要简短,直接切入要点 -- 如果信息不足,你必须给出使用什么工具进行查询 -- 如果信息足够,你必须调用found_answer工具 +- 先思考当前信息是否足够回答问题 +- 如果信息不足,则需要使用tool查询信息,你必须给出使用什么工具进行查询 +- 如果当前已收集的信息足够或信息不足确定无法找到答案,你必须调用finish_search工具结束查询 """, name="memory_retrieval_react_prompt_head", ) @@ -128,14 +120,12 @@ def init_memory_retrieval_prompt(): 已收集的信息: {collected_info} -**执行步骤:** 分析: - 当前信息是否足够回答问题? - **如果信息足够且能找到明确答案**,在思考中直接给出答案,格式为:found_answer(answer="你的答案内容") - **如果信息不足或无法找到答案**,在思考中给出:not_enough_info(reason="信息不足或无法找到答案的原因") **重要规则:** -- 你已经经过几轮查询,尝试了信息搜集,现在你需要总结信息,选择回答问题或判断问题无法回答 - 必须严格使用检索到的信息回答问题,不要编造信息 - 答案必须精简,不要过多解释 - **只有在检索到明确、具体的答案时,才使用found_answer** @@ -167,7 +157,7 @@ def _log_conversation_messages( # 如果有head_prompt,先添加为第一条消息 if head_prompt: - msg_info = "========================================\n[消息 1] 角色: System 内容类型: 文本\n-----------------------------" + msg_info = "========================================\n[消息 1] 角色: System\n-----------------------------" msg_info += f"\n{head_prompt}" log_lines.append(msg_info) start_idx = 2 @@ -180,19 +170,6 @@ def _log_conversation_messages( for idx, msg in enumerate(conversation_messages, start_idx): role_name = msg.role.value if hasattr(msg.role, "value") else str(msg.role) - # # 处理内容 - 显示完整内容,不截断 - # if isinstance(msg.content, str): - # full_content = msg.content - # content_type = "文本" - # elif isinstance(msg.content, list): - # text_parts = [item for item in msg.content if isinstance(item, str)] - # image_count = len([item for item in msg.content if isinstance(item, tuple)]) - # full_content = "".join(text_parts) if text_parts else "" - # content_type = f"混合({len(text_parts)}段文本, {image_count}张图片)" - # else: - # full_content = str(msg.content) - # content_type = "未知" - # 构建单条消息的日志信息 # msg_info = f"\n========================================\n[消息 {idx}] 角色: {role_name} 内容类型: {content_type}\n-----------------------------" msg_info = f"\n========================================\n[消息 {idx}] 角色: {role_name}\n-----------------------------" @@ -205,14 +182,12 @@ def _log_conversation_messages( if msg.tool_calls: msg_info += f"\n 工具调用: {len(msg.tool_calls)}个" for tool_call in msg.tool_calls: - msg_info += f"\n - {tool_call}" msg_info += f"\n - {tool_call.func_name}: {json.dumps(tool_call.args, ensure_ascii=False)}" - if msg.tool_call_id: - msg_info += f"\n 工具调用ID: {msg.tool_call_id}" # if msg.tool_call_id: # msg_info += f"\n 工具调用ID: {msg.tool_call_id}" + log_lines.append(msg_info) total_count = len(conversation_messages) + (1 if head_prompt else 0) @@ -257,6 +232,7 @@ async def _react_agent_solve_question( conversation_messages: List[Message] = [] first_head_prompt: Optional[str] = None # 保存第一次使用的head_prompt(用于日志显示) + # 正常迭代:max_iterations 次(最终评估单独处理,不算在迭代中) for iteration in range(max_iterations): # 检查超时 if time.time() - start_time > timeout: @@ -276,7 +252,6 @@ async def _react_agent_solve_question( # 计算剩余迭代次数 current_iteration = iteration + 1 remaining_iterations = max_iterations - current_iteration - is_final_iteration = current_iteration >= max_iterations # 提取函数调用中参数的值,支持单引号和双引号 def extract_quoted_content(text, func_name, param_name): @@ -336,114 +311,10 @@ async def _react_agent_solve_question( return None - # 如果是最后一次迭代,使用final_prompt进行总结 - if is_final_iteration: - evaluation_prompt = await global_prompt_manager.format_prompt( - "memory_retrieval_react_final_prompt", - bot_name=bot_name, - time_now=time_now, - question=question, - collected_info=collected_info if collected_info else "暂无信息", - current_iteration=current_iteration, - remaining_iterations=remaining_iterations, - max_iterations=max_iterations, - ) - - if global_config.debug.show_memory_prompt: - logger.info(f"ReAct Agent 最终评估Prompt: {evaluation_prompt}") - - eval_success, eval_response, eval_reasoning_content, eval_model_name, eval_tool_calls = await llm_api.generate_with_model_with_tools( - evaluation_prompt, - model_config=model_config.model_task_config.tool_use, - tool_options=[], # 最终评估阶段不提供工具 - request_type="memory.react.final", - ) - - if not eval_success: - logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段 LLM调用失败: {eval_response}") - _log_conversation_messages( - conversation_messages, - head_prompt=first_head_prompt, - final_status="未找到答案:最终评估阶段LLM调用失败", - ) - return False, "最终评估阶段LLM调用失败", thinking_steps, False - - logger.info( - f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估响应: {eval_response}" - ) - - # 从最终评估响应中提取found_answer或not_enough_info - found_answer_content = None - not_enough_info_reason = None - - if eval_response: - found_answer_content = extract_quoted_content(eval_response, "found_answer", "answer") - if not found_answer_content: - not_enough_info_reason = extract_quoted_content(eval_response, "not_enough_info", "reason") - - # 如果找到答案,返回 - if found_answer_content: - eval_step = { - "iteration": iteration + 1, - "thought": f"[最终评估] {eval_response}", - "actions": [{"action_type": "found_answer", "action_params": {"answer": found_answer_content}}], - "observations": ["最终评估阶段检测到found_answer"] - } - thinking_steps.append(eval_step) - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段找到关于问题{question}的答案: {found_answer_content}") - - _log_conversation_messages( - conversation_messages, - head_prompt=first_head_prompt, - final_status=f"找到答案:{found_answer_content}", - ) - - return True, found_answer_content, thinking_steps, False - - # 如果评估为not_enough_info,返回空字符串(不返回任何信息) - if not_enough_info_reason: - eval_step = { - "iteration": iteration + 1, - "thought": f"[最终评估] {eval_response}", - "actions": [{"action_type": "not_enough_info", "action_params": {"reason": not_enough_info_reason}}], - "observations": ["最终评估阶段检测到not_enough_info"] - } - thinking_steps.append(eval_step) - logger.info( - f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段判断信息不足: {not_enough_info_reason}" - ) - - _log_conversation_messages( - conversation_messages, - head_prompt=first_head_prompt, - final_status=f"未找到答案:{not_enough_info_reason}", - ) - - return False, "", thinking_steps, False - - # 如果没有明确判断,视为not_enough_info,返回空字符串(不返回任何信息) - eval_step = { - "iteration": iteration + 1, - "thought": f"[最终评估] {eval_response}", - "actions": [{"action_type": "not_enough_info", "action_params": {"reason": "已到达最后一次迭代,无法找到答案"}}], - "observations": ["已到达最后一次迭代,无法找到答案"] - } - thinking_steps.append(eval_step) - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 已到达最后一次迭代,无法找到答案") - - _log_conversation_messages( - conversation_messages, - head_prompt=first_head_prompt, - final_status="未找到答案:已到达最后一次迭代,无法找到答案", - ) - - return False, "", thinking_steps, False - - # 前n-1次迭代,使用head_prompt决定调用哪些工具(包含found_answer工具) + # 正常迭代:使用head_prompt决定调用哪些工具(包含finish_search工具) tool_definitions = tool_registry.get_tool_definitions() - logger.info( - f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: {len(tool_definitions)}" - ) + # tool_names = [tool_def["name"] for tool_def in tool_definitions] + # logger.debug(f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具: {', '.join(tool_names)} (共{len(tool_definitions)}个)") # head_prompt应该只构建一次,使用初始的collected_info,后续迭代都复用同一个 if first_head_prompt is None: @@ -493,15 +364,15 @@ async def _react_agent_solve_question( request_type="memory.react", ) - logger.debug( - f"ReAct Agent 第 {iteration + 1} 次迭代 模型: {model_name} ,调用工具数量: {len(tool_calls) if tool_calls else 0} ,调用工具响应: {response}" - ) + # logger.info( + # f"ReAct Agent 第 {iteration + 1} 次迭代 模型: {model_name} ,调用工具数量: {len(tool_calls) if tool_calls else 0} ,调用工具响应: {response}" + # ) if not success: logger.error(f"ReAct Agent LLM调用失败: {response}") break - # 注意:这里会检查found_answer工具调用,如果检测到found_answer工具,会直接返回答案 + # 注意:这里会检查finish_search工具调用,如果检测到finish_search工具,会根据found_answer参数决定返回答案或退出查询 assistant_message: Optional[Message] = None if tool_calls: @@ -531,8 +402,102 @@ async def _react_agent_solve_question( # 处理工具调用 if not tool_calls: - # 如果没有工具调用,记录思考过程,继续下一轮迭代(下一轮会再次评估) + # 如果没有工具调用,检查响应文本中是否包含finish_search函数调用格式 if response and response.strip(): + # 尝试从文本中解析finish_search函数调用 + def parse_finish_search_from_text(text: str): + """从文本中解析finish_search函数调用,返回(found_answer, answer)元组,如果未找到则返回(None, None)""" + if not text: + return None, None + + # 查找finish_search函数调用位置(不区分大小写) + func_pattern = "finish_search" + text_lower = text.lower() + func_pos = text_lower.find(func_pattern) + if func_pos == -1: + return None, None + + # 查找函数调用的开始和结束位置 + # 从func_pos开始向后查找左括号 + start_pos = text.find("(", func_pos) + if start_pos == -1: + return None, None + + # 查找匹配的右括号(考虑嵌套) + paren_count = 0 + end_pos = start_pos + for i in range(start_pos, len(text)): + if text[i] == "(": + paren_count += 1 + elif text[i] == ")": + paren_count -= 1 + if paren_count == 0: + end_pos = i + break + else: + # 没有找到匹配的右括号 + return None, None + + # 提取函数参数部分 + params_text = text[start_pos + 1 : end_pos] + + # 解析found_answer参数(布尔值,可能是true/false/True/False) + found_answer = None + found_answer_patterns = [ + r"found_answer\s*=\s*true", + r"found_answer\s*=\s*True", + r"found_answer\s*=\s*false", + r"found_answer\s*=\s*False", + ] + for pattern in found_answer_patterns: + match = re.search(pattern, params_text, re.IGNORECASE) + if match: + found_answer = "true" in match.group(0).lower() + break + + # 解析answer参数(字符串,使用extract_quoted_content) + answer = extract_quoted_content(text, "finish_search", "answer") + + return found_answer, answer + + parsed_found_answer, parsed_answer = parse_finish_search_from_text(response) + + if parsed_found_answer is not None: + # 检测到finish_search函数调用格式 + if parsed_found_answer: + # 找到了答案 + if parsed_answer: + step["actions"].append({"action_type": "finish_search", "action_params": {"found_answer": True, "answer": parsed_answer}}) + step["observations"] = ["检测到finish_search文本格式调用,找到答案"] + thinking_steps.append(step) + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过finish_search文本格式找到关于问题{question}的答案: {parsed_answer}") + + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status=f"找到答案:{parsed_answer}", + ) + + return True, parsed_answer, thinking_steps, False + else: + # found_answer为True但没有提供answer,视为错误,继续迭代 + logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 finish_search文本格式found_answer为True但未提供answer") + else: + # 未找到答案,直接退出查询 + step["actions"].append({"action_type": "finish_search", "action_params": {"found_answer": False}}) + step["observations"] = ["检测到finish_search文本格式调用,未找到答案"] + thinking_steps.append(step) + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过finish_search文本格式判断未找到答案") + + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status="未找到答案:通过finish_search文本格式判断未找到答案", + ) + + return False, "", thinking_steps, False + + # 如果没有检测到finish_search格式,记录思考过程,继续下一轮迭代 step["observations"] = [f"思考完成,但未调用工具。响应: {response}"] logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 思考完成但未调用工具: {response}") collected_info += f"思考: {response}" @@ -543,29 +508,51 @@ async def _react_agent_solve_question( continue # 处理工具调用 - # 首先检查是否有found_answer工具调用,如果有则立即返回,不再处理其他工具 - found_answer_from_tool = None + # 首先检查是否有finish_search工具调用,如果有则立即返回,不再处理其他工具 + finish_search_found = None + finish_search_answer = None for tool_call in tool_calls: tool_name = tool_call.func_name tool_args = tool_call.args or {} - if tool_name == "found_answer": - found_answer_from_tool = tool_args.get("answer", "") - if found_answer_from_tool: - step["actions"].append({"action_type": "found_answer", "action_params": {"answer": found_answer_from_tool}}) - step["observations"] = ["检测到found_answer工具调用"] + if tool_name == "finish_search": + finish_search_found = tool_args.get("found_answer", False) + finish_search_answer = tool_args.get("answer", "") + + if finish_search_found: + # 找到了答案 + if finish_search_answer: + step["actions"].append({"action_type": "finish_search", "action_params": {"found_answer": True, "answer": finish_search_answer}}) + step["observations"] = ["检测到finish_search工具调用,找到答案"] + thinking_steps.append(step) + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过finish_search工具找到关于问题{question}的答案: {finish_search_answer}") + + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status=f"找到答案:{finish_search_answer}", + ) + + return True, finish_search_answer, thinking_steps, False + else: + # found_answer为True但没有提供answer,视为错误 + logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 finish_search工具found_answer为True但未提供answer") + else: + # 未找到答案,直接退出查询 + step["actions"].append({"action_type": "finish_search", "action_params": {"found_answer": False}}) + step["observations"] = ["检测到finish_search工具调用,未找到答案"] thinking_steps.append(step) - logger.debug(f"ReAct Agent 第 {iteration + 1} 次迭代 通过found_answer工具找到关于问题{question}的答案: {found_answer_from_tool}") + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过finish_search工具判断未找到答案") _log_conversation_messages( conversation_messages, head_prompt=first_head_prompt, - final_status=f"找到答案:{found_answer_from_tool}", + final_status="未找到答案:通过finish_search工具判断未找到答案", ) - return True, found_answer_from_tool, thinking_steps, False + return False, "", thinking_steps, False - # 如果没有found_answer工具调用,或者found_answer工具调用没有答案,继续处理其他工具 + # 如果没有finish_search工具调用,继续处理其他工具 tool_tasks = [] for i, tool_call in enumerate(tool_calls): tool_name = tool_call.func_name @@ -575,8 +562,8 @@ async def _react_agent_solve_question( f"ReAct Agent 第 {iteration + 1} 次迭代 工具调用 {i + 1}/{len(tool_calls)}: {tool_name}({tool_args})" ) - # 跳过found_answer工具调用(已经在上面处理过了) - if tool_name == "found_answer": + # 跳过finish_search工具调用(已经在上面处理过了) + if tool_name == "finish_search": continue # 普通工具调用 @@ -649,24 +636,186 @@ async def _react_agent_solve_question( thinking_steps.append(step) - # 达到最大迭代次数或超时,但Agent没有明确返回found_answer - # 迭代超时应该直接视为not_enough_info,而不是使用已有信息 - # 只有Agent明确返回found_answer时,才认为找到了答案 - if collected_info: - logger.warning( - f"ReAct Agent达到最大迭代次数或超时,但未明确返回found_answer。已收集信息: {collected_info[:100]}..." - ) + # 正常迭代结束后,如果达到最大迭代次数或超时,执行最终评估 + # 最终评估单独处理,不算在迭代中 + should_do_final_evaluation = False if is_timeout: - logger.warning("ReAct Agent超时,直接视为not_enough_info") - else: - logger.warning("ReAct Agent达到最大迭代次数,直接视为not_enough_info") + should_do_final_evaluation = True + logger.warning(f"ReAct Agent超时,已迭代{iteration + 1}次,进入最终评估") + elif iteration + 1 >= max_iterations: + should_do_final_evaluation = True + logger.info(f"ReAct Agent达到最大迭代次数(已迭代{iteration + 1}次),进入最终评估") - # React完成时输出消息列表 - timeout_reason = "超时" if is_timeout else "达到最大迭代次数" + if should_do_final_evaluation: + # 获取必要变量用于最终评估 + tool_registry = get_tool_registry() + bot_name = global_config.bot.nickname + time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + current_iteration = iteration + 1 + remaining_iterations = 0 + + # 提取函数调用中参数的值,支持单引号和双引号 + def extract_quoted_content(text, func_name, param_name): + """从文本中提取函数调用中参数的值,支持单引号和双引号 + + Args: + text: 要搜索的文本 + func_name: 函数名,如 'found_answer' + param_name: 参数名,如 'answer' + + Returns: + 提取的参数值,如果未找到则返回None + """ + if not text: + return None + + # 查找函数调用位置(不区分大小写) + func_pattern = func_name.lower() + text_lower = text.lower() + func_pos = text_lower.find(func_pattern) + if func_pos == -1: + return None + + # 查找参数名和等号 + param_pattern = f"{param_name}=" + param_pos = text_lower.find(param_pattern, func_pos) + if param_pos == -1: + return None + + # 跳过参数名、等号和空白 + start_pos = param_pos + len(param_pattern) + while start_pos < len(text) and text[start_pos] in " \t\n": + start_pos += 1 + + if start_pos >= len(text): + return None + + # 确定引号类型 + quote_char = text[start_pos] + if quote_char not in ['"', "'"]: + return None + + # 查找匹配的结束引号(考虑转义) + end_pos = start_pos + 1 + while end_pos < len(text): + if text[end_pos] == quote_char: + # 检查是否是转义的引号 + if end_pos > start_pos + 1 and text[end_pos - 1] == "\\": + end_pos += 1 + continue + # 找到匹配的引号 + content = text[start_pos + 1 : end_pos] + # 处理转义字符 + content = content.replace('\\"', '"').replace("\\'", "'").replace("\\\\", "\\") + return content + end_pos += 1 + + return None + + # 执行最终评估 + evaluation_prompt = await global_prompt_manager.format_prompt( + "memory_retrieval_react_final_prompt", + bot_name=bot_name, + time_now=time_now, + question=question, + collected_info=collected_info if collected_info else "暂无信息", + current_iteration=current_iteration, + remaining_iterations=remaining_iterations, + max_iterations=max_iterations, + ) + + eval_success, eval_response, eval_reasoning_content, eval_model_name, eval_tool_calls = await llm_api.generate_with_model_with_tools( + evaluation_prompt, + model_config=model_config.model_task_config.tool_use, + tool_options=[], # 最终评估阶段不提供工具 + request_type="memory.react.final", + ) + + if not eval_success: + logger.error(f"ReAct Agent 最终评估阶段 LLM调用失败: {eval_response}") + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status="未找到答案:最终评估阶段LLM调用失败", + ) + return False, "最终评估阶段LLM调用失败", thinking_steps, is_timeout + + if global_config.debug.show_memory_prompt: + logger.info(f"ReAct Agent 最终评估Prompt: {evaluation_prompt}") + logger.info(f"ReAct Agent 最终评估响应: {eval_response}") + + # 从最终评估响应中提取found_answer或not_enough_info + found_answer_content = None + not_enough_info_reason = None + + if eval_response: + found_answer_content = extract_quoted_content(eval_response, "found_answer", "answer") + if not found_answer_content: + not_enough_info_reason = extract_quoted_content(eval_response, "not_enough_info", "reason") + + # 如果找到答案,返回(找到答案时,无论是否超时,都视为成功完成) + if found_answer_content: + eval_step = { + "iteration": current_iteration, + "thought": f"[最终评估] {eval_response}", + "actions": [{"action_type": "found_answer", "action_params": {"answer": found_answer_content}}], + "observations": ["最终评估阶段检测到found_answer"] + } + thinking_steps.append(eval_step) + logger.info(f"ReAct Agent 最终评估阶段找到关于问题{question}的答案: {found_answer_content}") + + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status=f"找到答案:{found_answer_content}", + ) + + return True, found_answer_content, thinking_steps, False + + # 如果评估为not_enough_info,返回空字符串(不返回任何信息) + if not_enough_info_reason: + eval_step = { + "iteration": current_iteration, + "thought": f"[最终评估] {eval_response}", + "actions": [{"action_type": "not_enough_info", "action_params": {"reason": not_enough_info_reason}}], + "observations": ["最终评估阶段检测到not_enough_info"] + } + thinking_steps.append(eval_step) + logger.info(f"ReAct Agent 最终评估阶段判断信息不足: {not_enough_info_reason}") + + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status=f"未找到答案:{not_enough_info_reason}", + ) + + return False, "", thinking_steps, is_timeout + + # 如果没有明确判断,视为not_enough_info,返回空字符串(不返回任何信息) + eval_step = { + "iteration": current_iteration, + "thought": f"[最终评估] {eval_response}", + "actions": [{"action_type": "not_enough_info", "action_params": {"reason": "已到达最大迭代次数,无法找到答案"}}], + "observations": ["已到达最大迭代次数,无法找到答案"] + } + thinking_steps.append(eval_step) + logger.info("ReAct Agent 已到达最大迭代次数,无法找到答案") + + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status="未找到答案:已到达最大迭代次数,无法找到答案", + ) + + return False, "", thinking_steps, is_timeout + + # 如果正常迭代过程中提前找到答案返回,不会到达这里 + # 如果正常迭代结束但没有触发最终评估(理论上不应该发生),直接返回 + logger.warning("ReAct Agent正常迭代结束,但未触发最终评估") _log_conversation_messages( conversation_messages, head_prompt=first_head_prompt, - final_status=f"未找到答案:{timeout_reason}", + final_status="未找到答案:正常迭代结束", ) return False, "", thinking_steps, is_timeout @@ -851,7 +1000,7 @@ async def _process_single_question( question=question, chat_id=chat_id, max_iterations=global_config.memory.max_agent_iterations, - timeout=120.0, + timeout=global_config.memory.agent_timeout_seconds, initial_info=question_initial_info, initial_jargon_concepts=jargon_concepts_for_agent, ) @@ -967,9 +1116,10 @@ async def build_memory_retrieval_prompt( logger.info(f"无当次查询,不返回任何结果,耗时: {(end_time - start_time):.3f}秒") return "" - # 第二步:并行处理所有问题(使用配置的最大迭代次数/120秒超时) + # 第二步:并行处理所有问题(使用配置的最大迭代次数和超时时间) max_iterations = global_config.memory.max_agent_iterations - logger.debug(f"问题数量: {len(questions)},设置最大迭代次数: {max_iterations},超时时间: 120秒") + timeout_seconds = global_config.memory.agent_timeout_seconds + logger.debug(f"问题数量: {len(questions)},设置最大迭代次数: {max_iterations},超时时间: {timeout_seconds}秒") # 并行处理所有问题,将概念检索结果作为初始信息传递 question_tasks = [ diff --git a/src/memory_system/retrieval_tools/__init__.py b/src/memory_system/retrieval_tools/__init__.py index 7832985f..f30fd779 100644 --- a/src/memory_system/retrieval_tools/__init__.py +++ b/src/memory_system/retrieval_tools/__init__.py @@ -14,7 +14,7 @@ from .tool_registry import ( from .query_chat_history import register_tool as register_query_chat_history from .query_lpmm_knowledge import register_tool as register_lpmm_knowledge from .query_person_info import register_tool as register_query_person_info -from .found_answer import register_tool as register_found_answer +from .found_answer import register_tool as register_finish_search from src.config.config import global_config @@ -22,7 +22,7 @@ def init_all_tools(): """初始化并注册所有记忆检索工具""" register_query_chat_history() register_query_person_info() - register_found_answer() # 注册found_answer工具 + register_finish_search() # 注册finish_search工具 if global_config.lpmm_knowledge.lpmm_mode == "agent": register_lpmm_knowledge() diff --git a/src/memory_system/retrieval_tools/found_answer.py b/src/memory_system/retrieval_tools/found_answer.py index 0efd02be..148424b9 100644 --- a/src/memory_system/retrieval_tools/found_answer.py +++ b/src/memory_system/retrieval_tools/found_answer.py @@ -1,5 +1,5 @@ """ -found_answer工具 - 用于在记忆检索过程中标记找到答案 +finish_search工具 - 用于在记忆检索过程中结束查询 """ from src.common.logger import get_logger @@ -8,33 +8,43 @@ from .tool_registry import register_memory_retrieval_tool logger = get_logger("memory_retrieval_tools") -async def found_answer(answer: str) -> str: - """标记已找到问题的答案 +async def finish_search(found_answer: bool, answer: str = "") -> str: + """结束查询 Args: - answer: 找到的答案内容 + found_answer: 是否找到了答案 + answer: 如果找到了答案,提供答案内容;如果未找到,可以为空 Returns: str: 确认信息 """ - # 这个工具主要用于标记,实际答案会通过返回值传递 - logger.info(f"找到答案: {answer}") - return f"已确认找到答案: {answer}" + if found_answer: + logger.info(f"找到答案: {answer}") + return f"已确认找到答案: {answer}" + else: + logger.info("未找到答案,结束查询") + return "未找到答案,查询结束" def register_tool(): - """注册found_answer工具""" + """注册finish_search工具""" register_memory_retrieval_tool( - name="found_answer", - description="当你在已收集的信息中找到了问题的明确答案时,调用此工具标记已找到答案。只有在检索到明确、具体的答案时才使用此工具,不要编造信息。", + name="finish_search", + description="当你决定结束查询时,调用此工具。如果找到了明确答案,设置found_answer为true并在answer中提供答案;如果未找到答案,设置found_answer为false。只有在检索到明确、具体的答案时才设置found_answer为true,不要编造信息。", parameters=[ + { + "name": "found_answer", + "type": "boolean", + "description": "是否找到了答案", + "required": True, + }, { "name": "answer", "type": "string", - "description": "找到的答案内容,必须基于已收集的信息,不要编造", - "required": True, + "description": "如果found_answer为true,提供找到的答案内容,必须基于已收集的信息,不要编造;如果found_answer为false,可以为空", + "required": False, }, ], - execute_func=found_answer, + execute_func=finish_search, ) diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index c901a4ec..8ff9ca0b 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -109,7 +109,8 @@ talk_value_rules = [ include_planner_reasoning = false # 是否将planner推理加入replyer,默认关闭(不加入) [memory] -max_agent_iterations = 3 # 记忆思考深度(最低为1(不深入思考)) +max_agent_iterations = 2 # 记忆思考深度(最低为1) +agent_timeout_seconds = 45.0 # 最长回忆时间(秒) enable_jargon_detection = true # 记忆检索过程中是否启用黑话识别 global_memory = false # 是否允许记忆检索进行全局查询