ref: refactor the memory retrieval flow; slightly higher cost, higher precision

pull/1396/head
SengokuCola 2025-12-01 19:10:48 +08:00
parent 2b9975c48e
commit 32755987a0
2 changed files with 260 additions and 222 deletions

View File

@@ -103,32 +103,21 @@ def init_memory_retrieval_prompt():
你正在参与聊天,你需要搜集信息来回答问题,帮助你参与聊天。
**重要限制**:
- 最大查询轮数:{max_iterations},当前第{current_iteration},剩余{remaining_iterations}
- 思考要简短,直接切入要点
- 必须严格使用检索到的信息回答问题,不要编造信息
当前需要解答的问题:{question}
已收集的信息:
{collected_info}
**执行步骤**:
**第一步:思考(Think)**
在思考中分析:
- 当前信息是否足够回答问题({question})
- **如果信息足够且能找到明确答案**,在思考中直接给出答案,格式为:found_answer(answer="你的答案内容")
- **如果信息不足以解答问题**,需要尝试搜集更多信息,进一步调用工具,进入第二步行动环节
- **如果已有信息不足或无法找到答案,决定结束查询**,在思考中给出:not_enough_info(reason="结束查询的原因")
**第二步:行动(Action)**
- 如果涉及过往事件,或者查询某个过去可能提到过的概念,或者某段时间发生的事件,可以使用聊天记录查询工具查询过往事件
- 如果涉及人物,可以使用人物信息查询工具查询人物信息
- 如果没有可靠信息且查询时间充足,或者不确定查询类别,也可以使用lpmm知识库查询作为辅助信息
- 如果信息不足需要使用tool,说明需要查询什么,并输出为纯文本说明,然后调用相应工具查询,可并行调用多个工具
**重要规则**:
- **只有在检索到明确有关的信息并得出答案时,才使用found_answer**
- **如果信息不足、无法确定、找不到相关信息导致的无法回答问题,决定结束查询,必须使用not_enough_info,不要使用found_answer**
- 答案必须在思考中给出,格式为 found_answer(answer="...") 或 not_enough_info(reason="...")
**思考**:
- 你可以对查询思路给出简短的思考
- 你必须给出使用什么工具进行查询
""",
name="memory_retrieval_react_prompt_head",
)
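For reference, the two answer markers this prompt asks the model to embed in its thinking look roughly like the strings below. These are illustrative values only (the answer/reason text is invented), not actual model output from this commit:

# Illustrative only: shapes of model output the head prompt above asks for.
example_found = '根据已收集的信息,found_answer(answer="上周五群里讨论过部署方案,结论是先用 Docker Compose")'
example_not_enough = '多轮检索后仍无相关记录,not_enough_info(reason="聊天记录和人物信息中都没有提到这个话题")'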
@@ -325,6 +314,66 @@ def _match_jargon_from_text(chat_text: str, chat_id: str) -> List[str]:
return list(matched.keys())
def _log_conversation_messages(conversation_messages: List[Message], head_prompt: Optional[str] = None) -> None:
"""输出对话消息列表的日志
Args:
conversation_messages: 对话消息列表
head_prompt: 第一条系统消息(head_prompt)的内容,可选
"""
if not global_config.debug.show_memory_prompt:
return
log_lines = []
# 如果有head_prompt先添加为第一条消息
if head_prompt:
msg_info = "\n[消息 1] 角色: System 内容类型: 文本\n========================================"
msg_info += f"\n{head_prompt}"
log_lines.append(msg_info)
start_idx = 2
else:
start_idx = 1
if not conversation_messages and not head_prompt:
return
for idx, msg in enumerate(conversation_messages, start_idx):
role_name = msg.role.value if hasattr(msg.role, "value") else str(msg.role)
# 处理内容 - 显示完整内容,不截断
if isinstance(msg.content, str):
full_content = msg.content
content_type = "文本"
elif isinstance(msg.content, list):
text_parts = [item for item in msg.content if isinstance(item, str)]
image_count = len([item for item in msg.content if isinstance(item, tuple)])
full_content = "".join(text_parts) if text_parts else ""
content_type = f"混合({len(text_parts)}段文本, {image_count}张图片)"
else:
full_content = str(msg.content)
content_type = "未知"
# 构建单条消息的日志信息
msg_info = f"\n[消息 {idx}] 角色: {role_name} 内容类型: {content_type}\n========================================"
if full_content:
msg_info += f"\n{full_content}"
if msg.tool_calls:
msg_info += f"\n 工具调用: {len(msg.tool_calls)}"
for tool_call in msg.tool_calls:
msg_info += f"\n - {tool_call}"
if msg.tool_call_id:
msg_info += f"\n 工具调用ID: {msg.tool_call_id}"
log_lines.append(msg_info)
total_count = len(conversation_messages) + (1 if head_prompt else 0)
logger.info(f"消息列表 (共{total_count}条):{''.join(log_lines)}")
async def _react_agent_solve_question(
question: str,
chat_id: str,
@@ -358,6 +407,7 @@ async def _react_agent_solve_question(
thinking_steps = []
is_timeout = False
conversation_messages: List[Message] = []
last_head_prompt: Optional[str] = None # 保存最后一次使用的head_prompt
for iteration in range(max_iterations):
# 检查超时
@@ -380,144 +430,41 @@ async def _react_agent_solve_question(
remaining_iterations = max_iterations - current_iteration
is_final_iteration = current_iteration >= max_iterations
if is_final_iteration:
# 最后一次迭代使用最终prompt
tool_definitions = []
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: 0最后一次迭代不提供工具调用"
)
prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_final_prompt",
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=collected_info if collected_info else "暂无信息",
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
if global_config.debug.show_memory_prompt:
logger.info(f"ReAct Agent 第 {iteration + 1} 次Prompt: {prompt}")
success, response, reasoning_content, model_name, tool_calls = await llm_api.generate_with_model_with_tools(
prompt,
model_config=model_config.model_task_config.tool_use,
tool_options=tool_definitions,
request_type="memory.react",
)
else:
# 非最终迭代使用head_prompt
tool_definitions = tool_registry.get_tool_definitions()
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: {len(tool_definitions)}"
)
head_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_prompt_head",
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=collected_info if collected_info else "",
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
def message_factory(
_client,
*,
_head_prompt: str = head_prompt,
_conversation_messages: List[Message] = conversation_messages,
) -> List[Message]:
messages: List[Message] = []
system_builder = MessageBuilder()
system_builder.set_role(RoleType.System)
system_builder.add_text_content(_head_prompt)
messages.append(system_builder.build())
messages.extend(_conversation_messages)
if global_config.debug.show_memory_prompt:
# 优化日志展示 - 合并所有消息到一条日志
log_lines = []
for idx, msg in enumerate(messages, 1):
role_name = msg.role.value if hasattr(msg.role, "value") else str(msg.role)
# 处理内容 - 显示完整内容,不截断
if isinstance(msg.content, str):
full_content = msg.content
content_type = "文本"
elif isinstance(msg.content, list):
text_parts = [item for item in msg.content if isinstance(item, str)]
image_count = len([item for item in msg.content if isinstance(item, tuple)])
full_content = "".join(text_parts) if text_parts else ""
content_type = f"混合({len(text_parts)}段文本, {image_count}张图片)"
else:
full_content = str(msg.content)
content_type = "未知"
# 构建单条消息的日志信息
msg_info = f"\n[消息 {idx}] 角色: {role_name} 内容类型: {content_type}\n========================================"
if full_content:
msg_info += f"\n{full_content}"
if msg.tool_calls:
msg_info += f"\n 工具调用: {len(msg.tool_calls)}"
for tool_call in msg.tool_calls:
msg_info += f"\n - {tool_call}"
if msg.tool_call_id:
msg_info += f"\n 工具调用ID: {msg.tool_call_id}"
log_lines.append(msg_info)
# 合并所有消息为一条日志输出
logger.info(f"消息列表 (共{len(messages)}条):{''.join(log_lines)}")
return messages
(
success,
response,
reasoning_content,
model_name,
tool_calls,
) = await llm_api.generate_with_model_with_tools_by_message_factory(
message_factory,
model_config=model_config.model_task_config.tool_use,
tool_options=tool_definitions,
request_type="memory.react",
)
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 模型: {model_name} ,调用工具数量: {len(tool_calls) if tool_calls else 0} ,调用工具响应: {response}"
# 每次迭代开始时,先评估当前信息是否足够回答问题
evaluation_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_final_prompt",
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=collected_info if collected_info else "暂无信息",
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
if not success:
logger.error(f"ReAct Agent LLM调用失败: {response}")
break
# if global_config.debug.show_memory_prompt:
# logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 评估Prompt: {evaluation_prompt}")
assistant_message: Optional[Message] = None
if tool_calls:
assistant_builder = MessageBuilder()
assistant_builder.set_role(RoleType.Assistant)
if response and response.strip():
assistant_builder.add_text_content(response)
assistant_builder.set_tool_calls(tool_calls)
assistant_message = assistant_builder.build()
elif response and response.strip():
assistant_builder = MessageBuilder()
assistant_builder.set_role(RoleType.Assistant)
assistant_builder.add_text_content(response)
assistant_message = assistant_builder.build()
eval_success, eval_response, eval_reasoning_content, eval_model_name, eval_tool_calls = await llm_api.generate_with_model_with_tools(
evaluation_prompt,
model_config=model_config.model_task_config.tool_use,
tool_options=[], # 评估阶段不提供工具
request_type="memory.react.eval",
)
# 记录思考步骤
step = {"iteration": iteration + 1, "thought": response, "actions": [], "observations": []}
if not eval_success:
logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段 LLM调用失败: {eval_response}")
# 评估失败,如果还有剩余迭代次数,尝试继续查询
if not is_final_iteration:
continue
else:
break
# 优先从思考内容中提取found_answer或not_enough_info
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 评估响应: {eval_response}"
)
# 提取函数调用中参数的值,支持单引号和双引号
def extract_quoted_content(text, func_name, param_name):
"""从文本中提取函数调用中参数的值,支持单引号和双引号
@@ -575,44 +522,147 @@ async def _react_agent_solve_question(
return None
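The body of extract_quoted_content is elided from this hunk; as a rough sketch, an extraction along these lines would satisfy the docstring above (hypothetical helper name, not the code in this commit):

import re

def extract_quoted_content_sketch(text: str, func_name: str, param_name: str):
    # Match func_name(param_name="...") or func_name(param_name='...'), value may span lines.
    pattern = rf'{re.escape(func_name)}\s*\(\s*{re.escape(param_name)}\s*=\s*(["\'])(.*?)\1\s*\)'
    match = re.search(pattern, text, re.DOTALL)
    return match.group(2) if match else None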
# 从LLM的直接输出内容中提取found_answer或not_enough_info
# 从评估响应中提取found_answer或not_enough_info
found_answer_content = None
not_enough_info_reason = None
# 只检查responseLLM的直接输出内容不检查reasoning_content
if response:
found_answer_content = extract_quoted_content(response, "found_answer", "answer")
if eval_response:
found_answer_content = extract_quoted_content(eval_response, "found_answer", "answer")
if not found_answer_content:
not_enough_info_reason = extract_quoted_content(response, "not_enough_info", "reason")
not_enough_info_reason = extract_quoted_content(eval_response, "not_enough_info", "reason")
# 如果从输出内容中找到答案,直接返回
# 如果找到答案,直接返回
if found_answer_content:
step["actions"].append({"action_type": "found_answer", "action_params": {"answer": found_answer_content}})
step["observations"] = ["从LLM输出内容中检测到found_answer"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 找到关于问题{question}的答案: {found_answer_content}")
eval_step = {
"iteration": iteration + 1,
"thought": f"[评估] {eval_response}",
"actions": [{"action_type": "found_answer", "action_params": {"answer": found_answer_content}}],
"observations": ["评估阶段检测到found_answer"]
}
thinking_steps.append(eval_step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段找到关于问题{question}的答案: {found_answer_content}")
# React完成时输出消息列表
_log_conversation_messages(conversation_messages, last_head_prompt)
return True, found_answer_content, thinking_steps, False
# 如果评估为not_enough_info且是最终迭代返回not_enough_info
if not_enough_info_reason:
step["actions"].append(
{"action_type": "not_enough_info", "action_params": {"reason": not_enough_info_reason}}
)
step["observations"] = ["从LLM输出内容中检测到not_enough_info"]
thinking_steps.append(step)
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 无法找到关于问题{question}的答案,原因: {not_enough_info_reason}"
)
return False, not_enough_info_reason, thinking_steps, False
if is_final_iteration:
eval_step = {
"iteration": iteration + 1,
"thought": f"[评估] {eval_response}",
"actions": [{"action_type": "not_enough_info", "action_params": {"reason": not_enough_info_reason}}],
"observations": ["评估阶段检测到not_enough_info"]
}
thinking_steps.append(eval_step)
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段判断信息不足: {not_enough_info_reason}"
)
# React完成时输出消息列表
_log_conversation_messages(conversation_messages, last_head_prompt)
return False, not_enough_info_reason, thinking_steps, False
else:
# 非最终迭代,信息不足,继续搜集信息
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段判断信息不足: {not_enough_info_reason},继续查询"
)
# 如果是最终迭代但没有明确判断视为not_enough_info
if is_final_iteration:
step["actions"].append(
{"action_type": "not_enough_info", "action_params": {"reason": "已到达最后一次迭代,无法找到答案"}}
)
step["observations"] = ["已到达最后一次迭代,无法找到答案"]
thinking_steps.append(step)
eval_step = {
"iteration": iteration + 1,
"thought": f"[评估] {eval_response}",
"actions": [{"action_type": "not_enough_info", "action_params": {"reason": "已到达最后一次迭代,无法找到答案"}}],
"observations": ["已到达最后一次迭代,无法找到答案"]
}
thinking_steps.append(eval_step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 已到达最后一次迭代,无法找到答案")
# React完成时输出消息列表
_log_conversation_messages(conversation_messages, last_head_prompt)
return False, "已到达最后一次迭代,无法找到答案", thinking_steps, False
# 非最终迭代且信息不足使用head_prompt决定调用哪些工具
tool_definitions = tool_registry.get_tool_definitions()
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: {len(tool_definitions)}"
)
head_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_prompt_head",
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=collected_info if collected_info else "",
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
last_head_prompt = head_prompt # 保存最后一次使用的head_prompt
def message_factory(
_client,
*,
_head_prompt: str = head_prompt,
_conversation_messages: List[Message] = conversation_messages,
) -> List[Message]:
messages: List[Message] = []
system_builder = MessageBuilder()
system_builder.set_role(RoleType.System)
system_builder.add_text_content(_head_prompt)
messages.append(system_builder.build())
messages.extend(_conversation_messages)
return messages
(
success,
response,
reasoning_content,
model_name,
tool_calls,
) = await llm_api.generate_with_model_with_tools_by_message_factory(
message_factory,
model_config=model_config.model_task_config.tool_use,
tool_options=tool_definitions,
request_type="memory.react",
)
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 模型: {model_name} ,调用工具数量: {len(tool_calls) if tool_calls else 0} ,调用工具响应: {response}"
)
if not success:
logger.error(f"ReAct Agent LLM调用失败: {response}")
break
# 注意:这里不检查found_answer或not_enough_info,这些只在评估阶段(memory_retrieval_react_final_prompt)检查
# memory_retrieval_react_prompt_head只用于决定调用哪些工具来搜集信息
assistant_message: Optional[Message] = None
if tool_calls:
assistant_builder = MessageBuilder()
assistant_builder.set_role(RoleType.Assistant)
if response and response.strip():
assistant_builder.add_text_content(response)
assistant_builder.set_tool_calls(tool_calls)
assistant_message = assistant_builder.build()
elif response and response.strip():
assistant_builder = MessageBuilder()
assistant_builder.set_role(RoleType.Assistant)
assistant_builder.add_text_content(response)
assistant_message = assistant_builder.build()
# 记录思考步骤
step = {"iteration": iteration + 1, "thought": response, "actions": [], "observations": []}
if assistant_message:
conversation_messages.append(assistant_message)
@@ -624,21 +674,16 @@ async def _react_agent_solve_question(
# 处理工具调用
if not tool_calls:
# 没有工具调用,说明LLM在思考中已经给出了答案(已在前面检查),或者需要继续查询
# 如果思考中没有答案,说明需要继续查询或等待下一轮
# 如果没有工具调用,记录思考过程,继续下一轮迭代(下一轮会再次评估)
if response and response.strip():
# 如果响应不为空,记录思考过程,继续下一轮迭代
step["observations"] = [f"思考完成,但未调用工具。响应: {response}"]
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 思考完成但未调用工具: {response}")
# 继续下一轮迭代让LLM有机会在思考中给出found_answer或继续查询
collected_info += f"思考: {response}"
thinking_steps.append(step)
continue
else:
logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 无工具调用且无响应")
step["observations"] = ["无响应且无工具调用"]
thinking_steps.append(step)
break
thinking_steps.append(step)
continue
# 处理工具调用
tool_tasks = []
@@ -655,12 +700,10 @@ async def _react_agent_solve_question(
tool = tool_registry.get_tool(tool_name)
if tool:
# 准备工具参数需要添加chat_id如果工具需要
tool_params = tool_args.copy()
# 如果工具函数签名需要chat_id添加它
import inspect
sig = inspect.signature(tool.execute_func)
tool_params = tool_args.copy()
if "chat_id" in sig.parameters:
tool_params["chat_id"] = chat_id
@@ -717,7 +760,6 @@ async def _react_agent_solve_question(
if jargon_info:
collected_info += f"\n{jargon_info}\n"
logger.info(f"工具输出触发黑话解析: {new_concepts}")
# logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i+1} 执行结果: {observation_text}")
thinking_steps.append(step)
@@ -732,6 +774,10 @@ async def _react_agent_solve_question(
logger.warning("ReAct Agent超时直接视为not_enough_info")
else:
logger.warning("ReAct Agent达到最大迭代次数直接视为not_enough_info")
# React完成时输出消息列表
_log_conversation_messages(conversation_messages, last_head_prompt)
return False, "未找到相关信息", thinking_steps, is_timeout

View File

@@ -15,17 +15,14 @@ logger = get_logger("memory_retrieval_tools")
async def search_chat_history(
chat_id: str, keyword: Optional[str] = None, participant: Optional[str] = None, fuzzy: bool = True
chat_id: str, keyword: Optional[str] = None, participant: Optional[str] = None
) -> str:
"""根据关键词或参与人查询记忆返回匹配的记忆id、记忆标题theme和关键词keywords
Args:
chat_id: 聊天ID
keyword: 关键词(可选),支持多个关键词,可用空格、逗号等分隔
keyword: 关键词(可选),支持多个关键词,可用空格、逗号等分隔。匹配规则:如果关键词数量<=2,必须全部匹配;如果关键词数量>2,允许n-1个关键词匹配
participant: 参与人昵称可选
fuzzy: 是否使用模糊匹配模式,默认True
- True: 模糊匹配,只要包含任意一个关键词即匹配(OR关系)
- False: 全匹配,必须包含所有关键词才匹配(AND关系)
Returns:
str: 查询结果包含记忆idtheme和keywords
@@ -96,31 +93,28 @@ async def search_chat_history(
except (json.JSONDecodeError, TypeError, ValueError):
pass
# 根据匹配模式检查关键词
if fuzzy:
# 模糊匹配:只要包含任意一个关键词即匹配(OR关系)
for kw in keywords_lower:
if (
kw in theme
or kw in summary
or kw in original_text
or any(kw in k for k in record_keywords_list)
):
keyword_matched = True
break
# 有容错的全匹配:如果关键词数量>2,允许n-1个关键词匹配;否则必须全部匹配
matched_count = 0
for kw in keywords_lower:
kw_matched = (
kw in theme
or kw in summary
or kw in original_text
or any(kw in k for k in record_keywords_list)
)
if kw_matched:
matched_count += 1
# 计算需要匹配的关键词数量
total_keywords = len(keywords_lower)
if total_keywords > 2:
# 关键词数量>2允许n-1个关键词匹配
required_matches = total_keywords - 1
else:
# 全匹配:必须包含所有关键词才匹配(AND关系)
keyword_matched = True
for kw in keywords_lower:
kw_matched = (
kw in theme
or kw in summary
or kw in original_text
or any(kw in k for k in record_keywords_list)
)
if not kw_matched:
keyword_matched = False
break
# 关键词数量<=2必须全部匹配
required_matches = total_keywords
keyword_matched = matched_count >= required_matches
# 两者都匹配:如果同时有participant和keyword,需要两者都匹配;如果只有一个条件,只需要该条件匹配
matched = participant_matched and keyword_matched
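As a standalone restatement of the tolerance rule above (<=2 keywords: all must hit; >2 keywords: n-1 hits are enough), here is a small illustrative helper, not part of this commit:

def keywords_match_sketch(keywords, haystacks):
    # Count how many keywords appear in at least one searchable field.
    matched = sum(1 for kw in keywords if any(kw in h for h in haystacks))
    required = len(keywords) - 1 if len(keywords) > 2 else len(keywords)
    return matched >= required

# e.g. 3 keywords with 2 hits -> True (n-1 tolerance); 2 keywords with 1 hit -> False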
@@ -134,8 +128,12 @@ async def search_chat_history(
return f"未找到包含关键词'{keywords_str}'且参与人包含'{participant}'的聊天记录"
elif keyword:
keywords_str = "".join(parse_keywords_string(keyword))
match_mode = "包含任意一个关键词" if fuzzy else "包含所有关键词"
return f"未找到{match_mode}'{keywords_str}'的聊天记录"
keywords_list = parse_keywords_string(keyword)
if len(keywords_list) > 2:
required_count = len(keywords_list) - 1
return f"未找到包含至少{required_count}个关键词(共{len(keywords_list)}个)'{keywords_str}'的聊天记录"
else:
return f"未找到包含所有关键词'{keywords_str}'的聊天记录"
elif participant:
return f"未找到参与人包含'{participant}'的聊天记录"
else:
@@ -299,12 +297,12 @@ def register_tool():
# 注册工具1搜索记忆
register_memory_retrieval_tool(
name="search_chat_history",
description="根据关键词或参与人查询记忆返回匹配的记忆id、记忆标题theme和关键词keywords。用于快速搜索和定位相关记忆。",
description="根据关键词或参与人查询记忆返回匹配的记忆id、记忆标题theme和关键词keywords。用于快速搜索和定位相关记忆。匹配规则:如果关键词数量<=2必须全部匹配如果关键词数量>2允许n-1个关键词匹配容错匹配",
parameters=[
{
"name": "keyword",
"type": "string",
"description": "关键词(可选,支持多个关键词,可用空格、逗号、斜杠等分隔,如:'麦麦 百度网盘''麦麦,百度网盘'。用于在主题、关键词、概括、原文中搜索",
"description": "关键词(可选,支持多个关键词,可用空格、逗号、斜杠等分隔,如:'麦麦 百度网盘''麦麦,百度网盘'。用于在主题、关键词、概括、原文中搜索。匹配规则:如果关键词数量<=2必须全部匹配如果关键词数量>2允许n-1个关键词匹配",
"required": False,
},
{
@@ -313,12 +311,6 @@ def register_tool():
"description": "参与人昵称(可选),用于查询包含该参与人的记忆",
"required": False,
},
{
"name": "fuzzy",
"type": "boolean",
"description": "是否使用模糊匹配模式默认True。True表示模糊匹配只要包含任意一个关键词即匹配OR关系False表示全匹配必须包含所有关键词才匹配AND关系",
"required": False,
},
],
execute_func=search_chat_history,
)
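A hedged example of invoking the registered tool function directly; the chat_id value is a placeholder, and the call must run inside an async context:

# Illustrative call; "chat_123" is a hypothetical chat_id.
async def demo():
    result = await search_chat_history(chat_id="chat_123", keyword="麦麦 百度网盘")
    print(result)  # matching memory ids/themes/keywords, or a "未找到..." message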