better:加强记忆检索能力

pull/1397/head
SengokuCola 2025-12-02 15:08:20 +08:00
parent c562ebe97a
commit 138afe41a4
4 changed files with 213 additions and 127 deletions

View File

@ -104,6 +104,7 @@ def init_memory_retrieval_prompt():
**重要限制**
- 思考要简短直接切入要点
- 最大查询轮数{max_iterations}当前第{current_iteration}剩余{remaining_iterations}
当前需要解答的问题{question}
已收集的信息
@ -113,11 +114,13 @@ def init_memory_retrieval_prompt():
- 如果涉及过往事件或者查询某个过去可能提到过的概念或者某段时间发生的事件可以使用聊天记录查询工具查询过往事件
- 如果涉及人物可以使用人物信息查询工具查询人物信息
- 如果没有可靠信息且查询时间充足或者不确定查询类别也可以使用lpmm知识库查询作为辅助信息
- 如果信息不足需要使用tool说明需要查询什么并输出为纯文本说明然后调用相应工具查询可并行调用多个工具
- **如果信息不足需要使用tool说明需要查询什么并输出为纯文本说明然后调用相应工具查询可并行调用多个工具**
- **如果当前已收集的信息足够回答问题且能找到明确答案调用found_answer工具标记已找到答案**
**思考**
- 你可以对查询思路给出简短的思考
- 你必须给出使用什么工具进行查询
- 如果信息不足你必须给出使用什么工具进行查询
- 如果信息足够你必须调用found_answer工具
""",
name="memory_retrieval_react_prompt_head",
)
@ -314,33 +317,38 @@ def _match_jargon_from_text(chat_text: str, chat_id: str) -> List[str]:
return list(matched.keys())
def _log_conversation_messages(conversation_messages: List[Message], head_prompt: Optional[str] = None) -> None:
def _log_conversation_messages(
conversation_messages: List[Message],
head_prompt: Optional[str] = None,
final_status: Optional[str] = None,
) -> None:
"""输出对话消息列表的日志
Args:
conversation_messages: 对话消息列表
head_prompt: 第一条系统消息head_prompt的内容可选
final_status: 最终结果状态描述例如找到答案/未找到答案可选
"""
if not global_config.debug.show_memory_prompt:
return
log_lines = []
log_lines: List[str] = []
# 如果有head_prompt先添加为第一条消息
if head_prompt:
msg_info = "\n[消息 1] 角色: System 内容类型: 文本\n========================================"
msg_info = "========================================\n[消息 1] 角色: System 内容类型: 文本\n-----------------------"
msg_info += f"\n{head_prompt}"
log_lines.append(msg_info)
start_idx = 2
else:
start_idx = 1
if not conversation_messages and not head_prompt:
return
for idx, msg in enumerate(conversation_messages, start_idx):
role_name = msg.role.value if hasattr(msg.role, "value") else str(msg.role)
# 处理内容 - 显示完整内容,不截断
if isinstance(msg.content, str):
full_content = msg.content
@ -353,25 +361,28 @@ def _log_conversation_messages(conversation_messages: List[Message], head_prompt
else:
full_content = str(msg.content)
content_type = "未知"
# 构建单条消息的日志信息
msg_info = f"\n[消息 {idx}] 角色: {role_name} 内容类型: {content_type}\n========================================"
if full_content:
msg_info += f"\n{full_content}"
if msg.tool_calls:
msg_info += f"\n 工具调用: {len(msg.tool_calls)}"
for tool_call in msg.tool_calls:
msg_info += f"\n - {tool_call}"
if msg.tool_call_id:
msg_info += f"\n 工具调用ID: {msg.tool_call_id}"
log_lines.append(msg_info)
total_count = len(conversation_messages) + (1 if head_prompt else 0)
logger.info(f"消息列表 (共{total_count}条):{''.join(log_lines)}")
log_text = f"消息列表 (共{total_count}条):{''.join(log_lines)}"
if final_status:
log_text += f"\n\n[最终结果] {final_status}"
logger.info(log_text)
async def _react_agent_solve_question(
@ -407,7 +418,7 @@ async def _react_agent_solve_question(
thinking_steps = []
is_timeout = False
conversation_messages: List[Message] = []
last_head_prompt: Optional[str] = None # 保存最后一次使用的head_prompt
first_head_prompt: Optional[str] = None # 保存第一次使用的head_prompt用于日志显示
for iteration in range(max_iterations):
# 检查超时
@ -430,40 +441,6 @@ async def _react_agent_solve_question(
remaining_iterations = max_iterations - current_iteration
is_final_iteration = current_iteration >= max_iterations
# 每次迭代开始时,先评估当前信息是否足够回答问题
evaluation_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_final_prompt",
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=collected_info if collected_info else "暂无信息",
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
# if global_config.debug.show_memory_prompt:
# logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 评估Prompt: {evaluation_prompt}")
eval_success, eval_response, eval_reasoning_content, eval_model_name, eval_tool_calls = await llm_api.generate_with_model_with_tools(
evaluation_prompt,
model_config=model_config.model_task_config.tool_use,
tool_options=[], # 评估阶段不提供工具
request_type="memory.react.eval",
)
if not eval_success:
logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段 LLM调用失败: {eval_response}")
# 评估失败,如果还有剩余迭代次数,尝试继续查询
if not is_final_iteration:
continue
else:
break
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 评估响应: {eval_response}"
)
# 提取函数调用中参数的值,支持单引号和双引号
def extract_quoted_content(text, func_name, param_name):
"""从文本中提取函数调用中参数的值,支持单引号和双引号
@ -522,88 +499,132 @@ async def _react_agent_solve_question(
return None
# 从评估响应中提取found_answer或not_enough_info
found_answer_content = None
not_enough_info_reason = None
# 如果是最后一次迭代使用final_prompt进行总结
if is_final_iteration:
evaluation_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_final_prompt",
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=collected_info if collected_info else "暂无信息",
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
if eval_response:
found_answer_content = extract_quoted_content(eval_response, "found_answer", "answer")
if not found_answer_content:
not_enough_info_reason = extract_quoted_content(eval_response, "not_enough_info", "reason")
if global_config.debug.show_memory_prompt:
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估Prompt: {evaluation_prompt}")
# 如果找到答案,直接返回
if found_answer_content:
eval_step = {
"iteration": iteration + 1,
"thought": f"[评估] {eval_response}",
"actions": [{"action_type": "found_answer", "action_params": {"answer": found_answer_content}}],
"observations": ["评估阶段检测到found_answer"]
}
thinking_steps.append(eval_step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段找到关于问题{question}的答案: {found_answer_content}")
# React完成时输出消息列表
_log_conversation_messages(conversation_messages, last_head_prompt)
return True, found_answer_content, thinking_steps, False
eval_success, eval_response, eval_reasoning_content, eval_model_name, eval_tool_calls = await llm_api.generate_with_model_with_tools(
evaluation_prompt,
model_config=model_config.model_task_config.tool_use,
tool_options=[], # 最终评估阶段不提供工具
request_type="memory.react.final",
)
# 如果评估为not_enough_info且是最终迭代返回not_enough_info
if not_enough_info_reason:
if is_final_iteration:
if not eval_success:
logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段 LLM调用失败: {eval_response}")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status="未找到答案最终评估阶段LLM调用失败",
)
return False, "最终评估阶段LLM调用失败", thinking_steps, False
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估响应: {eval_response}"
)
# 从最终评估响应中提取found_answer或not_enough_info
found_answer_content = None
not_enough_info_reason = None
if eval_response:
found_answer_content = extract_quoted_content(eval_response, "found_answer", "answer")
if not found_answer_content:
not_enough_info_reason = extract_quoted_content(eval_response, "not_enough_info", "reason")
# 如果找到答案,返回
if found_answer_content:
eval_step = {
"iteration": iteration + 1,
"thought": f"[评估] {eval_response}",
"thought": f"[最终评估] {eval_response}",
"actions": [{"action_type": "found_answer", "action_params": {"answer": found_answer_content}}],
"observations": ["最终评估阶段检测到found_answer"]
}
thinking_steps.append(eval_step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段找到关于问题{question}的答案: {found_answer_content}")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"找到答案:{found_answer_content}",
)
return True, found_answer_content, thinking_steps, False
# 如果评估为not_enough_info返回空字符串不返回任何信息
if not_enough_info_reason:
eval_step = {
"iteration": iteration + 1,
"thought": f"[最终评估] {eval_response}",
"actions": [{"action_type": "not_enough_info", "action_params": {"reason": not_enough_info_reason}}],
"observations": ["评估阶段检测到not_enough_info"]
"observations": ["最终评估阶段检测到not_enough_info"]
}
thinking_steps.append(eval_step)
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段判断信息不足: {not_enough_info_reason}"
f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段判断信息不足: {not_enough_info_reason}"
)
# React完成时输出消息列表
_log_conversation_messages(conversation_messages, last_head_prompt)
return False, not_enough_info_reason, thinking_steps, False
else:
# 非最终迭代,信息不足,继续搜集信息
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 评估阶段判断信息不足: {not_enough_info_reason},继续查询"
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"未找到答案:{not_enough_info_reason}",
)
return False, "", thinking_steps, False
# 如果是最终迭代但没有明确判断视为not_enough_info
if is_final_iteration:
# 如果没有明确判断视为not_enough_info返回空字符串不返回任何信息
eval_step = {
"iteration": iteration + 1,
"thought": f"[评估] {eval_response}",
"thought": f"[最终评估] {eval_response}",
"actions": [{"action_type": "not_enough_info", "action_params": {"reason": "已到达最后一次迭代,无法找到答案"}}],
"observations": ["已到达最后一次迭代,无法找到答案"]
}
thinking_steps.append(eval_step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 已到达最后一次迭代,无法找到答案")
# React完成时输出消息列表
_log_conversation_messages(conversation_messages, last_head_prompt)
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status="未找到答案:已到达最后一次迭代,无法找到答案",
)
return False, "已到达最后一次迭代,无法找到答案", thinking_steps, False
return False, "", thinking_steps, False
# 非最终迭代且信息不足使用head_prompt决定调用哪些工具
# 前n-1次迭代使用head_prompt决定调用哪些工具包含found_answer工具
tool_definitions = tool_registry.get_tool_definitions()
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: {len(tool_definitions)}"
)
head_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_prompt_head",
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=collected_info if collected_info else "",
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
last_head_prompt = head_prompt # 保存最后一次使用的head_prompt
# head_prompt应该只构建一次使用初始的collected_info后续迭代都复用同一个
if first_head_prompt is None:
# 第一次构建使用初始的collected_info即initial_info
initial_collected_info = initial_info if initial_info else ""
first_head_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_prompt_head",
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=initial_collected_info,
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
# 后续迭代都复用第一次构建的head_prompt
head_prompt = first_head_prompt
def message_factory(
_client,
@ -643,8 +664,7 @@ async def _react_agent_solve_question(
logger.error(f"ReAct Agent LLM调用失败: {response}")
break
# 注意这里不检查found_answer或not_enough_info这些只在评估阶段memory_retrieval_react_final_prompt检查
# memory_retrieval_react_prompt_head只用于决定调用哪些工具来搜集信息
# 注意这里会检查found_answer工具调用如果检测到found_answer工具会直接返回答案
assistant_message: Optional[Message] = None
if tool_calls:
@ -687,6 +707,7 @@ async def _react_agent_solve_question(
# 处理工具调用
tool_tasks = []
found_answer_from_tool = None # 检测是否有found_answer工具调用
for i, tool_call in enumerate(tool_calls):
tool_name = tool_call.func_name
@ -696,6 +717,24 @@ async def _react_agent_solve_question(
f"ReAct Agent 第 {iteration + 1} 次迭代 工具调用 {i + 1}/{len(tool_calls)}: {tool_name}({tool_args})"
)
# 检查是否是found_answer工具调用
if tool_name == "found_answer":
found_answer_from_tool = tool_args.get("answer", "")
if found_answer_from_tool:
step["actions"].append({"action_type": "found_answer", "action_params": {"answer": found_answer_from_tool}})
step["observations"] = ["检测到found_answer工具调用"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过found_answer工具找到关于问题{question}的答案: {found_answer_from_tool}")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"找到答案:{found_answer_from_tool}",
)
return True, found_answer_from_tool, thinking_steps, False
continue # found_answer工具不需要执行直接跳过
# 普通工具调用
tool = tool_registry.get_tool(tool_name)
if tool:
@ -740,15 +779,10 @@ async def _react_agent_solve_question(
step["observations"].append(observation_text)
collected_info += f"\n{observation_text}\n"
if stripped_observation:
tool_builder = MessageBuilder()
tool_builder.set_role(RoleType.Tool)
tool_builder.add_text_content(observation_text)
tool_builder.add_tool_call(tool_call_item.call_id)
conversation_messages.append(tool_builder.build())
# 检查工具输出中是否有新的jargon如果有则追加到工具结果中
if enable_jargon_detection:
jargon_concepts = _match_jargon_from_text(stripped_observation, chat_id)
if jargon_concepts:
jargon_info = ""
new_concepts = []
for concept in jargon_concepts:
normalized_concept = concept.strip()
@ -757,9 +791,17 @@ async def _react_agent_solve_question(
seen_jargon_concepts.add(normalized_concept)
if new_concepts:
jargon_info = await _retrieve_concepts_with_jargon(new_concepts, chat_id)
if jargon_info:
collected_info += f"\n{jargon_info}\n"
logger.info(f"工具输出触发黑话解析: {new_concepts}")
if jargon_info:
# 将jargon查询结果追加到工具结果中
observation_text += f"\n\n{jargon_info}"
collected_info += f"\n{jargon_info}\n"
logger.info(f"工具输出触发黑话解析: {new_concepts}")
tool_builder = MessageBuilder()
tool_builder.set_role(RoleType.Tool)
tool_builder.add_text_content(observation_text)
tool_builder.add_tool_call(tool_call_item.call_id)
conversation_messages.append(tool_builder.build())
thinking_steps.append(step)
@ -776,9 +818,14 @@ async def _react_agent_solve_question(
logger.warning("ReAct Agent达到最大迭代次数直接视为not_enough_info")
# React完成时输出消息列表
_log_conversation_messages(conversation_messages, last_head_prompt)
timeout_reason = "超时" if is_timeout else "达到最大迭代次数"
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"未找到答案:{timeout_reason}",
)
return False, "未找到相关信息", thinking_steps, is_timeout
return False, "", thinking_steps, is_timeout
def _get_recent_query_history(chat_id: str, time_window_seconds: float = 300.0) -> str:
@ -1023,9 +1070,9 @@ async def build_memory_retrieval_prompt(
concept_info = await _retrieve_concepts_with_jargon(concepts, chat_id)
if concept_info:
initial_info += concept_info
logger.info(f"概念检索完成,结果: {concept_info}")
logger.debug(f"概念检索完成,结果: {concept_info}")
else:
logger.info("概念检索未找到任何结果")
logger.debug("概念检索未找到任何结果")
if not questions:
logger.debug("模型认为不需要检索记忆或解析失败,不返回任何查询结果")

View File

@ -14,6 +14,7 @@ from .tool_registry import (
from .query_chat_history import register_tool as register_query_chat_history
from .query_lpmm_knowledge import register_tool as register_lpmm_knowledge
from .query_person_info import register_tool as register_query_person_info
from .found_answer import register_tool as register_found_answer
from src.config.config import global_config
@ -21,6 +22,7 @@ def init_all_tools():
"""初始化并注册所有记忆检索工具"""
register_query_chat_history()
register_query_person_info()
register_found_answer() # 注册found_answer工具
if global_config.lpmm_knowledge.lpmm_mode == "agent":
register_lpmm_knowledge()

View File

@ -0,0 +1,41 @@
"""
found_answer工具 - 用于在记忆检索过程中标记找到答案
"""
from typing import Dict, Any
from src.common.logger import get_logger
from .tool_registry import register_memory_retrieval_tool
logger = get_logger("memory_retrieval_tools")
async def found_answer(answer: str) -> str:
"""标记已找到问题的答案
Args:
answer: 找到的答案内容
Returns:
str: 确认信息
"""
# 这个工具主要用于标记,实际答案会通过返回值传递
logger.info(f"找到答案: {answer}")
return f"已确认找到答案: {answer}"
def register_tool():
"""注册found_answer工具"""
register_memory_retrieval_tool(
name="found_answer",
description="当你在已收集的信息中找到了问题的明确答案时,调用此工具标记已找到答案。只有在检索到明确、具体的答案时才使用此工具,不要编造信息。",
parameters=[
{
"name": "answer",
"type": "string",
"description": "找到的答案内容,必须基于已收集的信息,不要编造",
"required": True,
},
],
execute_func=found_answer,
)

View File

@ -275,10 +275,6 @@ async def get_chat_history_detail(chat_id: str, memory_ids: str) -> str:
except (json.JSONDecodeError, TypeError, ValueError):
pass
# 添加原文内容
if record.original_text:
result_parts.append(f"原文内容:\n{record.original_text}")
results.append("\n".join(result_parts))
if not results:
@ -318,7 +314,7 @@ def register_tool():
# 注册工具2获取记忆详情
register_memory_retrieval_tool(
name="get_chat_history_detail",
description="根据记忆ID展示某条或某几条记忆的具体内容。包括主题、时间、参与人、关键词、概括、关键信息点和原文内容等详细信息。需要先使用search_chat_history工具获取记忆ID。",
description="根据记忆ID展示某条或某几条记忆的具体内容。包括主题、时间、参与人、关键词、概括和关键信息点等详细信息。需要先使用search_chat_history工具获取记忆ID。",
parameters=[
{
"name": "memory_ids",