diff --git a/src/config/official_configs.py b/src/config/official_configs.py index c0e1950f..a6652e0e 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -282,12 +282,20 @@ class MemoryConfig(ConfigBase): - 当在黑名单中的聊天流进行查询时,仅使用该聊天流的本地记忆 """ - planner_question: bool = True - """ - 是否使用 Planner 提供的 question 作为记忆检索问题 - - True: 当 Planner 在 reply 动作中提供了 question 时,直接使用该问题进行记忆检索,跳过 LLM 生成问题的步骤 - - False: 沿用旧模式,使用 LLM 生成问题 - """ + chat_history_topic_check_message_threshold: int = 80 + """聊天历史话题检查的消息数量阈值,当累积消息数达到此值时触发话题检查""" + + chat_history_topic_check_time_hours: float = 8.0 + """聊天历史话题检查的时间阈值(小时),当距离上次检查超过此时间且消息数达到最小阈值时触发话题检查""" + + chat_history_topic_check_min_messages: int = 20 + """聊天历史话题检查的时间触发模式下的最小消息数阈值""" + + chat_history_finalize_no_update_checks: int = 3 + """聊天历史话题打包存储的连续无更新检查次数阈值,当话题连续N次检查无新增内容时触发打包存储""" + + chat_history_finalize_message_count: int = 5 + """聊天历史话题打包存储的消息条数阈值,当话题的消息条数超过此值时触发打包存储""" def __post_init__(self): """验证配置值""" @@ -295,6 +303,16 @@ class MemoryConfig(ConfigBase): raise ValueError(f"max_agent_iterations 必须至少为1,当前值: {self.max_agent_iterations}") if self.agent_timeout_seconds <= 0: raise ValueError(f"agent_timeout_seconds 必须大于0,当前值: {self.agent_timeout_seconds}") + if self.chat_history_topic_check_message_threshold < 1: + raise ValueError(f"chat_history_topic_check_message_threshold 必须至少为1,当前值: {self.chat_history_topic_check_message_threshold}") + if self.chat_history_topic_check_time_hours <= 0: + raise ValueError(f"chat_history_topic_check_time_hours 必须大于0,当前值: {self.chat_history_topic_check_time_hours}") + if self.chat_history_topic_check_min_messages < 1: + raise ValueError(f"chat_history_topic_check_min_messages 必须至少为1,当前值: {self.chat_history_topic_check_min_messages}") + if self.chat_history_finalize_no_update_checks < 1: + raise ValueError(f"chat_history_finalize_no_update_checks 必须至少为1,当前值: {self.chat_history_finalize_no_update_checks}") + if self.chat_history_finalize_message_count < 1: + raise ValueError(f"chat_history_finalize_message_count 必须至少为1,当前值: {self.chat_history_finalize_message_count}") @dataclass @@ -732,6 +750,9 @@ class ExperimentalConfig(ConfigBase): - prompt内容: 要添加的额外prompt文本 """ + lpmm_memory: bool = False + """是否将聊天历史总结导入到LPMM知识库。开启后,chat_history_summarizer总结出的历史记录会同时导入到知识库""" + @dataclass class MaimMessageConfig(ConfigBase): diff --git a/src/memory_system/chat_history_summarizer.py b/src/memory_system/chat_history_summarizer.py index d290337f..8c2d0980 100644 --- a/src/memory_system/chat_history_summarizer.py +++ b/src/memory_system/chat_history_summarizer.py @@ -15,7 +15,7 @@ from json_repair import repair_json from src.common.logger import get_logger from src.common.data_models.database_data_model import DatabaseMessages -from src.config.config import model_config +from src.config.config import model_config, global_config from src.llm_models.utils_model import LLMRequest from src.plugin_system.apis import message_api from src.chat.utils.chat_message_builder import build_readable_messages @@ -370,18 +370,24 @@ class ChatHistorySummarizer: logger.debug(f"{self.log_prefix} 批次状态检查 | 消息数: {message_count} | 距上次检查: {time_str}") - # 检查“话题检查”触发条件 + # 检查"话题检查"触发条件 should_check = False - # 条件1: 消息数量 >= 100,触发一次检查 - if message_count >= 80: - should_check = True - logger.info(f"{self.log_prefix} 触发检查条件: 消息数量达到 {message_count} 条(阈值: 100条)") + # 从配置中获取阈值 + message_threshold = global_config.memory.chat_history_topic_check_message_threshold + time_threshold_hours = global_config.memory.chat_history_topic_check_time_hours + min_messages = global_config.memory.chat_history_topic_check_min_messages + time_threshold_seconds = time_threshold_hours * 3600 - # 条件2: 距离上一次检查 > 3600 * 8 秒(8小时)且消息数量 >= 20 条,触发一次检查 - elif time_since_last_check > 3600 * 8 and message_count >= 20: + # 条件1: 消息数量达到阈值,触发一次检查 + if message_count >= message_threshold: should_check = True - logger.info(f"{self.log_prefix} 触发检查条件: 距上次检查 {time_str}(阈值: 8小时)且消息数量达到 {message_count} 条(阈值: 20条)") + logger.info(f"{self.log_prefix} 触发检查条件: 消息数量达到 {message_count} 条(阈值: {message_threshold}条)") + + # 条件2: 距离上一次检查超过时间阈值且消息数量达到最小阈值,触发一次检查 + elif time_since_last_check > time_threshold_seconds and message_count >= min_messages: + should_check = True + logger.info(f"{self.log_prefix} 触发检查条件: 距上次检查 {time_str}(阈值: {time_threshold_hours}小时)且消息数量达到 {message_count} 条(阈值: {min_messages}条)") if should_check: await self._run_topic_check_and_update_cache(messages) @@ -528,14 +534,18 @@ class ChatHistorySummarizer: item.no_update_checks += 1 # 6. 检查是否有话题需要打包存储 + # 从配置中获取阈值 + no_update_checks_threshold = global_config.memory.chat_history_finalize_no_update_checks + message_count_threshold = global_config.memory.chat_history_finalize_message_count + topics_to_finalize: List[str] = [] for topic, item in self.topic_cache.items(): - if item.no_update_checks >= 3: - logger.info(f"{self.log_prefix} 话题[{topic}] 连续 3 次检查无新增内容,触发打包存储") + if item.no_update_checks >= no_update_checks_threshold: + logger.info(f"{self.log_prefix} 话题[{topic}] 连续 {no_update_checks_threshold} 次检查无新增内容,触发打包存储") topics_to_finalize.append(topic) continue - if len(item.messages) > 5: - logger.info(f"{self.log_prefix} 话题[{topic}] 消息条数超过 4,触发打包存储") + if len(item.messages) > message_count_threshold: + logger.info(f"{self.log_prefix} 话题[{topic}] 消息条数超过 {message_count_threshold},触发打包存储") topics_to_finalize.append(topic) for topic in topics_to_finalize: @@ -976,6 +986,16 @@ class ChatHistorySummarizer: else: logger.warning(f"{self.log_prefix} 存储聊天历史记录到数据库失败") + # 如果配置开启,同时导入到LPMM知识库 + if global_config.lpmm_knowledge.enable and global_config.experimental.lpmm_memory: + await self._import_to_lpmm_knowledge( + theme=theme, + summary=summary, + key_point=key_point, + participants=participants, + original_text=original_text, + ) + except Exception as e: logger.error(f"{self.log_prefix} 存储到数据库时出错: {e}") import traceback @@ -983,6 +1003,82 @@ class ChatHistorySummarizer: traceback.print_exc() raise + async def _import_to_lpmm_knowledge( + self, + theme: str, + summary: str, + key_point: Optional[List[str]], + participants: List[str], + original_text: str, + ): + """ + 将聊天历史总结导入到LPMM知识库 + + Args: + theme: 话题主题 + summary: 概括内容 + key_point: 关键信息点列表 + participants: 参与者列表 + original_text: 原始文本(可能很长,需要截断) + """ + try: + from src.chat.knowledge.lpmm_ops import lpmm_ops + + # 构造要导入的文本内容 + # 格式:主题 + 概括 + 关键信息点 + 参与者信息 + content_parts = [] + + # 1. 话题主题 + if theme: + content_parts.append(f"话题:{theme}") + + # 2. 概括内容 + if summary: + content_parts.append(f"概括:{summary}") + + # 3. 关键信息点 + if key_point: + key_points_text = "、".join(key_point) + content_parts.append(f"关键信息:{key_points_text}") + + # 4. 参与者信息 + if participants: + participants_text = "、".join(participants) + content_parts.append(f"参与者:{participants_text}") + + # 5. 原始文本摘要(如果原始文本太长,只取前500字) + if original_text: + # 截断原始文本,避免过长 + max_original_length = 500 + if len(original_text) > max_original_length: + truncated_text = original_text[:max_original_length] + "..." + content_parts.append(f"原始内容摘要:{truncated_text}") + else: + content_parts.append(f"原始内容:{original_text}") + + # 将所有部分合并为一个段落(用双换行分隔,符合lpmm_ops.add_content的格式要求) + content_to_import = "\n\n".join(content_parts) + + if not content_to_import.strip(): + logger.warning(f"{self.log_prefix} 聊天历史总结内容为空,跳过导入知识库") + return + + # 调用lpmm_ops导入 + result = await lpmm_ops.add_content(content_to_import) + + if result["status"] == "success": + logger.info( + f"{self.log_prefix} 成功将聊天历史总结导入到LPMM知识库 | 话题: {theme} | 新增段落数: {result.get('count', 0)}" + ) + else: + logger.warning( + f"{self.log_prefix} 将聊天历史总结导入到LPMM知识库失败 | 话题: {theme} | 错误: {result.get('message', '未知错误')}" + ) + + except Exception as e: + # 导入失败不应该影响数据库存储,只记录错误 + logger.error(f"{self.log_prefix} 导入聊天历史总结到LPMM知识库时出错: {e}", exc_info=True) + async def start(self): """启动后台定期检查循环""" if self._running: diff --git a/src/memory_system/memory_retrieval.py b/src/memory_system/memory_retrieval.py index 5473d53d..bb2ed981 100644 --- a/src/memory_system/memory_retrieval.py +++ b/src/memory_system/memory_retrieval.py @@ -8,7 +8,6 @@ from src.chat.utils.prompt_builder import Prompt, global_prompt_manager from src.plugin_system.apis import llm_api from src.common.database.database_model import ThinkingBack from src.memory_system.retrieval_tools import get_tool_registry, init_all_tools -from src.memory_system.memory_utils import parse_questions_json from src.llm_models.payload_content.message import MessageBuilder, RoleType, Message from src.chat.message_receive.chat_stream import get_chat_manager from src.bw_learner.jargon_explainer import retrieve_concepts_with_jargon @@ -47,53 +46,36 @@ def init_memory_retrieval_prompt(): # 首先注册所有工具 init_all_tools() - # 第一步:问题生成prompt + # 第二步:ReAct Agent prompt(使用function calling,要求先思考再行动) Prompt( - """ -你的名字是{bot_name}。现在是{time_now}。 -群里正在进行的聊天内容: + """你的名字是{bot_name}。现在是{time_now}。 +你正在参与聊天,你需要搜集信息来帮助你进行回复。 +重要,这是当前聊天记录: {chat_history} +聊天记录结束 -{recent_query_history} +已收集的信息: +{collected_info} -现在,{sender}发送了内容:{target_message},你想要回复ta。 -请仔细分析聊天内容,考虑以下几点: -1. 对话中是否提到了过去发生的事情、人物、事件或信息 -2. 是否有需要回忆的内容(比如"之前说过"、"上次"、"以前"等) -3. 是否有需要查找历史信息的问题 -4. 是否有问题可以搜集信息帮助你聊天 +- 你可以对查询思路给出简短的思考:思考要简短,直接切入要点 +- 思考完毕后,使用工具 -重要提示: -- **每次只能提出一个问题**,选择最需要查询的关键问题 -- 如果"最近已查询的问题和结果"中已经包含了类似的问题并得到了答案,请避免重复生成相同或相似的问题,不需要重复查询 -- 如果之前已经查询过某个问题但未找到答案,可以尝试用不同的方式提问或更具体的问题 - -如果你认为需要从记忆中检索信息来回答,请根据上下文提出**一个**最关键的问题来帮助你回复目标消息,放入"questions"字段 - -问题格式示例: -- "xxx在前几天干了什么" -- "xxx是什么,在什么时候提到过?" -- "xxxx和xxx的关系是什么" -- "xxx在某个时间点发生了什么" - -问题要说明前因后果和上下文,使其全面且精准 - -输出格式示例: -```json -{{ - "questions": ["张三在前几天干了什么"] #问题数组(字符串数组),如果不需要检索记忆则输出空数组[],如果需要检索则只输出包含一个问题的数组 -}} -``` -请只输出JSON对象,不要输出其他内容: +**工具说明:** +- 如果涉及过往事件,或者查询某个过去可能提到过的概念,或者某段时间发生的事件。可以使用lpmm知识库查询 +- 如果遇到不熟悉的词语、缩写、黑话或网络用语,可以使用query_words工具查询其含义 +- 你必须使用tool,如果需要查询你必须给出使用什么工具进行查询 +- 当你决定结束查询时,必须调用return_information工具返回总结信息并结束查询 """, - name="memory_retrieval_question_prompt", + name="memory_retrieval_react_prompt_head_lpmm", ) # 第二步:ReAct Agent prompt(使用function calling,要求先思考再行动) Prompt( """你的名字是{bot_name}。现在是{time_now}。 -你正在参与聊天,你需要搜集信息来回答问题,帮助你参与聊天。 -当前需要解答的问题:{question} +你正在参与聊天,你需要搜集信息来帮助你进行回复。 +当前聊天记录: +{chat_history} + 已收集的信息: {collected_info} @@ -107,7 +89,7 @@ def init_memory_retrieval_prompt(): - 你可以对查询思路给出简短的思考:思考要简短,直接切入要点 - 先思考当前信息是否足够回答问题 - 如果信息不足,则需要使用tool查询信息,你必须给出使用什么工具进行查询 -- 如果当前已收集的信息足够或信息不足确定无法找到答案,你必须调用found_answer工具结束查询 +- 当你决定结束查询时,必须调用return_information工具返回总结信息并结束查询 """, name="memory_retrieval_react_prompt_head", ) @@ -115,23 +97,24 @@ def init_memory_retrieval_prompt(): # 额外,如果最后一轮迭代:ReAct Agent prompt(使用function calling,要求先思考再行动) Prompt( """你的名字是{bot_name}。现在是{time_now}。 -你正在参与聊天,你需要根据搜集到的信息判断问题是否可以回答问题。 +你正在参与聊天,你需要根据搜集到的信息总结信息。 +如果搜集到的信息对于参与聊天,回答问题有帮助,请加入总结,如果无关,请不要加入到总结。 + +当前聊天记录: +{chat_history} -当前问题:{question} 已收集的信息: {collected_info} + 分析: -- 当前信息是否足够回答问题? -- **如果信息足够且能找到明确答案**,在思考中直接给出答案,格式为:found_answer(answer="你的答案内容") -- **如果信息不足或无法找到答案**,在思考中给出:not_enough_info(reason="信息不足或无法找到答案的原因") +- 基于已收集的信息,总结出对当前聊天有帮助的相关信息 +- **如果收集的信息对当前聊天有帮助**,在思考中直接给出总结信息,格式为:return_information(information="你的总结信息") +- **如果信息无关或没有帮助**,在思考中给出:return_information(information="") **重要规则:** - 必须严格使用检索到的信息回答问题,不要编造信息 - 答案必须精简,不要过多解释 -- **只有在检索到明确、具体的答案时,才使用found_answer** -- **如果信息不足、无法确定、找不到相关信息,必须使用not_enough_info,不要使用found_answer** -- 答案必须给出,格式为 found_answer(answer="...") 或 not_enough_info(reason="...")。 """, name="memory_retrieval_react_final_prompt", ) @@ -198,20 +181,20 @@ def _log_conversation_messages( async def _react_agent_solve_question( - question: str, chat_id: str, max_iterations: int = 5, timeout: float = 30.0, initial_info: str = "", + chat_history: str = "", ) -> Tuple[bool, str, List[Dict[str, Any]], bool]: """使用ReAct架构的Agent来解决问题 Args: - question: 要回答的问题 chat_id: 聊天ID max_iterations: 最大迭代次数 timeout: 超时时间(秒) initial_info: 初始信息,将作为collected_info的初始值 + chat_history: 聊天记录,将传递给 ReAct Agent prompt Returns: Tuple[bool, str, List[Dict[str, Any]], bool]: (是否找到答案, 答案内容, 思考步骤列表, 是否超时) @@ -259,8 +242,8 @@ async def _react_agent_solve_question( Args: text: 要搜索的文本 - func_name: 函数名,如 'found_answer' - param_name: 参数名,如 'answer' + func_name: 函数名,如 'return_information' + param_name: 参数名,如 'information' Returns: 提取的参数值,如果未找到则返回None @@ -311,7 +294,7 @@ async def _react_agent_solve_question( return None - # 正常迭代:使用head_prompt决定调用哪些工具(包含found_answer工具) + # 正常迭代:使用head_prompt决定调用哪些工具(包含return_information工具) tool_definitions = tool_registry.get_tool_definitions() # tool_names = [tool_def["name"] for tool_def in tool_definitions] # logger.debug(f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具: {', '.join(tool_names)} (共{len(tool_definitions)}个)") @@ -320,11 +303,13 @@ async def _react_agent_solve_question( if first_head_prompt is None: # 第一次构建,使用初始的collected_info(即initial_info) initial_collected_info = initial_info if initial_info else "" + # 根据配置选择使用哪个 prompt + prompt_name = "memory_retrieval_react_prompt_head_lpmm" if global_config.experimental.lpmm_memory else "memory_retrieval_react_prompt_head" first_head_prompt = await global_prompt_manager.format_prompt( - "memory_retrieval_react_prompt_head", + prompt_name, bot_name=bot_name, time_now=time_now, - question=question, + chat_history=chat_history, collected_info=initial_collected_info, current_iteration=current_iteration, remaining_iterations=remaining_iterations, @@ -372,7 +357,7 @@ async def _react_agent_solve_question( logger.error(f"ReAct Agent LLM调用失败: {response}") break - # 注意:这里会检查found_answer工具调用,如果检测到found_answer工具,会根据answer参数决定返回答案或退出查询 + # 注意:这里会检查return_information工具调用,如果检测到return_information工具,会根据information参数决定返回信息或退出查询 assistant_message: Optional[Message] = None if tool_calls: @@ -402,11 +387,11 @@ async def _react_agent_solve_question( # 处理工具调用 if not tool_calls: - # 如果没有工具调用,检查响应文本中是否包含found_answer函数调用格式或JSON格式 + # 如果没有工具调用,检查响应文本中是否包含return_information函数调用格式或JSON格式 if response and response.strip(): - # 首先尝试解析JSON格式的found_answer - def parse_json_found_answer(text: str): - """从文本中解析JSON格式的found_answer,返回(found_answer, answer)元组,如果未找到则返回(None, None)""" + # 首先尝试解析JSON格式的return_information + def parse_json_return_information(text: str): + """从文本中解析JSON格式的return_information,返回information字符串,如果未找到则返回None""" if not text: return None, None @@ -429,11 +414,10 @@ async def _react_agent_solve_question( # 尝试解析JSON data = json.loads(json_text) - # 检查是否包含found_answer字段 - if isinstance(data, dict) and "found_answer" in data: - found_answer = bool(data.get("found_answer", False)) - answer = data.get("answer", "") - return found_answer, answer + # 检查是否包含return_information字段 + if isinstance(data, dict) and "return_information" in data: + information = data.get("information", "") + return information except (json.JSONDecodeError, ValueError, TypeError): # 如果JSON解析失败,尝试在文本中查找JSON对象 try: @@ -455,93 +439,85 @@ async def _react_agent_solve_question( if json_end != -1: json_text = text[first_brace:json_end] data = json.loads(json_text) - if isinstance(data, dict) and "found_answer" in data: - found_answer = bool(data.get("found_answer", False)) - answer = data.get("answer", "") - return found_answer, answer + if isinstance(data, dict) and "return_information" in data: + information = data.get("information", "") + return information except (json.JSONDecodeError, ValueError, TypeError): pass - return None, None + return None - # 尝试从文本中解析found_answer函数调用 - def parse_found_answer_from_text(text: str): - """从文本中解析found_answer函数调用,返回answer字符串,如果未找到则返回None - 如果answer存在且非空,表示找到答案;如果answer为空或不存在,表示未找到答案""" + # 尝试从文本中解析return_information函数调用 + def parse_return_information_from_text(text: str): + """从文本中解析return_information函数调用,返回information字符串,如果未找到则返回None""" if not text: return None - # 查找found_answer函数调用位置(不区分大小写) - func_pattern = "found_answer" + # 查找return_information函数调用位置(不区分大小写) + func_pattern = "return_information" text_lower = text.lower() func_pos = text_lower.find(func_pattern) if func_pos == -1: return None - # 解析answer参数(字符串,使用extract_quoted_content) - answer = extract_quoted_content(text, "found_answer", "answer") + # 解析information参数(字符串,使用extract_quoted_content) + information = extract_quoted_content(text, "return_information", "information") - # 如果answer存在(即使是空字符串),也返回它(空字符串表示未找到答案) - return answer + # 如果information存在(即使是空字符串),也返回它 + return information # 首先尝试解析JSON格式 - parsed_found_answer_json, parsed_answer_json = parse_json_found_answer(response) - is_json_format = parsed_found_answer_json is not None + parsed_information_json = parse_json_return_information(response) + is_json_format = parsed_information_json is not None # 如果JSON解析成功,使用JSON结果 if is_json_format: - parsed_answer = parsed_answer_json - has_answer = parsed_found_answer_json and parsed_answer and parsed_answer.strip() + parsed_information = parsed_information_json else: # 如果JSON解析失败,尝试解析函数调用格式 - parsed_answer = parse_found_answer_from_text(response) - # 如果answer存在且非空,表示找到答案;否则表示未找到答案 - has_answer = parsed_answer is not None and parsed_answer.strip() != "" + parsed_information = parse_return_information_from_text(response) - if parsed_answer is not None or is_json_format: - # 检测到found_answer格式(可能是JSON格式或函数调用格式) + if parsed_information is not None or is_json_format: + # 检测到return_information格式(可能是JSON格式或函数调用格式) format_type = "JSON格式" if is_json_format else "函数调用格式" - if has_answer: - # 找到了答案 - step["actions"].append( - { - "action_type": "found_answer", - "action_params": {"answer": parsed_answer}, - } - ) - step["observations"] = [f"检测到found_answer{format_type}调用,找到答案"] + # 返回信息(即使为空字符串也返回) + step["actions"].append( + { + "action_type": "return_information", + "action_params": {"information": parsed_information or ""}, + } + ) + if parsed_information and parsed_information.strip(): + step["observations"] = [f"检测到return_information{format_type}调用,返回信息"] thinking_steps.append(step) logger.info( - f"{react_log_prefix}第 {iteration + 1} 次迭代 通过found_answer{format_type}找到关于问题{question}的答案: {parsed_answer[:100]}..." + f"{react_log_prefix}第 {iteration + 1} 次迭代 通过return_information{format_type}返回信息: {parsed_information[:100]}..." ) _log_conversation_messages( conversation_messages, head_prompt=first_head_prompt, - final_status=f"找到答案:{parsed_answer}", + final_status=f"返回信息:{parsed_information}", ) - return True, parsed_answer, thinking_steps, False + return True, parsed_information, thinking_steps, False else: - # 未找到答案,直接退出查询 - step["actions"].append( - {"action_type": "found_answer", "action_params": {"answer": ""}} - ) - step["observations"] = [f"检测到found_answer{format_type}调用,未找到答案"] + # 信息为空,直接退出查询 + step["observations"] = [f"检测到return_information{format_type}调用,信息为空"] thinking_steps.append(step) logger.info( - f"{react_log_prefix}第 {iteration + 1} 次迭代 通过found_answer{format_type}判断未找到答案" + f"{react_log_prefix}第 {iteration + 1} 次迭代 通过return_information{format_type}判断信息为空" ) _log_conversation_messages( conversation_messages, head_prompt=first_head_prompt, - final_status="未找到答案:通过found_answer文本格式判断未找到答案", + final_status="信息为空:通过return_information文本格式判断信息为空", ) return False, "", thinking_steps, False - # 如果没有检测到found_answer格式,记录思考过程,继续下一轮迭代 + # 如果没有检测到return_information格式,记录思考过程,继续下一轮迭代 step["observations"] = [f"思考完成,但未调用工具。响应: {response}"] logger.info( f"{react_log_prefix}第 {iteration + 1} 次迭代 思考完成但未调用工具: {response}" @@ -555,55 +531,54 @@ async def _react_agent_solve_question( continue # 处理工具调用 - # 首先检查是否有found_answer工具调用,如果有则立即返回,不再处理其他工具 - found_answer_answer = None + # 首先检查是否有return_information工具调用,如果有则立即返回,不再处理其他工具 + return_information_info = None for tool_call in tool_calls: tool_name = tool_call.func_name tool_args = tool_call.args or {} - if tool_name == "found_answer": - found_answer_answer = tool_args.get("answer", "") + if tool_name == "return_information": + return_information_info = tool_args.get("information", "") - # 如果answer存在且非空,表示找到答案;否则表示未找到答案 - if found_answer_answer and found_answer_answer.strip(): - # 找到了答案 - step["actions"].append( - { - "action_type": "found_answer", - "action_params": {"answer": found_answer_answer}, - } - ) - step["observations"] = ["检测到found_answer工具调用,找到答案"] + # 返回信息(即使为空也返回) + step["actions"].append( + { + "action_type": "return_information", + "action_params": {"information": return_information_info}, + } + ) + if return_information_info and return_information_info.strip(): + # 有信息,返回 + step["observations"] = ["检测到return_information工具调用,返回信息"] thinking_steps.append(step) logger.info( - f"{react_log_prefix}第 {iteration + 1} 次迭代 通过found_answer工具找到关于问题{question}的答案: {found_answer_answer}" + f"{react_log_prefix}第 {iteration + 1} 次迭代 通过return_information工具返回信息: {return_information_info}" ) _log_conversation_messages( conversation_messages, head_prompt=first_head_prompt, - final_status=f"找到答案:{found_answer_answer}", + final_status=f"返回信息:{return_information_info}", ) - return True, found_answer_answer, thinking_steps, False + return True, return_information_info, thinking_steps, False else: - # 未找到答案,直接退出查询 - step["actions"].append({"action_type": "found_answer", "action_params": {"answer": ""}}) - step["observations"] = ["检测到found_answer工具调用,未找到答案"] + # 信息为空,直接退出查询 + step["observations"] = ["检测到return_information工具调用,信息为空"] thinking_steps.append(step) logger.info( - f"{react_log_prefix}第 {iteration + 1} 次迭代 通过found_answer工具判断未找到答案" + f"{react_log_prefix}第 {iteration + 1} 次迭代 通过return_information工具判断信息为空" ) _log_conversation_messages( conversation_messages, head_prompt=first_head_prompt, - final_status="未找到答案:通过found_answer工具判断未找到答案", + final_status="信息为空:通过return_information工具判断信息为空", ) return False, "", thinking_steps, False - # 如果没有found_answer工具调用,继续处理其他工具 + # 如果没有return_information工具调用,继续处理其他工具 tool_tasks = [] for i, tool_call in enumerate(tool_calls): tool_name = tool_call.func_name @@ -613,8 +588,8 @@ async def _react_agent_solve_question( f"{react_log_prefix}第 {iteration + 1} 次迭代 工具调用 {i + 1}/{len(tool_calls)}: {tool_name}({tool_args})" ) - # 跳过found_answer工具调用(已经在上面处理过了) - if tool_name == "found_answer": + # 跳过return_information工具调用(已经在上面处理过了) + if tool_name == "return_information": continue # 记录最后一次使用的工具名称(用于判断是否需要额外迭代) @@ -712,8 +687,8 @@ async def _react_agent_solve_question( Args: text: 要搜索的文本 - func_name: 函数名,如 'found_answer' - param_name: 参数名,如 'answer' + func_name: 函数名,如 'return_information' + param_name: 参数名,如 'information' Returns: 提取的参数值,如果未找到则返回None @@ -769,7 +744,7 @@ async def _react_agent_solve_question( "memory_retrieval_react_final_prompt", bot_name=bot_name, time_now=time_now, - question=question, + chat_history=chat_history, collected_info=collected_info if collected_info else "暂无信息", current_iteration=current_iteration, remaining_iterations=remaining_iterations, @@ -802,64 +777,49 @@ async def _react_agent_solve_question( logger.info(f"{react_log_prefix}最终评估Prompt: {evaluation_prompt}") logger.info(f"{react_log_prefix}最终评估响应: {eval_response}") - # 从最终评估响应中提取found_answer或not_enough_info - found_answer_content = None - not_enough_info_reason = None + # 从最终评估响应中提取return_information + return_information_content = None if eval_response: - found_answer_content = extract_quoted_content(eval_response, "found_answer", "answer") - if not found_answer_content: - not_enough_info_reason = extract_quoted_content(eval_response, "not_enough_info", "reason") + return_information_content = extract_quoted_content(eval_response, "return_information", "information") - # 如果找到答案,返回(找到答案时,无论是否超时,都视为成功完成) - if found_answer_content: + # 如果提取到信息,返回(无论是否超时,都视为成功完成) + if return_information_content is not None: eval_step = { "iteration": current_iteration, "thought": f"[最终评估] {eval_response}", - "actions": [{"action_type": "found_answer", "action_params": {"answer": found_answer_content}}], - "observations": ["最终评估阶段检测到found_answer"], + "actions": [{"action_type": "return_information", "action_params": {"information": return_information_content}}], + "observations": ["最终评估阶段检测到return_information"], } thinking_steps.append(eval_step) - logger.info(f"ReAct Agent 最终评估阶段找到关于问题{question}的答案: {found_answer_content}") - - _log_conversation_messages( - conversation_messages, - head_prompt=first_head_prompt, - final_status=f"找到答案:{found_answer_content}", - ) - - return True, found_answer_content, thinking_steps, False - - # 如果评估为not_enough_info,返回空字符串(不返回任何信息) - if not_enough_info_reason: - eval_step = { - "iteration": current_iteration, - "thought": f"[最终评估] {eval_response}", - "actions": [{"action_type": "not_enough_info", "action_params": {"reason": not_enough_info_reason}}], - "observations": ["最终评估阶段检测到not_enough_info"], - } - thinking_steps.append(eval_step) - logger.info(f"ReAct Agent 最终评估阶段判断信息不足: {not_enough_info_reason}") - - _log_conversation_messages( - conversation_messages, - head_prompt=first_head_prompt, - final_status=f"未找到答案:{not_enough_info_reason}", - ) - - return False, "", thinking_steps, is_timeout + if return_information_content and return_information_content.strip(): + logger.info(f"ReAct Agent 最终评估阶段返回信息: {return_information_content}") + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status=f"返回信息:{return_information_content}", + ) + return True, return_information_content, thinking_steps, False + else: + logger.info("ReAct Agent 最终评估阶段判断信息为空") + _log_conversation_messages( + conversation_messages, + head_prompt=first_head_prompt, + final_status="信息为空:最终评估阶段判断信息为空", + ) + return False, "", thinking_steps, False # 如果没有明确判断,视为not_enough_info,返回空字符串(不返回任何信息) eval_step = { "iteration": current_iteration, "thought": f"[最终评估] {eval_response}", "actions": [ - {"action_type": "not_enough_info", "action_params": {"reason": "已到达最大迭代次数,无法找到答案"}} + {"action_type": "return_information", "action_params": {"information": ""}} ], - "observations": ["已到达最大迭代次数,无法找到答案"], + "observations": ["已到达最大迭代次数,信息为空"], } thinking_steps.append(eval_step) - logger.info("ReAct Agent 已到达最大迭代次数,无法找到答案") + logger.info("ReAct Agent 已到达最大迭代次数,信息为空") _log_conversation_messages( conversation_messages, @@ -1026,66 +986,48 @@ def _store_thinking_back( logger.error(f"存储思考过程失败: {e}") -async def _process_single_question( - question: str, +async def _process_memory_retrieval( chat_id: str, context: str, initial_info: str = "", max_iterations: Optional[int] = None, + chat_history: str = "", ) -> Optional[str]: - """处理单个问题的查询 + """处理记忆检索 Args: - question: 要查询的问题 chat_id: 聊天ID context: 上下文信息 initial_info: 初始信息,将传递给ReAct Agent max_iterations: 最大迭代次数 + chat_history: 聊天记录,将传递给 ReAct Agent Returns: - Optional[str]: 如果找到答案,返回格式化的结果字符串,否则返回None + Optional[str]: 如果找到答案,返回答案内容,否则返回None """ - # 如果question为空或None,直接返回None,不进行查询 - if not question or not question.strip(): - logger.debug("问题为空,跳过查询") - return None - - # logger.info(f"开始处理问题: {question}") - _cleanup_stale_not_found_thinking_back() question_initial_info = initial_info or "" - # 直接使用ReAct Agent查询(不再从thinking_back获取缓存) - # logger.info(f"使用ReAct Agent查询,问题: {question[:50]}...") - + # 直接使用ReAct Agent进行记忆检索 # 如果未指定max_iterations,使用配置的默认值 if max_iterations is None: max_iterations = global_config.memory.max_agent_iterations found_answer, answer, thinking_steps, is_timeout = await _react_agent_solve_question( - question=question, chat_id=chat_id, max_iterations=max_iterations, timeout=global_config.memory.agent_timeout_seconds, initial_info=question_initial_info, + chat_history=chat_history, ) - # 存储查询历史到数据库(超时时不存储) - if not is_timeout: - _store_thinking_back( - chat_id=chat_id, - question=question, - context=context, - found_answer=found_answer, - answer=answer, - thinking_steps=thinking_steps, - ) - else: - logger.info(f"ReAct Agent超时,不存储到数据库,问题: {question[:50]}...") + # 不再存储到数据库,直接返回答案 + if is_timeout: + logger.info("ReAct Agent超时,不返回结果") if found_answer and answer: - return f"问题:{question}\n答案:{answer}" + return answer return None @@ -1099,8 +1041,6 @@ async def build_memory_retrieval_prompt( unknown_words: Optional[List[str]] = None, ) -> str: """构建记忆检索提示 - 使用两段式查询:第一步生成问题,第二步使用ReAct Agent查询答案 - Args: message: 聊天历史记录 sender: 发送者名称 @@ -1133,47 +1073,8 @@ async def build_memory_retrieval_prompt( logger.info(f"{log_prefix}检测是否需要回忆,元消息:{message[:30]}...,消息长度: {len(message)}") try: - time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - bot_name = global_config.bot.nickname chat_id = chat_stream.stream_id - # 获取最近查询历史(最近10分钟内的查询,用于避免重复查询) - recent_query_history = _get_recent_query_history(chat_id, time_window_seconds=600.0) - if not recent_query_history: - recent_query_history = "最近没有查询记录。" - - # 第一步:使用 LLM 生成问题 - question_prompt = await global_prompt_manager.format_prompt( - "memory_retrieval_question_prompt", - bot_name=bot_name, - time_now=time_now, - chat_history=message, - recent_query_history=recent_query_history, - sender=sender, - target_message=target, - ) - - success, response, reasoning_content, model_name = await llm_api.generate_with_model( - question_prompt, - model_config=model_config.model_task_config.tool_use, - request_type="memory.question", - ) - - if global_config.debug.show_memory_prompt: - logger.info(f"{log_prefix}记忆检索问题生成提示词: {question_prompt}") - # logger.info(f"记忆检索问题生成响应: {response}") - - if not success: - logger.error(f"{log_prefix}LLM生成问题失败: {response}") - return "" - - # 解析概念列表和问题列表,只取第一个问题 - single_question: Optional[str] = None - _, questions = parse_questions_json(response) - if questions and len(questions) > 0: - single_question = questions[0].strip() - logger.info(f"{log_prefix}解析到问题: {single_question}") - # 初始阶段:使用 Planner 提供的 unknown_words 进行检索(如果提供) initial_info = "" if unknown_words and len(unknown_words) > 0: @@ -1195,13 +1096,7 @@ async def build_memory_retrieval_prompt( else: logger.debug(f"{log_prefix}unknown_words 检索未找到任何结果") - if not single_question: - logger.debug(f"{log_prefix}模型认为不需要检索记忆或解析失败,不返回任何查询结果") - end_time = time.time() - logger.info(f"{log_prefix}无当次查询,不返回任何结果,耗时: {(end_time - start_time):.3f}秒") - return "" - - # 第二步:处理问题(使用配置的最大迭代次数和超时时间) + # 直接使用 ReAct Agent 进行记忆检索(跳过问题生成步骤) base_max_iterations = global_config.memory.max_agent_iterations # 根据think_level调整迭代次数:think_level=1时不变,think_level=0时减半 if think_level == 0: @@ -1210,60 +1105,31 @@ async def build_memory_retrieval_prompt( max_iterations = base_max_iterations timeout_seconds = global_config.memory.agent_timeout_seconds logger.debug( - f"{log_prefix}问题: {single_question},think_level={think_level},设置最大迭代次数: {max_iterations}(基础值: {base_max_iterations}),超时时间: {timeout_seconds}秒" + f"{log_prefix}直接使用 ReAct Agent 进行记忆检索,think_level={think_level},设置最大迭代次数: {max_iterations}(基础值: {base_max_iterations}),超时时间: {timeout_seconds}秒" ) - # 处理单个问题 + # 直接调用 ReAct Agent 处理记忆检索 try: - result = await _process_single_question( - question=single_question, + result = await _process_memory_retrieval( chat_id=chat_id, context=message, initial_info=initial_info, max_iterations=max_iterations, + chat_history=message, ) except Exception as e: - logger.error(f"{log_prefix}处理问题 '{single_question}' 时发生异常: {e}") + logger.error(f"{log_prefix}处理记忆检索时发生异常: {e}") result = None - # 获取最近10分钟内已找到答案的缓存记录 - cached_answers = _get_recent_found_answers(chat_id, time_window_seconds=600.0) - - # 合并当前查询结果和缓存答案(去重:如果当前查询的问题在缓存中已存在,优先使用当前结果) - all_results = [] - - # 先添加当前查询的结果 - current_question = None - if result: - all_results.append(result) - # 提取问题(格式为 "问题:xxx\n答案:xxx") - if result.startswith("问题:"): - question_end = result.find("\n答案:") - if question_end != -1: - current_question = result[4:question_end] - - # 添加缓存答案(排除当前查询的问题) - for cached_answer in cached_answers: - if cached_answer.startswith("问题:"): - question_end = cached_answer.find("\n答案:") - if question_end != -1: - cached_question = cached_answer[4:question_end] - if cached_question != current_question: - all_results.append(cached_answer) - end_time = time.time() - if all_results: - retrieved_memory = "\n\n".join(all_results) - current_count = 1 if result else 0 - cached_count = len(all_results) - current_count + if result: logger.info( - f"{log_prefix}记忆检索成功,耗时: {(end_time - start_time):.3f}秒," - f"当前查询 {current_count} 条记忆,缓存 {cached_count} 条记忆,共 {len(all_results)} 条记忆" + f"{log_prefix}记忆检索成功,耗时: {(end_time - start_time):.3f}秒" ) - return f"你回忆起了以下信息:\n{retrieved_memory}\n如果与回复内容相关,可以参考这些回忆的信息。\n" + return f"你回忆起了以下信息:\n{result}\n如果与回复内容相关,可以参考这些回忆的信息。\n" else: - logger.debug(f"{log_prefix}问题未找到答案,且无缓存答案") + logger.debug(f"{log_prefix}记忆检索未找到相关信息") return "" except Exception as e: diff --git a/src/memory_system/retrieval_tools/__init__.py b/src/memory_system/retrieval_tools/__init__.py index a617d970..ce584a1b 100644 --- a/src/memory_system/retrieval_tools/__init__.py +++ b/src/memory_system/retrieval_tools/__init__.py @@ -15,16 +15,18 @@ from .query_chat_history import register_tool as register_query_chat_history from .query_lpmm_knowledge import register_tool as register_lpmm_knowledge from .query_person_info import register_tool as register_query_person_info from .query_words import register_tool as register_query_words -from .found_answer import register_tool as register_found_answer +from .return_information import register_tool as register_return_information from src.config.config import global_config def init_all_tools(): """初始化并注册所有记忆检索工具""" - register_query_chat_history() - register_query_person_info() - register_query_words() # 注册query_words工具 - register_found_answer() # 注册found_answer工具 + # 如果开启了lpmm_memory,则不注册query_chat_history工具 + if not global_config.experimental.lpmm_memory: + register_query_chat_history() + register_query_person_info() + register_query_words() + register_return_information() if global_config.lpmm_knowledge.lpmm_mode == "agent": register_lpmm_knowledge() diff --git a/src/memory_system/retrieval_tools/found_answer.py b/src/memory_system/retrieval_tools/found_answer.py deleted file mode 100644 index af233cb9..00000000 --- a/src/memory_system/retrieval_tools/found_answer.py +++ /dev/null @@ -1,42 +0,0 @@ -""" -found_answer工具 - 用于在记忆检索过程中结束查询 -""" - -from src.common.logger import get_logger -from .tool_registry import register_memory_retrieval_tool - -logger = get_logger("memory_retrieval_tools") - - -async def found_answer(answer: str = "") -> str: - """结束查询 - - Args: - answer: 如果找到了答案,提供答案内容;如果未找到答案,可以为空或不提供此参数 - - Returns: - str: 确认信息 - """ - if answer and answer.strip(): - logger.info(f"找到答案: {answer}") - return f"已确认找到答案: {answer}" - else: - logger.info("未找到答案,结束查询") - return "未找到答案,查询结束" - - -def register_tool(): - """注册found_answer工具""" - register_memory_retrieval_tool( - name="found_answer", - description="当你决定结束查询时,调用此工具。如果找到了明确答案,在answer参数中提供答案内容;如果未找到答案,可以不提供answer参数或提供空字符串。只有在检索到明确、具体的答案时才提供answer,不要编造信息。", - parameters=[ - { - "name": "answer", - "type": "string", - "description": "如果找到了答案,提供找到的答案内容,必须基于已收集的信息,不要编造;如果未找到答案,可以不提供此参数或提供空字符串", - "required": False, - }, - ], - execute_func=found_answer, - ) diff --git a/src/memory_system/retrieval_tools/query_lpmm_knowledge.py b/src/memory_system/retrieval_tools/query_lpmm_knowledge.py index c1f39270..eed01af1 100644 --- a/src/memory_system/retrieval_tools/query_lpmm_knowledge.py +++ b/src/memory_system/retrieval_tools/query_lpmm_knowledge.py @@ -56,12 +56,12 @@ def register_tool(): """注册LPMM知识库查询工具""" register_memory_retrieval_tool( name="lpmm_search_knowledge", - description="从LPMM知识库中搜索相关信息,适用于需要知识支持的场景。", + description="从知识库中搜索相关信息,适用于需要知识支持的场景。使用自然语言问句检索", parameters=[ { "name": "query", "type": "string", - "description": "需要查询的关键词或问题", + "description": "需要查询的问题,使用一句疑问句提问,例如:什么是AI?", "required": True, }, { diff --git a/src/memory_system/retrieval_tools/return_information.py b/src/memory_system/retrieval_tools/return_information.py new file mode 100644 index 00000000..bf368083 --- /dev/null +++ b/src/memory_system/retrieval_tools/return_information.py @@ -0,0 +1,42 @@ +""" +return_information工具 - 用于在记忆检索过程中返回总结信息并结束查询 +""" + +from src.common.logger import get_logger +from .tool_registry import register_memory_retrieval_tool + +logger = get_logger("memory_retrieval_tools") + + +async def return_information(information: str) -> str: + """返回总结信息并结束查询 + + Args: + information: 基于已收集信息总结出的相关信息,用于帮助回复。如果收集的信息对当前聊天没有帮助,可以返回空字符串。 + + Returns: + str: 确认信息 + """ + if information and information.strip(): + logger.info(f"返回总结信息: {information}") + return f"已确认返回信息: {information}" + else: + logger.info("未收集到相关信息,结束查询") + return "未收集到相关信息,查询结束" + + +def register_tool(): + """注册return_information工具""" + register_memory_retrieval_tool( + name="return_information", + description="当你决定结束查询时,调用此工具。基于已收集的信息,总结出一段相关信息用于帮助回复。如果收集的信息对当前聊天有帮助,在information参数中提供总结信息;如果信息无关或没有帮助,可以提供空字符串。", + parameters=[ + { + "name": "information", + "type": "string", + "description": "基于已收集信息总结出的相关信息,用于帮助回复。必须基于已收集的信息,不要编造。如果信息对当前聊天没有帮助,可以返回空字符串。", + "required": True, + }, + ], + execute_func=return_information, + ) diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index bc26ef71..0de42f50 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "7.3.5" +version = "7.4.0" #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- # 如果你想要修改配置文件,请递增version的值 @@ -132,7 +132,13 @@ global_memory = false # 是否允许记忆检索进行全局查询 global_memory_blacklist = [ ] # 全局记忆黑名单,当启用全局记忆时,不将特定聊天流纳入检索。格式: ["platform:id:type", ...],例如: ["qq:1919810:private", "qq:114514:group"] -planner_question = true # 是否使用 Planner 提供的 question 作为记忆检索问题。开启后,当 Planner 在 reply 动作中提供了 question 时,直接使用该问题进行记忆检索,跳过 LLM 生成问题的步骤;关闭后沿用旧模式,使用 LLM 生成问题 + +# 聊天历史总结配置 +chat_history_topic_check_message_threshold = 80 # 聊天历史话题检查的消息数量阈值,当累积消息数达到此值时触发话题检查 +chat_history_topic_check_time_hours = 8.0 # 聊天历史话题检查的时间阈值(小时),当距离上次检查超过此时间且消息数达到最小阈值时触发话题检查 +chat_history_topic_check_min_messages = 20 # 聊天历史话题检查的时间触发模式下的最小消息数阈值 +chat_history_finalize_no_update_checks = 3 # 聊天历史话题打包存储的连续无更新检查次数阈值,当话题连续N次检查无新增内容时触发打包存储 +chat_history_finalize_message_count = 5 # 聊天历史话题打包存储的消息条数阈值,当话题的消息条数超过此值时触发打包存储 [dream] interval_minutes = 60 # 做梦时间间隔(分钟),默认30分钟 @@ -306,6 +312,8 @@ private_plan_style = """ # ] chat_prompts = [] +lpmm_memory = false # 是否将聊天历史总结导入到LPMM知识库。开启后,chat_history_summarizer总结出的历史记录会同时导入到知识库 + # 此系统暂时移除,无效配置 [relationship]