MaiBot/src/memory_system/memory_retrieval.py

1115 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import time
import json
import re
import random
import asyncio
from typing import List, Dict, Any, Optional, Tuple
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.plugin_system.apis import llm_api
from src.common.database.database_model import ThinkingBack
from json_repair import repair_json
from src.memory_system.retrieval_tools import get_tool_registry, init_all_tools
from src.llm_models.payload_content.message import MessageBuilder, RoleType, Message
logger = get_logger("memory_retrieval")
def init_memory_retrieval_prompt():
"""初始化记忆检索相关的 prompt 模板和工具"""
# 首先注册所有工具
init_all_tools()
# 第一步问题生成prompt
Prompt(
"""
你的名字是{bot_name}。现在是{time_now}
群里正在进行的聊天内容:
{chat_history}
{recent_query_history}
现在,{sender}发送了内容:{target_message},你想要回复ta。
请仔细分析聊天内容,考虑以下几点:
1. 对话中是否提到了过去发生的事情、人物、事件或信息
2. 是否有需要回忆的内容(比如"之前说过""上次""以前"等)
3. 是否有需要查找历史信息的问题
4. 是否有问题可以搜集信息帮助你聊天
5. 对话中是否包含黑话、俚语、缩写等可能需要查询的概念
重要提示:
- **每次只能提出一个问题**,选择最需要查询的关键问题
- 如果"最近已查询的问题和结果"中已经包含了类似的问题,请避免重复生成相同或相似的问题
- 如果之前已经查询过某个问题但未找到答案,可以尝试用不同的方式提问或更具体的问题
- 如果之前已经查询过某个问题并找到了答案,可以直接参考已有结果,不需要重复查询
如果你认为需要从记忆中检索信息来回答,请:
1. 先识别对话中可能需要查询的概念(黑话/俚语/缩写/人名/专有名词等关键词)
2. 然后根据上下文提出**一个**最关键的问题来帮助你回复目标消息
问题格式示例:
- "xxx在前几天干了什么"
- "xxx是什么"
- "xxxx和xxx的关系是什么"
- "xxx在某个时间点发生了什么"
请输出JSON格式包含两个字段
- "concepts": 需要检索的概念列表(字符串数组),如果不需要检索概念则输出空数组[]
- "questions": 问题数组(字符串数组),如果不需要检索记忆则输出空数组[],如果需要检索则只输出包含一个问题的数组
输出格式示例(需要检索时):
```json
{{
"concepts": ["AAA", "BBB", "CCC"],
"questions": ["张三在前几天干了什么"]
}}
```
输出格式示例(不需要检索时):
```json
{{
"concepts": [],
"questions": []
}}
```
请只输出JSON对象不要输出其他内容
""",
name="memory_retrieval_question_prompt",
)
# 第二步ReAct Agent prompt使用function calling要求先思考再行动
Prompt(
"""
你的名字是{bot_name}。现在是{time_now}
你正在参与聊天,你需要搜集信息来回答问题,帮助你参与聊天。
你需要通过思考(Think)、行动(Action)、观察(Observation)的循环来回答问题。
**重要限制:**
- 最大查询轮数5轮当前第{current_iteration}轮,剩余{remaining_iterations}轮)
- 必须尽快得出答案,避免不必要的查询
- 思考要简短,直接切入要点
- 必须严格使用检索到的信息回答问题,不要编造信息
当前问题:{question}
已收集的信息:
{collected_info}
**执行步骤:**
**第一步思考Think**
在思考中分析:
- 当前信息是否足够回答问题?
- 如果足够在思考中直接给出答案格式为final_answer(answer="你的答案内容")
- 如果不够,说明最需要查询什么,并输出为纯文本说明
**第二步行动Action**
根据思考结果立即行动:
- 如果思考中已给出final_answer → 无需调用工具,直接结束
- 如果信息不足 → 调用相应工具查询(可并行调用多个工具)
- 如果多次查询仍无结果 → 在思考中给出no_answer(reason="无法找到答案的原因")
**重要:答案必须在思考中给出,格式为 final_answer(answer="...") 或 no_answer(reason="..."),不要调用工具。**
""",
name="memory_retrieval_react_prompt",
)
# 第二步ReAct Agent prompt使用function calling要求先思考再行动
Prompt(
"""
你的名字是{bot_name}。现在是{time_now}
你正在参与聊天,你需要搜集信息来回答问题,帮助你参与聊天。
你需要通过思考(Think)、行动(Action)、观察(Observation)的循环来回答问题。
**重要限制:**
- 最大查询轮数5轮当前第{current_iteration}轮,剩余{remaining_iterations}轮)
- 必须尽快得出答案,避免不必要的查询
- 思考要简短,直接切入要点
- 必须严格使用检索到的信息回答问题,不要编造信息
当前问题:{question}
**执行步骤:**
**第一步思考Think**
在思考中分析:
- 当前信息是否足够回答问题?
- 如果足够在思考中直接给出答案格式为final_answer(answer="你的答案内容")
- 如果不够,说明最需要查询什么,并输出为纯文本说明
**第二步行动Action**
根据思考结果立即行动:
- 如果思考中已给出final_answer → 无需调用工具,直接结束
- 如果信息不足 → 调用相应工具查询(可并行调用多个工具)
- 如果多次查询仍无结果 → 在思考中给出no_answer(reason="无法找到答案的原因")
**重要:答案必须在思考中给出,格式为 final_answer(answer="...") 或 no_answer(reason="..."),不要调用工具。**
""",
name="memory_retrieval_react_prompt_head",
)
# 额外如果最后一轮迭代ReAct Agent prompt使用function calling要求先思考再行动
Prompt(
"""
你的名字是{bot_name}。现在是{time_now}
你正在参与聊天,你需要搜集信息来回答问题,帮助你参与聊天。
**重要限制:**
- 你已经经过几轮查询,尝试了信息搜集,现在你需要总结信息,选择回答问题或判断问题无法回答
- 思考要简短,直接切入要点
- 必须严格使用检索到的信息回答问题,不要编造信息
当前问题:{question}
已收集的信息:
{collected_info}
**执行步骤:**
分析:
- 当前信息是否足够回答问题?
- 如果足够在思考中直接给出答案格式为final_answer(answer="你的答案内容")
- 如果不够在思考中给出no_answer(reason="无法找到答案的原因")
**重要:答案必须给出,格式为 final_answer(answer="...") 或 no_answer(reason="...")。**
""",
name="memory_retrieval_react_final_prompt",
)
def _parse_react_response(response: str) -> Optional[Dict[str, Any]]:
"""解析ReAct Agent的响应
Args:
response: LLM返回的响应
Returns:
Dict[str, Any]: 解析后的动作信息如果解析失败返回None
格式: {"thought": str, "actions": List[Dict[str, Any]]}
每个action格式: {"action_type": str, "action_params": dict}
"""
try:
# 尝试提取JSON可能包含在```json代码块中
json_pattern = r"```json\s*(.*?)\s*```"
matches = re.findall(json_pattern, response, re.DOTALL)
if matches:
json_str = matches[0]
else:
# 尝试直接解析整个响应
json_str = response.strip()
# 修复可能的JSON错误
repaired_json = repair_json(json_str)
# 解析JSON
action_info = json.loads(repaired_json)
if not isinstance(action_info, dict):
logger.warning(f"解析的JSON不是对象格式: {action_info}")
return None
# 确保actions字段存在且为列表
if "actions" not in action_info:
logger.warning(f"响应中缺少actions字段: {action_info}")
return None
if not isinstance(action_info["actions"], list):
logger.warning(f"actions字段不是数组格式: {action_info['actions']}")
return None
# 确保actions不为空
if len(action_info["actions"]) == 0:
logger.warning("actions数组为空")
return None
return action_info
except Exception as e:
logger.error(f"解析ReAct响应失败: {e}, 响应内容: {response[:200]}...")
return None
async def _retrieve_concepts_with_jargon(
concepts: List[str],
chat_id: str
) -> str:
"""对概念列表进行jargon检索
Args:
concepts: 概念列表
chat_id: 聊天ID
Returns:
str: 检索结果字符串
"""
if not concepts:
return ""
from src.jargon.jargon_miner import search_jargon
results = []
for concept in concepts:
concept = concept.strip()
if not concept:
continue
# 先尝试精确匹配
jargon_results = search_jargon(
keyword=concept,
chat_id=chat_id,
limit=10,
case_sensitive=False,
fuzzy=False
)
is_fuzzy_match = False
# 如果精确匹配未找到,尝试模糊搜索
if not jargon_results:
jargon_results = search_jargon(
keyword=concept,
chat_id=chat_id,
limit=10,
case_sensitive=False,
fuzzy=True
)
is_fuzzy_match = True
if jargon_results:
# 找到结果
if is_fuzzy_match:
# 模糊匹配
output_parts = [f"未精确匹配到'{concept}'"]
for result in jargon_results:
found_content = result.get("content", "").strip()
meaning = result.get("meaning", "").strip()
if found_content and meaning:
output_parts.append(f"找到 '{found_content}' 的含义为:{meaning}")
results.append("".join(output_parts))
logger.info(f"在jargon库中找到匹配模糊搜索: {concept},找到{len(jargon_results)}条结果")
else:
# 精确匹配
output_parts = []
for result in jargon_results:
meaning = result.get("meaning", "").strip()
if meaning:
output_parts.append(f"'{concept}' 为黑话或者网络简写,含义为:{meaning}")
results.append("".join(output_parts) if len(output_parts) > 1 else output_parts[0])
logger.info(f"在jargon库中找到匹配精确匹配: {concept},找到{len(jargon_results)}条结果")
else:
# 未找到
results.append(f"未在jargon库中找到'{concept}'的解释")
logger.info(f"在jargon库中未找到匹配: {concept}")
if results:
return "【概念检索结果】\n" + "\n".join(results) + "\n"
return ""
async def _react_agent_solve_question(
question: str,
chat_id: str,
max_iterations: int = 5,
timeout: float = 30.0,
initial_info: str = ""
) -> Tuple[bool, str, List[Dict[str, Any]], bool]:
"""使用ReAct架构的Agent来解决问题
Args:
question: 要回答的问题
chat_id: 聊天ID
max_iterations: 最大迭代次数
timeout: 超时时间(秒)
initial_info: 初始信息如概念检索结果将作为collected_info的初始值
Returns:
Tuple[bool, str, List[Dict[str, Any]], bool]: (是否找到答案, 答案内容, 思考步骤列表, 是否超时)
"""
start_time = time.time()
collected_info = initial_info if initial_info else ""
thinking_steps = []
is_timeout = False
conversation_messages: List[Message] = []
for iteration in range(max_iterations):
# 检查超时
if time.time() - start_time > timeout:
logger.warning(f"ReAct Agent超时已迭代{iteration}")
is_timeout = True
break
# 获取工具注册器
tool_registry = get_tool_registry()
# 获取bot_name
bot_name = global_config.bot.nickname
# 获取当前时间
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 计算剩余迭代次数
current_iteration = iteration + 1
remaining_iterations = max_iterations - current_iteration
is_final_iteration = current_iteration >= max_iterations
# 构建prompt不再需要工具文本描述
prompt_type = "memory_retrieval_react_prompt"
if is_final_iteration:
prompt_type = "memory_retrieval_react_final_prompt"
tool_definitions = []
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: 0最后一次迭代不提供工具调用")
else:
tool_definitions = tool_registry.get_tool_definitions()
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: {len(tool_definitions)}")
prompt = await global_prompt_manager.format_prompt(
prompt_type,
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=collected_info if collected_info else "暂无信息",
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
)
if not is_final_iteration:
head_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_prompt_head",
bot_name=bot_name,
time_now=time_now,
question=question,
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
)
def message_factory(
_client,
*,
_head_prompt: str = head_prompt,
_prompt: str = prompt,
_conversation_messages: List[Message] = conversation_messages,
) -> List[Message]:
messages: List[Message] = []
system_builder = MessageBuilder()
system_builder.set_role(RoleType.System)
system_builder.add_text_content(_head_prompt)
if _prompt.strip():
system_builder.add_text_content(f"\n{_prompt}")
messages.append(system_builder.build())
messages.extend(_conversation_messages)
for msg in messages:
print(msg)
return messages
success, response, reasoning_content, model_name, tool_calls = await llm_api.generate_with_model_with_tools_by_message_factory(
message_factory,
model_config=model_config.model_task_config.tool_use,
tool_options=tool_definitions,
request_type="memory.react",
)
else:
logger.info(f"ReAct Agent 第 {iteration + 1} 次Prompt: {prompt}")
success, response, reasoning_content, model_name, tool_calls = await llm_api.generate_with_model_with_tools(
prompt,
model_config=model_config.model_task_config.tool_use,
tool_options=tool_definitions,
request_type="memory.react",
)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 模型: {model_name} ,调用工具数量: {len(tool_calls) if tool_calls else 0} ,调用工具响应: {response}")
if not success:
logger.error(f"ReAct Agent LLM调用失败: {response}")
break
assistant_message: Optional[Message] = None
if tool_calls:
assistant_builder = MessageBuilder()
assistant_builder.set_role(RoleType.Assistant)
if response and response.strip():
assistant_builder.add_text_content(response)
assistant_builder.set_tool_calls(tool_calls)
assistant_message = assistant_builder.build()
elif response and response.strip():
assistant_builder = MessageBuilder()
assistant_builder.set_role(RoleType.Assistant)
assistant_builder.add_text_content(response)
assistant_message = assistant_builder.build()
# 记录思考步骤
step = {
"iteration": iteration + 1,
"thought": response,
"actions": [],
"observations": []
}
# 优先从思考内容中提取final_answer或no_answer
def extract_quoted_content(text, func_name, param_name):
"""从文本中提取函数调用中参数的值,支持单引号和双引号
Args:
text: 要搜索的文本
func_name: 函数名,如 'final_answer'
param_name: 参数名,如 'answer'
Returns:
提取的参数值如果未找到则返回None
"""
if not text:
return None
# 查找函数调用位置(不区分大小写)
func_pattern = func_name.lower()
text_lower = text.lower()
func_pos = text_lower.find(func_pattern)
if func_pos == -1:
return None
# 查找参数名和等号
param_pattern = f'{param_name}='
param_pos = text_lower.find(param_pattern, func_pos)
if param_pos == -1:
return None
# 跳过参数名、等号和空白
start_pos = param_pos + len(param_pattern)
while start_pos < len(text) and text[start_pos] in ' \t\n':
start_pos += 1
if start_pos >= len(text):
return None
# 确定引号类型
quote_char = text[start_pos]
if quote_char not in ['"', "'"]:
return None
# 查找匹配的结束引号(考虑转义)
end_pos = start_pos + 1
while end_pos < len(text):
if text[end_pos] == quote_char:
# 检查是否是转义的引号
if end_pos > start_pos + 1 and text[end_pos - 1] == '\\':
end_pos += 1
continue
# 找到匹配的引号
content = text[start_pos + 1:end_pos]
# 处理转义字符
content = content.replace('\\"', '"').replace("\\'", "'").replace('\\\\', '\\')
return content
end_pos += 1
return None
# 从LLM的直接输出内容中提取final_answer或no_answer
final_answer_content = None
no_answer_reason = None
# 只检查responseLLM的直接输出内容不检查reasoning_content
if response:
final_answer_content = extract_quoted_content(response, 'final_answer', 'answer')
if not final_answer_content:
no_answer_reason = extract_quoted_content(response, 'no_answer', 'reason')
# 如果从输出内容中找到了答案,直接返回
if final_answer_content:
step["actions"].append({"action_type": "final_answer", "action_params": {"answer": final_answer_content}})
step["observations"] = ["从LLM输出内容中检测到final_answer"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 从LLM输出内容中检测到final_answer: {final_answer_content[:100]}...")
return True, final_answer_content, thinking_steps, False
if no_answer_reason:
step["actions"].append({"action_type": "no_answer", "action_params": {"reason": no_answer_reason}})
step["observations"] = ["从LLM输出内容中检测到no_answer"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 从LLM输出内容中检测到no_answer: {no_answer_reason[:100]}...")
return False, no_answer_reason, thinking_steps, False
if is_final_iteration:
step["actions"].append({"action_type": "no_answer", "action_params": {"reason": "已到达最后一次迭代,无法找到答案"}})
step["observations"] = ["已到达最后一次迭代,无法找到答案"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 已到达最后一次迭代,无法找到答案")
return False, "已到达最后一次迭代,无法找到答案", thinking_steps, False
if assistant_message:
conversation_messages.append(assistant_message)
# 记录思考过程到collected_info中
if reasoning_content or response:
thought_summary = reasoning_content or (response[:200] if response else "")
if thought_summary:
collected_info += f"\n[思考] {thought_summary}\n"
# 处理工具调用
if not tool_calls:
# 没有工具调用说明LLM在思考中已经给出了答案已在前面检查或者需要继续查询
# 如果思考中没有答案,说明需要继续查询或等待下一轮
if response and response.strip():
# 如果响应不为空,记录思考过程,继续下一轮迭代
step["observations"] = [f"思考完成,但未调用工具。响应: {response}"]
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 思考完成但未调用工具: {response[:100]}...")
# 继续下一轮迭代让LLM有机会在思考中给出final_answer或继续查询
collected_info += f"思考: {response}"
thinking_steps.append(step)
continue
else:
logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 无工具调用且无响应")
step["observations"] = ["无响应且无工具调用"]
thinking_steps.append(step)
break
# 处理工具调用
tool_tasks = []
for i, tool_call in enumerate(tool_calls):
tool_name = tool_call.func_name
tool_args = tool_call.args or {}
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 工具调用 {i+1}/{len(tool_calls)}: {tool_name}({tool_args})")
# 普通工具调用
tool = tool_registry.get_tool(tool_name)
if tool:
# 准备工具参数需要添加chat_id如果工具需要
tool_params = tool_args.copy()
# 如果工具函数签名需要chat_id添加它
import inspect
sig = inspect.signature(tool.execute_func)
if "chat_id" in sig.parameters:
tool_params["chat_id"] = chat_id
# 创建异步任务
async def execute_single_tool(tool_instance, params, tool_name_str, iter_num):
try:
observation = await tool_instance.execute(**params)
param_str = ", ".join([f"{k}={v}" for k, v in params.items() if k != "chat_id"])
return f"查询{tool_name_str}({param_str})的结果:{observation}"
except Exception as e:
error_msg = f"工具执行失败: {str(e)}"
logger.error(f"ReAct Agent 第 {iter_num + 1} 次迭代 工具 {tool_name_str} {error_msg}")
return f"查询{tool_name_str}失败: {error_msg}"
tool_tasks.append(execute_single_tool(tool, tool_params, tool_name, iteration))
step["actions"].append({"action_type": tool_name, "action_params": tool_args})
else:
error_msg = f"未知的工具类型: {tool_name}"
logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i+1}/{len(tool_calls)} {error_msg}")
tool_tasks.append(asyncio.create_task(asyncio.sleep(0, result=f"查询{tool_name}失败: {error_msg}")))
# 并行执行所有工具
if tool_tasks:
observations = await asyncio.gather(*tool_tasks, return_exceptions=True)
# 处理执行结果
for i, (tool_call_item, observation) in enumerate(zip(tool_calls, observations, strict=False)):
if isinstance(observation, Exception):
observation = f"工具执行异常: {str(observation)}"
logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i+1} 执行异常: {observation}")
observation_text = observation if isinstance(observation, str) else str(observation)
step["observations"].append(observation_text)
collected_info += f"\n{observation_text}\n"
if observation_text.strip():
tool_builder = MessageBuilder()
tool_builder.set_role(RoleType.Tool)
tool_builder.add_text_content(observation_text)
tool_builder.add_tool_call(tool_call_item.call_id)
conversation_messages.append(tool_builder.build())
# logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i+1} 执行结果: {observation_text}")
thinking_steps.append(step)
# 达到最大迭代次数或超时但Agent没有明确返回final_answer
# 迭代超时应该直接视为no_answer而不是使用已有信息
# 只有Agent明确返回final_answer时才认为找到了答案
if collected_info:
logger.warning(f"ReAct Agent达到最大迭代次数或超时但未明确返回final_answer。已收集信息: {collected_info[:100]}...")
if is_timeout:
logger.warning("ReAct Agent超时直接视为no_answer")
else:
logger.warning("ReAct Agent达到最大迭代次数直接视为no_answer")
return False, "未找到相关信息", thinking_steps, is_timeout
def _get_recent_query_history(chat_id: str, time_window_seconds: float = 300.0) -> str:
"""获取最近一段时间内的查询历史
Args:
chat_id: 聊天ID
time_window_seconds: 时间窗口默认10分钟
Returns:
str: 格式化的查询历史字符串
"""
try:
current_time = time.time()
start_time = current_time - time_window_seconds
# 查询最近时间窗口内的记录,按更新时间倒序
records = (
ThinkingBack.select()
.where(
(ThinkingBack.chat_id == chat_id) &
(ThinkingBack.update_time >= start_time)
)
.order_by(ThinkingBack.update_time.desc())
.limit(5) # 最多返回5条最近的记录
)
if not records.exists():
return ""
history_lines = []
history_lines.append("最近已查询的问题和结果:")
for record in records:
status = "✓ 已找到答案" if record.found_answer else "✗ 未找到答案"
answer_preview = ""
# 只有找到答案时才显示答案内容
if record.found_answer and record.answer:
# 截取答案前100字符
answer_preview = record.answer[:100]
if len(record.answer) > 100:
answer_preview += "..."
history_lines.append(f"- 问题:{record.question}")
history_lines.append(f" 状态:{status}")
if answer_preview:
history_lines.append(f" 答案:{answer_preview}")
history_lines.append("") # 空行分隔
return "\n".join(history_lines)
except Exception as e:
logger.error(f"获取查询历史失败: {e}")
return ""
def _get_cached_memories(chat_id: str, time_window_seconds: float = 300.0) -> List[str]:
"""获取最近一段时间内缓存的记忆(只返回找到答案的记录)
Args:
chat_id: 聊天ID
time_window_seconds: 时间窗口默认300秒5分钟
Returns:
List[str]: 格式化的记忆列表,每个元素格式为 "问题xxx\n答案xxx"
"""
try:
current_time = time.time()
start_time = current_time - time_window_seconds
# 查询最近时间窗口内找到答案的记录,按更新时间倒序
records = (
ThinkingBack.select()
.where(
(ThinkingBack.chat_id == chat_id) &
(ThinkingBack.update_time >= start_time) &
(ThinkingBack.found_answer == 1)
)
.order_by(ThinkingBack.update_time.desc())
.limit(5) # 最多返回5条最近的记录
)
if not records.exists():
return []
cached_memories = []
for record in records:
if record.answer:
cached_memories.append(f"问题:{record.question}\n答案:{record.answer}")
return cached_memories
except Exception as e:
logger.error(f"获取缓存记忆失败: {e}")
return []
def _query_thinking_back(chat_id: str, question: str) -> Optional[Tuple[bool, str]]:
"""从thinking_back数据库中查询是否有现成的答案
Args:
chat_id: 聊天ID
question: 问题
Returns:
Optional[Tuple[bool, str]]: 如果找到记录,返回(found_answer, answer)否则返回None
found_answer: 是否找到答案True表示found_answer=1False表示found_answer=0
answer: 答案内容
"""
try:
# 查询相同chat_id和问题的所有记录包括found_answer为0和1的
# 按更新时间倒序,获取最新的记录
records = (
ThinkingBack.select()
.where(
(ThinkingBack.chat_id == chat_id) &
(ThinkingBack.question == question)
)
.order_by(ThinkingBack.update_time.desc())
.limit(1)
)
if records.exists():
record = records.get()
found_answer = bool(record.found_answer)
answer = record.answer or ""
logger.info(f"在thinking_back中找到记录问题: {question[:50]}...found_answer: {found_answer}")
return found_answer, answer
return None
except Exception as e:
logger.error(f"查询thinking_back失败: {e}")
return None
def _store_thinking_back(
chat_id: str,
question: str,
context: str,
found_answer: bool,
answer: str,
thinking_steps: List[Dict[str, Any]]
) -> None:
"""存储或更新思考过程到数据库(如果已存在则更新,否则创建)
Args:
chat_id: 聊天ID
question: 问题
context: 上下文信息
found_answer: 是否找到答案
answer: 答案内容
thinking_steps: 思考步骤列表
"""
try:
now = time.time()
# 先查询是否已存在相同chat_id和问题的记录
existing = (
ThinkingBack.select()
.where(
(ThinkingBack.chat_id == chat_id) &
(ThinkingBack.question == question)
)
.order_by(ThinkingBack.update_time.desc())
.limit(1)
)
if existing.exists():
# 更新现有记录
record = existing.get()
record.context = context
record.found_answer = found_answer
record.answer = answer
record.thinking_steps = json.dumps(thinking_steps, ensure_ascii=False)
record.update_time = now
record.save()
logger.info(f"已更新思考过程到数据库,问题: {question[:50]}...")
else:
# 创建新记录
ThinkingBack.create(
chat_id=chat_id,
question=question,
context=context,
found_answer=found_answer,
answer=answer,
thinking_steps=json.dumps(thinking_steps, ensure_ascii=False),
create_time=now,
update_time=now
)
logger.info(f"已创建思考过程到数据库,问题: {question[:50]}...")
except Exception as e:
logger.error(f"存储思考过程失败: {e}")
async def _process_single_question(
question: str,
chat_id: str,
context: str,
initial_info: str = ""
) -> Optional[str]:
"""处理单个问题的查询(包含缓存检查逻辑)
Args:
question: 要查询的问题
chat_id: 聊天ID
context: 上下文信息
initial_info: 初始信息如概念检索结果将传递给ReAct Agent
Returns:
Optional[str]: 如果找到答案返回格式化的结果字符串否则返回None
"""
logger.info(f"开始处理问题: {question}")
# 先检查thinking_back数据库中是否有现成答案
cached_result = _query_thinking_back(chat_id, question)
should_requery = False
if cached_result:
cached_found_answer, cached_answer = cached_result
# 根据found_answer的值决定是否重新查询
if cached_found_answer: # found_answer == 1 (True)
# found_answer == 120%概率重新查询
if random.random() < 0.2:
should_requery = True
logger.info(f"found_answer=1触发20%概率重新查询,问题: {question[:50]}...")
else: # found_answer == 0 (False)
# found_answer == 040%概率重新查询
if random.random() < 0.4:
should_requery = True
logger.info(f"found_answer=0触发40%概率重新查询,问题: {question[:50]}...")
# 如果不需要重新查询,使用缓存答案
if not should_requery:
if cached_answer:
logger.info(f"从thinking_back缓存中获取答案问题: {question[:50]}...")
return f"问题:{question}\n答案:{cached_answer}"
else:
# 缓存中没有答案,需要查询
should_requery = True
# 如果没有缓存答案或需要重新查询使用ReAct Agent查询
if not cached_result or should_requery:
if should_requery:
logger.info(f"概率触发重新查询使用ReAct Agent查询问题: {question[:50]}...")
else:
logger.info(f"未找到缓存答案使用ReAct Agent查询问题: {question[:50]}...")
found_answer, answer, thinking_steps, is_timeout = await _react_agent_solve_question(
question=question,
chat_id=chat_id,
max_iterations=5,
timeout=120.0,
initial_info=initial_info
)
# 存储到数据库(超时时不存储)
if not is_timeout:
_store_thinking_back(
chat_id=chat_id,
question=question,
context=context,
found_answer=found_answer,
answer=answer,
thinking_steps=thinking_steps
)
else:
logger.info(f"ReAct Agent超时不存储到数据库问题: {question[:50]}...")
if found_answer and answer:
return f"问题:{question}\n答案:{answer}"
return None
async def build_memory_retrieval_prompt(
message: str,
sender: str,
target: str,
chat_stream,
tool_executor,
) -> str:
"""构建记忆检索提示
使用两段式查询第一步生成问题第二步使用ReAct Agent查询答案
Args:
message: 聊天历史记录
sender: 发送者名称
target: 目标消息内容
chat_stream: 聊天流对象
tool_executor: 工具执行器(保留参数以兼容接口)
Returns:
str: 记忆检索结果字符串
"""
start_time = time.time()
logger.info(f"检测是否需要回忆,元消息:{message[:30]}...,消息长度: {len(message)}")
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
bot_name = global_config.bot.nickname
chat_id = chat_stream.stream_id
# 获取最近查询历史最近1小时内的查询
recent_query_history = _get_recent_query_history(chat_id, time_window_seconds=300.0)
if not recent_query_history:
recent_query_history = "最近没有查询记录。"
# 第一步:生成问题
question_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_question_prompt",
bot_name=bot_name,
time_now=time_now,
chat_history=message,
recent_query_history=recent_query_history,
sender=sender,
target_message=target,
)
success, response, reasoning_content, model_name = await llm_api.generate_with_model(
question_prompt,
model_config=model_config.model_task_config.tool_use,
request_type="memory.question",
)
logger.info(f"记忆检索问题生成提示词: {question_prompt}")
logger.info(f"记忆检索问题生成响应: {response}")
if not success:
logger.error(f"LLM生成问题失败: {response}")
return ""
# 解析概念列表和问题列表
concepts, questions = _parse_questions_json(response)
logger.info(f"解析到 {len(concepts)} 个概念: {concepts}")
logger.info(f"解析到 {len(questions)} 个问题: {questions}")
# 对概念进行jargon检索作为初始信息
initial_info = ""
if concepts:
logger.info(f"开始对 {len(concepts)} 个概念进行jargon检索")
initial_info = await _retrieve_concepts_with_jargon(concepts, chat_id)
if initial_info:
logger.info(f"概念检索完成,结果: {initial_info[:200]}...")
else:
logger.info("概念检索未找到任何结果")
# 获取缓存的记忆与question时使用相同的时间窗口和数量限制
cached_memories = _get_cached_memories(chat_id, time_window_seconds=300.0)
if not questions:
logger.debug("模型认为不需要检索记忆或解析失败")
# 即使没有当次查询,也返回缓存的记忆和概念检索结果
all_results = []
if initial_info:
all_results.append(initial_info.strip())
if cached_memories:
all_results.extend(cached_memories)
if all_results:
retrieved_memory = "\n\n".join(all_results)
end_time = time.time()
logger.info(f"无当次查询,返回缓存记忆和概念检索结果,耗时: {(end_time - start_time):.3f}")
return f"你回忆起了以下信息:\n{retrieved_memory}\n如果与回复内容相关,可以参考这些回忆的信息。\n"
else:
return ""
logger.info(f"解析到 {len(questions)} 个问题: {questions}")
# 第二步并行处理所有问题固定使用5次迭代/120秒超时
logger.info(f"问题数量: {len(questions)},固定设置最大迭代次数: 5超时时间: 120秒")
# 并行处理所有问题,将概念检索结果作为初始信息传递
question_tasks = [
_process_single_question(
question=question,
chat_id=chat_id,
context=message,
initial_info=initial_info
)
for question in questions
]
# 并行执行所有查询任务
results = await asyncio.gather(*question_tasks, return_exceptions=True)
# 收集所有有效结果
all_results = []
current_questions = set() # 用于去重,避免缓存和当次查询重复
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"处理问题 '{questions[i]}' 时发生异常: {result}")
elif result is not None:
all_results.append(result)
# 提取问题用于去重
if result.startswith("问题:"):
question = result.split("\n")[0].replace("问题:", "").strip()
current_questions.add(question)
# 将缓存的记忆添加到结果中(排除当次查询已包含的问题,避免重复)
for cached_memory in cached_memories:
if cached_memory.startswith("问题:"):
question = cached_memory.split("\n")[0].replace("问题:", "").strip()
# 只有当次查询中没有相同问题时,才添加缓存记忆
if question not in current_questions:
all_results.append(cached_memory)
logger.debug(f"添加缓存记忆: {question[:50]}...")
end_time = time.time()
if all_results:
retrieved_memory = "\n\n".join(all_results)
logger.info(f"记忆检索成功,耗时: {(end_time - start_time):.3f}秒,包含 {len(all_results)} 条记忆(含缓存)")
return f"你回忆起了以下信息:\n{retrieved_memory}\n如果与回复内容相关,可以参考这些回忆的信息。\n"
else:
logger.debug("所有问题均未找到答案,且无缓存记忆")
return ""
except Exception as e:
logger.error(f"记忆检索时发生异常: {str(e)}")
return ""
def _parse_questions_json(response: str) -> Tuple[List[str], List[str]]:
"""解析问题JSON返回概念列表和问题列表
Args:
response: LLM返回的响应
Returns:
Tuple[List[str], List[str]]: (概念列表, 问题列表)
"""
try:
# 尝试提取JSON可能包含在```json代码块中
json_pattern = r"```json\s*(.*?)\s*```"
matches = re.findall(json_pattern, response, re.DOTALL)
if matches:
json_str = matches[0]
else:
# 尝试直接解析整个响应
json_str = response.strip()
# 修复可能的JSON错误
repaired_json = repair_json(json_str)
# 解析JSON
parsed = json.loads(repaired_json)
# 只支持新格式包含concepts和questions的对象
if not isinstance(parsed, dict):
logger.warning(f"解析的JSON不是对象格式: {parsed}")
return [], []
concepts_raw = parsed.get("concepts", [])
questions_raw = parsed.get("questions", [])
# 确保是列表
if not isinstance(concepts_raw, list):
concepts_raw = []
if not isinstance(questions_raw, list):
questions_raw = []
# 确保所有元素都是字符串
concepts = [c for c in concepts_raw if isinstance(c, str) and c.strip()]
questions = [q for q in questions_raw if isinstance(q, str) and q.strip()]
return concepts, questions
except Exception as e:
logger.error(f"解析问题JSON失败: {e}, 响应内容: {response[:200]}...")
return [], []