diff --git a/src/memory_system/memory_retrieval.py b/src/memory_system/memory_retrieval.py index dc7755ea..476f316e 100644 --- a/src/memory_system/memory_retrieval.py +++ b/src/memory_system/memory_retrieval.py @@ -104,6 +104,7 @@ def init_memory_retrieval_prompt(): **重要限制:** - 思考要简短,直接切入要点 +- 最大查询轮数:{max_iterations}轮(当前第{current_iteration}轮,剩余{remaining_iterations}轮) 当前需要解答的问题:{question} 已收集的信息: @@ -113,11 +114,13 @@ def init_memory_retrieval_prompt(): - 如果涉及过往事件,或者查询某个过去可能提到过的概念,或者某段时间发生的事件。可以使用聊天记录查询工具查询过往事件 - 如果涉及人物,可以使用人物信息查询工具查询人物信息 - 如果没有可靠信息,且查询时间充足,或者不确定查询类别,也可以使用lpmm知识库查询,作为辅助信息 -- 如果信息不足需要使用tool,说明需要查询什么,并输出为纯文本说明,然后调用相应工具查询(可并行调用多个工具) +- **如果信息不足需要使用tool,说明需要查询什么,并输出为纯文本说明,然后调用相应工具查询(可并行调用多个工具)** +- **如果当前已收集的信息足够回答问题,且能找到明确答案,调用found_answer工具标记已找到答案** **思考** - 你可以对查询思路给出简短的思考 -- 你必须给出使用什么工具进行查询 +- 如果信息不足,你必须给出使用什么工具进行查询 +- 如果信息足够,你必须调用found_answer工具 """, name="memory_retrieval_react_prompt_head", ) @@ -314,33 +317,38 @@ def _match_jargon_from_text(chat_text: str, chat_id: str) -> List[str]: return list(matched.keys()) -def _log_conversation_messages(conversation_messages: List[Message], head_prompt: Optional[str] = None) -> None: +def _log_conversation_messages( + conversation_messages: List[Message], + head_prompt: Optional[str] = None, + final_status: Optional[str] = None, +) -> None: """输出对话消息列表的日志 - + Args: conversation_messages: 对话消息列表 head_prompt: 第一条系统消息(head_prompt)的内容,可选 + final_status: 最终结果状态描述(例如:找到答案/未找到答案),可选 """ if not global_config.debug.show_memory_prompt: return - - log_lines = [] - + + log_lines: List[str] = [] + # 如果有head_prompt,先添加为第一条消息 if head_prompt: - msg_info = "\n[消息 1] 角色: System 内容类型: 文本\n========================================" + msg_info = "========================================\n[消息 1] 角色: System 内容类型: 文本\n-----------------------" msg_info += f"\n{head_prompt}" log_lines.append(msg_info) start_idx = 2 else: start_idx = 1 - + if not conversation_messages and not head_prompt: return - + for idx, msg in enumerate(conversation_messages, start_idx): role_name = msg.role.value if hasattr(msg.role, "value") else str(msg.role) - + # 处理内容 - 显示完整内容,不截断 if isinstance(msg.content, str): full_content = msg.content @@ -353,25 +361,28 @@ def _log_conversation_messages(conversation_messages: List[Message], head_prompt else: full_content = str(msg.content) content_type = "未知" - + # 构建单条消息的日志信息 msg_info = f"\n[消息 {idx}] 角色: {role_name} 内容类型: {content_type}\n========================================" - + if full_content: msg_info += f"\n{full_content}" - + if msg.tool_calls: msg_info += f"\n 工具调用: {len(msg.tool_calls)}个" for tool_call in msg.tool_calls: msg_info += f"\n - {tool_call}" - + if msg.tool_call_id: msg_info += f"\n 工具调用ID: {msg.tool_call_id}" - + log_lines.append(msg_info) - + total_count = len(conversation_messages) + (1 if head_prompt else 0) - logger.info(f"消息列表 (共{total_count}条):{''.join(log_lines)}") + log_text = f"消息列表 (共{total_count}条):{''.join(log_lines)}" + if final_status: + log_text += f"\n\n[最终结果] {final_status}" + logger.info(log_text) async def _react_agent_solve_question( @@ -407,7 +418,7 @@ async def _react_agent_solve_question( thinking_steps = [] is_timeout = False conversation_messages: List[Message] = [] - last_head_prompt: Optional[str] = None # 保存最后一次使用的head_prompt + first_head_prompt: Optional[str] = None # 保存第一次使用的head_prompt(用于日志显示) for iteration in range(max_iterations): # 检查超时 @@ -430,40 +441,6 @@ async def _react_agent_solve_question( remaining_iterations = max_iterations - current_iteration is_final_iteration = 
current_iteration >= max_iterations - # 每次迭代开始时,先评估当前信息是否足够回答问题 - evaluation_prompt = await global_prompt_manager.format_prompt( - "memory_retrieval_react_final_prompt", - bot_name=bot_name, - time_now=time_now, - question=question, - collected_info=collected_info if collected_info else "暂无信息", - current_iteration=current_iteration, - remaining_iterations=remaining_iterations, - max_iterations=max_iterations, - ) - - # if global_config.debug.show_memory_prompt: - # logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 评估Prompt: {evaluation_prompt}") - - eval_success, eval_response, eval_reasoning_content, eval_model_name, eval_tool_calls = await llm_api.generate_with_model_with_tools( - evaluation_prompt, - model_config=model_config.model_task_config.tool_use, - tool_options=[], # 评估阶段不提供工具 - request_type="memory.react.eval", - ) - - if not eval_success: - logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段 LLM调用失败: {eval_response}") - # 评估失败,如果还有剩余迭代次数,尝试继续查询 - if not is_final_iteration: - continue - else: - break - - logger.info( - f"ReAct Agent 第 {iteration + 1} 次迭代 评估响应: {eval_response}" - ) - # 提取函数调用中参数的值,支持单引号和双引号 def extract_quoted_content(text, func_name, param_name): """从文本中提取函数调用中参数的值,支持单引号和双引号 @@ -522,88 +499,132 @@ async def _react_agent_solve_question( return None - # 从评估响应中提取found_answer或not_enough_info - found_answer_content = None - not_enough_info_reason = None + # 如果是最后一次迭代,使用final_prompt进行总结 + if is_final_iteration: + evaluation_prompt = await global_prompt_manager.format_prompt( + "memory_retrieval_react_final_prompt", + bot_name=bot_name, + time_now=time_now, + question=question, + collected_info=collected_info if collected_info else "暂无信息", + current_iteration=current_iteration, + remaining_iterations=remaining_iterations, + max_iterations=max_iterations, + ) - if eval_response: - found_answer_content = extract_quoted_content(eval_response, "found_answer", "answer") - if not found_answer_content: - not_enough_info_reason = extract_quoted_content(eval_response, "not_enough_info", "reason") + if global_config.debug.show_memory_prompt: + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估Prompt: {evaluation_prompt}") - # 如果找到答案,直接返回 - if found_answer_content: - eval_step = { - "iteration": iteration + 1, - "thought": f"[评估] {eval_response}", - "actions": [{"action_type": "found_answer", "action_params": {"answer": found_answer_content}}], - "observations": ["评估阶段检测到found_answer"] - } - thinking_steps.append(eval_step) - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段找到关于问题{question}的答案: {found_answer_content}") - - # React完成时输出消息列表 - _log_conversation_messages(conversation_messages, last_head_prompt) - - return True, found_answer_content, thinking_steps, False + eval_success, eval_response, eval_reasoning_content, eval_model_name, eval_tool_calls = await llm_api.generate_with_model_with_tools( + evaluation_prompt, + model_config=model_config.model_task_config.tool_use, + tool_options=[], # 最终评估阶段不提供工具 + request_type="memory.react.final", + ) - # 如果评估为not_enough_info,且是最终迭代,返回not_enough_info - if not_enough_info_reason: - if is_final_iteration: + if not eval_success: + logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段 LLM调用失败: {eval_response}") + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status="未找到答案:最终评估阶段LLM调用失败", + ) + return False, "最终评估阶段LLM调用失败", thinking_steps, False + + logger.info( + f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估响应: {eval_response}" + ) + + # 从最终评估响应中提取found_answer或not_enough_info + 
found_answer_content = None + not_enough_info_reason = None + + if eval_response: + found_answer_content = extract_quoted_content(eval_response, "found_answer", "answer") + if not found_answer_content: + not_enough_info_reason = extract_quoted_content(eval_response, "not_enough_info", "reason") + + # 如果找到答案,返回 + if found_answer_content: eval_step = { "iteration": iteration + 1, - "thought": f"[评估] {eval_response}", + "thought": f"[最终评估] {eval_response}", + "actions": [{"action_type": "found_answer", "action_params": {"answer": found_answer_content}}], + "observations": ["最终评估阶段检测到found_answer"] + } + thinking_steps.append(eval_step) + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段找到关于问题{question}的答案: {found_answer_content}") + + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status=f"找到答案:{found_answer_content}", + ) + + return True, found_answer_content, thinking_steps, False + + # 如果评估为not_enough_info,返回空字符串(不返回任何信息) + if not_enough_info_reason: + eval_step = { + "iteration": iteration + 1, + "thought": f"[最终评估] {eval_response}", "actions": [{"action_type": "not_enough_info", "action_params": {"reason": not_enough_info_reason}}], - "observations": ["评估阶段检测到not_enough_info"] + "observations": ["最终评估阶段检测到not_enough_info"] } thinking_steps.append(eval_step) logger.info( - f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段判断信息不足: {not_enough_info_reason}" + f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段判断信息不足: {not_enough_info_reason}" ) - # React完成时输出消息列表 - _log_conversation_messages(conversation_messages, last_head_prompt) - - return False, not_enough_info_reason, thinking_steps, False - else: - # 非最终迭代,信息不足,继续搜集信息 - logger.info( - f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段判断信息不足: {not_enough_info_reason},继续查询" + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status=f"未找到答案:{not_enough_info_reason}", ) + + return False, "", thinking_steps, False - # 如果是最终迭代但没有明确判断,视为not_enough_info - if is_final_iteration: + # 如果没有明确判断,视为not_enough_info,返回空字符串(不返回任何信息) eval_step = { "iteration": iteration + 1, - "thought": f"[评估] {eval_response}", + "thought": f"[最终评估] {eval_response}", "actions": [{"action_type": "not_enough_info", "action_params": {"reason": "已到达最后一次迭代,无法找到答案"}}], "observations": ["已到达最后一次迭代,无法找到答案"] } thinking_steps.append(eval_step) logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 已到达最后一次迭代,无法找到答案") - # React完成时输出消息列表 - _log_conversation_messages(conversation_messages, last_head_prompt) + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status="未找到答案:已到达最后一次迭代,无法找到答案", + ) - return False, "已到达最后一次迭代,无法找到答案", thinking_steps, False + return False, "", thinking_steps, False - # 非最终迭代且信息不足,使用head_prompt决定调用哪些工具 + # 前n-1次迭代,使用head_prompt决定调用哪些工具(包含found_answer工具) tool_definitions = tool_registry.get_tool_definitions() logger.info( f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: {len(tool_definitions)}" ) - head_prompt = await global_prompt_manager.format_prompt( - "memory_retrieval_react_prompt_head", - bot_name=bot_name, - time_now=time_now, - question=question, - collected_info=collected_info if collected_info else "", - current_iteration=current_iteration, - remaining_iterations=remaining_iterations, - max_iterations=max_iterations, - ) - last_head_prompt = head_prompt # 保存最后一次使用的head_prompt + # head_prompt应该只构建一次,使用初始的collected_info,后续迭代都复用同一个 + if first_head_prompt is None: + # 第一次构建,使用初始的collected_info(即initial_info) + initial_collected_info = 
initial_info if initial_info else "" + first_head_prompt = await global_prompt_manager.format_prompt( + "memory_retrieval_react_prompt_head", + bot_name=bot_name, + time_now=time_now, + question=question, + collected_info=initial_collected_info, + current_iteration=current_iteration, + remaining_iterations=remaining_iterations, + max_iterations=max_iterations, + ) + + # 后续迭代都复用第一次构建的head_prompt + head_prompt = first_head_prompt def message_factory( _client, @@ -643,8 +664,7 @@ async def _react_agent_solve_question( logger.error(f"ReAct Agent LLM调用失败: {response}") break - # 注意:这里不检查found_answer或not_enough_info,这些只在评估阶段(memory_retrieval_react_final_prompt)检查 - # memory_retrieval_react_prompt_head只用于决定调用哪些工具来搜集信息 + # 注意:这里会检查found_answer工具调用,如果检测到found_answer工具,会直接返回答案 assistant_message: Optional[Message] = None if tool_calls: @@ -687,6 +707,7 @@ async def _react_agent_solve_question( # 处理工具调用 tool_tasks = [] + found_answer_from_tool = None # 检测是否有found_answer工具调用 for i, tool_call in enumerate(tool_calls): tool_name = tool_call.func_name @@ -696,6 +717,24 @@ async def _react_agent_solve_question( f"ReAct Agent 第 {iteration + 1} 次迭代 工具调用 {i + 1}/{len(tool_calls)}: {tool_name}({tool_args})" ) + # 检查是否是found_answer工具调用 + if tool_name == "found_answer": + found_answer_from_tool = tool_args.get("answer", "") + if found_answer_from_tool: + step["actions"].append({"action_type": "found_answer", "action_params": {"answer": found_answer_from_tool}}) + step["observations"] = ["检测到found_answer工具调用"] + thinking_steps.append(step) + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过found_answer工具找到关于问题{question}的答案: {found_answer_from_tool}") + + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status=f"找到答案:{found_answer_from_tool}", + ) + + return True, found_answer_from_tool, thinking_steps, False + continue # found_answer工具不需要执行,直接跳过 + # 普通工具调用 tool = tool_registry.get_tool(tool_name) if tool: @@ -740,15 +779,10 @@ async def _react_agent_solve_question( step["observations"].append(observation_text) collected_info += f"\n{observation_text}\n" if stripped_observation: - tool_builder = MessageBuilder() - tool_builder.set_role(RoleType.Tool) - tool_builder.add_text_content(observation_text) - tool_builder.add_tool_call(tool_call_item.call_id) - conversation_messages.append(tool_builder.build()) + # 检查工具输出中是否有新的jargon,如果有则追加到工具结果中 if enable_jargon_detection: jargon_concepts = _match_jargon_from_text(stripped_observation, chat_id) if jargon_concepts: - jargon_info = "" new_concepts = [] for concept in jargon_concepts: normalized_concept = concept.strip() @@ -757,9 +791,17 @@ async def _react_agent_solve_question( seen_jargon_concepts.add(normalized_concept) if new_concepts: jargon_info = await _retrieve_concepts_with_jargon(new_concepts, chat_id) - if jargon_info: - collected_info += f"\n{jargon_info}\n" - logger.info(f"工具输出触发黑话解析: {new_concepts}") + if jargon_info: + # 将jargon查询结果追加到工具结果中 + observation_text += f"\n\n{jargon_info}" + collected_info += f"\n{jargon_info}\n" + logger.info(f"工具输出触发黑话解析: {new_concepts}") + + tool_builder = MessageBuilder() + tool_builder.set_role(RoleType.Tool) + tool_builder.add_text_content(observation_text) + tool_builder.add_tool_call(tool_call_item.call_id) + conversation_messages.append(tool_builder.build()) thinking_steps.append(step) @@ -776,9 +818,14 @@ async def _react_agent_solve_question( logger.warning("ReAct Agent达到最大迭代次数,直接视为not_enough_info") # React完成时输出消息列表 - _log_conversation_messages(conversation_messages, 
last_head_prompt) + timeout_reason = "超时" if is_timeout else "达到最大迭代次数" + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status=f"未找到答案:{timeout_reason}", + ) - return False, "未找到相关信息", thinking_steps, is_timeout + return False, "", thinking_steps, is_timeout def _get_recent_query_history(chat_id: str, time_window_seconds: float = 300.0) -> str: @@ -1023,9 +1070,9 @@ async def build_memory_retrieval_prompt( concept_info = await _retrieve_concepts_with_jargon(concepts, chat_id) if concept_info: initial_info += concept_info - logger.info(f"概念检索完成,结果: {concept_info}") + logger.debug(f"概念检索完成,结果: {concept_info}") else: - logger.info("概念检索未找到任何结果") + logger.debug("概念检索未找到任何结果") if not questions: logger.debug("模型认为不需要检索记忆或解析失败,不返回任何查询结果") diff --git a/src/memory_system/retrieval_tools/__init__.py b/src/memory_system/retrieval_tools/__init__.py index 17c3effb..7832985f 100644 --- a/src/memory_system/retrieval_tools/__init__.py +++ b/src/memory_system/retrieval_tools/__init__.py @@ -14,6 +14,7 @@ from .tool_registry import ( from .query_chat_history import register_tool as register_query_chat_history from .query_lpmm_knowledge import register_tool as register_lpmm_knowledge from .query_person_info import register_tool as register_query_person_info +from .found_answer import register_tool as register_found_answer from src.config.config import global_config @@ -21,6 +22,7 @@ def init_all_tools(): """初始化并注册所有记忆检索工具""" register_query_chat_history() register_query_person_info() + register_found_answer() # 注册found_answer工具 if global_config.lpmm_knowledge.lpmm_mode == "agent": register_lpmm_knowledge() diff --git a/src/memory_system/retrieval_tools/found_answer.py b/src/memory_system/retrieval_tools/found_answer.py new file mode 100644 index 00000000..cef71940 --- /dev/null +++ b/src/memory_system/retrieval_tools/found_answer.py @@ -0,0 +1,41 @@ +""" +found_answer工具 - 用于在记忆检索过程中标记找到答案 +""" + +from typing import Dict, Any +from src.common.logger import get_logger +from .tool_registry import register_memory_retrieval_tool + +logger = get_logger("memory_retrieval_tools") + + +async def found_answer(answer: str) -> str: + """标记已找到问题的答案 + + Args: + answer: 找到的答案内容 + + Returns: + str: 确认信息 + """ + # 这个工具主要用于标记,实际答案会通过返回值传递 + logger.info(f"找到答案: {answer}") + return f"已确认找到答案: {answer}" + + +def register_tool(): + """注册found_answer工具""" + register_memory_retrieval_tool( + name="found_answer", + description="当你在已收集的信息中找到了问题的明确答案时,调用此工具标记已找到答案。只有在检索到明确、具体的答案时才使用此工具,不要编造信息。", + parameters=[ + { + "name": "answer", + "type": "string", + "description": "找到的答案内容,必须基于已收集的信息,不要编造", + "required": True, + }, + ], + execute_func=found_answer, + ) + diff --git a/src/memory_system/retrieval_tools/query_chat_history.py b/src/memory_system/retrieval_tools/query_chat_history.py index 478ccb97..097632c4 100644 --- a/src/memory_system/retrieval_tools/query_chat_history.py +++ b/src/memory_system/retrieval_tools/query_chat_history.py @@ -275,10 +275,6 @@ async def get_chat_history_detail(chat_id: str, memory_ids: str) -> str: except (json.JSONDecodeError, TypeError, ValueError): pass - # 添加原文内容 - if record.original_text: - result_parts.append(f"原文内容:\n{record.original_text}") - results.append("\n".join(result_parts)) if not results: @@ -318,7 +314,7 @@ def register_tool(): # 注册工具2:获取记忆详情 register_memory_retrieval_tool( name="get_chat_history_detail", - description="根据记忆ID,展示某条或某几条记忆的具体内容。包括主题、时间、参与人、关键词、概括、关键信息点和原文内容等详细信息。需要先使用search_chat_history工具获取记忆ID。", + 
description="根据记忆ID,展示某条或某几条记忆的具体内容。包括主题、时间、参与人、关键词、概括和关键信息点等详细信息。需要先使用search_chat_history工具获取记忆ID。", parameters=[ { "name": "memory_ids",