From 14a8890791c09457a88b1abba68d80d05fc4b0c4 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sun, 28 Sep 2025 13:48:31 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E6=8F=90=E4=BE=9B=E8=AE=B0?= =?UTF-8?q?=E5=BF=86=E6=95=B4=E5=90=88=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/memory_system/Memory_chest.py | 180 +++++++++++++++++- .../hippocampus_to_memory_chest_task.py | 172 ++++++++++++----- .../memory_system/memory_management_task.py | 177 +++++++++++++++++ src/config/config.py | 2 + src/config/official_configs.py | 9 + src/main.py | 5 + template/bot_config_template.toml | 6 +- 7 files changed, 498 insertions(+), 53 deletions(-) create mode 100644 src/chat/memory_system/memory_management_task.py diff --git a/src/chat/memory_system/Memory_chest.py b/src/chat/memory_system/Memory_chest.py index 6a449fd3..98cca601 100644 --- a/src/chat/memory_system/Memory_chest.py +++ b/src/chat/memory_system/Memory_chest.py @@ -1,3 +1,5 @@ +import json +import re from src.llm_models.utils_model import LLMRequest from src.config.config import model_config @@ -7,6 +9,7 @@ from src.config.config import global_config from src.plugin_system.apis.message_api import build_readable_messages import time from src.plugin_system.apis.message_api import get_raw_msg_by_timestamp_with_chat +from json_repair import repair_json logger = get_logger("memory_chest") @@ -24,7 +27,7 @@ class MemoryChest: ) self.memory_build_threshold = 30 - self.memory_size_limit = 1024 + self.memory_size_limit = global_config.memory.max_memory_size self.running_content_list = {} # {chat_id: {"content": running_content, "last_update_time": timestamp}} self.fetched_memory_list = [] # [(chat_id, (question, answer, timestamp)), ...] @@ -324,5 +327,180 @@ class MemoryChest: except Exception as e: logger.error(f"保存记忆仓库内容时出错: {e}") + async def choose_merge_target(self, memory_title: str) -> list[str]: + """ + 选择与给定记忆标题相关的记忆目标 + + Args: + memory_title: 要匹配的记忆标题 + + Returns: + list[str]: 选中的记忆内容列表 + """ + try: + all_titles = self.get_all_titles() + content = "" + for title in all_titles: + content += f"{title}\n" + + prompt = f""" +所有记忆列表 +{content} + +请根据以上记忆列表,选择一个与"{memory_title}"相关的记忆,用json输出: +可以选择多个相关的记忆,但最多不超过5个 +例如: +{{ + "selected_title": "选择的相关记忆标题" +}}, +{{ + "selected_title": "选择的相关记忆标题" +}}, +{{ + "selected_title": "选择的相关记忆标题" +}} +... +请输出JSON格式,不要输出其他内容: +""" + if global_config.debug.show_prompt: + logger.info(f"选择合并目标 prompt: {prompt}") + else: + logger.debug(f"选择合并目标 prompt: {prompt}") + + merge_target_response, (reasoning_content, model_name, tool_calls) = await self.LLMRequest_build.generate_response_async(prompt) + + # 解析JSON响应 + selected_titles = self._parse_merge_target_json(merge_target_response) + + # 根据标题查找对应的内容 + selected_contents = self._get_memories_by_titles(selected_titles) + + logger.info(f"选择合并目标结果: {len(selected_contents)} 条记忆") + return selected_contents + + except Exception as e: + logger.error(f"选择合并目标时出错: {e}") + return [] + + def _get_memories_by_titles(self, titles: list[str]) -> list[str]: + """ + 根据标题列表查找对应的记忆内容 + + Args: + titles: 记忆标题列表 + + Returns: + list[str]: 记忆内容列表 + """ + try: + from src.common.database.database_model import MemoryChest as MemoryChestModel + + contents = [] + for title in titles: + if not title or not title.strip(): + continue + + # 在数据库中查找匹配的记忆 + try: + memory_record = MemoryChestModel.select().where(MemoryChestModel.title == title.strip()).first() + if memory_record: + contents.append(memory_record.content) + logger.debug(f"找到记忆: {memory_record.title}") + else: + logger.warning(f"未找到标题为 '{title}' 的记忆") + except Exception as e: + logger.error(f"查找标题 '{title}' 的记忆时出错: {e}") + continue + + logger.info(f"成功找到 {len(contents)} 条记忆内容") + return contents + + except Exception as e: + logger.error(f"根据标题查找记忆时出错: {e}") + return [] + + def _parse_merge_target_json(self, json_text: str) -> list[str]: + """ + 解析choose_merge_target生成的JSON响应 + + Args: + json_text: LLM返回的JSON文本 + + Returns: + list[str]: 解析出的记忆标题列表 + """ + try: + # 清理JSON文本,移除可能的额外内容 + repaired_content = repair_json(json_text) + + # 尝试直接解析JSON + try: + parsed_data = json.loads(repaired_content) + if isinstance(parsed_data, list): + # 如果是列表,提取selected_title字段 + titles = [] + for item in parsed_data: + if isinstance(item, dict) and "selected_title" in item: + titles.append(item["selected_title"]) + return titles + elif isinstance(parsed_data, dict) and "selected_title" in parsed_data: + # 如果是单个对象 + return [parsed_data["selected_title"]] + except json.JSONDecodeError: + pass + + # 如果直接解析失败,尝试提取JSON对象 + # 查找所有包含selected_title的JSON对象 + pattern = r'\{[^}]*"selected_title"[^}]*\}' + matches = re.findall(pattern, repaired_content) + + titles = [] + for match in matches: + try: + obj = json.loads(match) + if "selected_title" in obj: + titles.append(obj["selected_title"]) + except json.JSONDecodeError: + continue + + if titles: + return titles + + logger.warning(f"无法解析JSON响应: {json_text[:200]}...") + return [] + + except Exception as e: + logger.error(f"解析合并目标JSON时出错: {e}") + return [] + + async def merge_memory(self,memory_list: list[str]) -> tuple[str, str]: + """ + 合并记忆 + """ + try: + content = "" + for memory in memory_list: + content += f"{memory.content}\n" + + prompt = f""" +以下是多段记忆内容,请将它们合并成一段记忆: +{content} + +请主要关注概念和知识,而不是聊天的琐事 +记忆为一段纯文本,逻辑清晰,指出事件,概念的含义,并说明关系 +请输出添加后的记忆内容,不要输出其他内容: +""" + + if global_config.debug.show_prompt: + logger.info(f"合并记忆 prompt: {prompt}") + else: + logger.debug(f"合并记忆 prompt: {prompt}") + + merged_memory, (reasoning_content, model_name, tool_calls) = await self.LLMRequest_build.generate_response_async(prompt) + + return merged_memory + except Exception as e: + logger.error(f"合并记忆时出错: {e}") + global_memory_chest = MemoryChest() \ No newline at end of file diff --git a/src/chat/memory_system/hippocampus_to_memory_chest_task.py b/src/chat/memory_system/hippocampus_to_memory_chest_task.py index e2764141..b30e6d4d 100644 --- a/src/chat/memory_system/hippocampus_to_memory_chest_task.py +++ b/src/chat/memory_system/hippocampus_to_memory_chest_task.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- +import asyncio import random +import re from typing import List from src.manager.async_task_manager import AsyncTask from src.chat.memory_system.Hippocampus import hippocampus_manager -from src.chat.memory_system.Memory_chest import global_memory_chest from src.common.logger import get_logger logger = get_logger("hippocampus_to_memory_chest") @@ -13,20 +14,42 @@ logger = get_logger("hippocampus_to_memory_chest") class HippocampusToMemoryChestTask(AsyncTask): """海马体到记忆仓库的转换任务 - 每60秒随机选择5个海马体节点,将内容拼接为content, - 然后根据memory_chest的格式生成标题并存储 + 每10秒执行一次转换,每次最多处理50批,每批15个节点, + 当没有新节点时停止任务运行 """ def __init__(self): super().__init__( task_name="Hippocampus to Memory Chest Task", - wait_before_start=10, # 启动后等待60秒再开始 - run_interval=60 # 每60秒运行一次 + wait_before_start=5, # 启动后等待5秒再开始 + run_interval=10 # 每10秒运行一次 ) + self.task_stopped = False # 标记任务是否已停止 + + async def start_task(self, abort_flag: asyncio.Event): + """重写start_task方法,支持任务停止""" + if self.wait_before_start > 0: + # 等待指定时间后开始任务 + await asyncio.sleep(self.wait_before_start) + + while not abort_flag.is_set() and not self.task_stopped: + await self.run() + if self.run_interval > 0: + await asyncio.sleep(self.run_interval) + else: + break + + if self.task_stopped: + logger.info("[海马体转换] 任务已完全停止,不再执行") async def run(self): """执行转换任务""" try: + # 检查任务是否已停止 + if self.task_stopped: + logger.info("[海马体转换] 任务已停止,跳过执行") + return + logger.info("[海马体转换] 开始执行海马体到记忆仓库的转换任务") # 检查海马体管理器是否已初始化 @@ -38,81 +61,100 @@ class HippocampusToMemoryChestTask(AsyncTask): hippocampus = hippocampus_manager.get_hippocampus() memory_graph = hippocampus.memory_graph.G - # 获取所有节点 - all_nodes = list(memory_graph.nodes()) + # 执行10批转换 + total_processed = 0 + total_success = 0 - if len(all_nodes) < 10: - selected_nodes = all_nodes - logger.info(f"[海马体转换] 当前只有 {len(all_nodes)} 个节点,少于5个,跳过本次转换") - else: + for batch_num in range(1, 51): # 执行10批 + logger.info(f"[海马体转换] 开始执行第 {batch_num} 批转换") - # 随机选择5个节点 - selected_nodes = random.sample(all_nodes, 10) - logger.info(f"[海马体转换] 随机选择了 {len(selected_nodes)} 个节点: {selected_nodes}") - - # 拼接节点内容 - content_parts = [] - for node in selected_nodes: - node_data = memory_graph.nodes[node] - memory_items = node_data.get("memory_items", "") + # 检查剩余节点 + remaining_nodes = list(memory_graph.nodes()) + if len(remaining_nodes) == 0: + logger.info(f"[海马体转换] 第 {batch_num} 批:没有剩余节点,停止任务运行") + self.task_stopped = True + break - if memory_items and memory_items.strip(): - # 添加节点名称和内容 - content_parts.append(f"【{node}】{memory_items}") + # 如果剩余节点不足10个,使用所有剩余节点 + if len(remaining_nodes) < 15: + selected_nodes = remaining_nodes + logger.info(f"[海马体转换] 第 {batch_num} 批:剩余节点不足10个({len(remaining_nodes)}个),使用所有剩余节点") else: - logger.debug(f"[海马体转换] 节点 {node} 没有记忆内容,跳过") - - if not content_parts: - logger.info("[海马体转换] 没有找到有效的记忆内容,跳过本次转换") - return + # 随机选择10个节点 + selected_nodes = random.sample(remaining_nodes, 15) + logger.info(f"[海马体转换] 第 {batch_num} 批:选择了 {len(selected_nodes)} 个节点") - # 拼接所有内容 - combined_content = "\n\n".join(content_parts) - logger.info(f"[海马体转换] 拼接完成,内容长度: {len(combined_content)} 字符") + # 拼接节点内容 + content_parts = [] + valid_nodes = [] + + for node in selected_nodes: + node_data = memory_graph.nodes[node] + memory_items = node_data.get("memory_items", "") + + if memory_items and memory_items.strip(): + # 添加节点名称和内容 + content_parts.append(f"【{node}】{memory_items}") + valid_nodes.append(node) + else: + logger.debug(f"[海马体转换] 第 {batch_num} 批:节点 {node} 没有记忆内容,跳过") + + if not content_parts: + logger.info(f"[海马体转换] 第 {batch_num} 批:没有找到有效的记忆内容,跳过") + continue + + # 拼接所有内容 + combined_content = "\n\n".join(content_parts) + logger.info(f"[海马体转换] 第 {batch_num} 批:拼接完成,内容长度: {len(combined_content)} 字符") + + # 生成标题并存储到记忆仓库 + success = await self._save_to_memory_chest(combined_content, batch_num) + + # 如果保存成功,删除已转换的节点 + if success: + await self._remove_converted_nodes(valid_nodes) + total_success += 1 + logger.info(f"[海马体转换] 第 {batch_num} 批:转换成功") + else: + logger.warning(f"[海马体转换] 第 {batch_num} 批:转换失败") + + total_processed += 1 + + # 批次间短暂休息,避免过于频繁的数据库操作 + if batch_num < 10: + await asyncio.sleep(0.1) - # 生成标题并存储到记忆仓库 - success = await self._save_to_memory_chest(combined_content) - - # 如果保存成功,删除已转换的节点 - if success: - await self._remove_converted_nodes(selected_nodes) + logger.info(f"[海马体转换] 本次执行完成:共处理 {total_processed} 批,成功 {total_success} 批") logger.info("[海马体转换] 转换任务完成") except Exception as e: logger.error(f"[海马体转换] 执行转换任务时发生错误: {e}", exc_info=True) - async def _save_to_memory_chest(self, content: str) -> bool: + async def _save_to_memory_chest(self, content: str, batch_num: int = 1) -> bool: """将内容保存到记忆仓库 Args: content: 要保存的内容 + batch_num: 批次号 Returns: bool: 保存是否成功 """ try: - # 使用Memory_chest的LLMRequest生成标题 - title_prompt = f""" -请为以下内容生成一个描述全面的标题,要求描述内容的主要概念和事件: -{content} - -请只输出标题,不要输出其他内容: -""" + # 从内容中提取节点名称作为标题 + title = self._generate_title_from_content(content, batch_num) - # 使用Memory_chest的LLM模型生成标题 - title, (reasoning_content, model_name, tool_calls) = await global_memory_chest.LLMRequest_build.generate_response_async(title_prompt) - - if title and title.strip(): + if title: # 保存到数据库 from src.common.database.database_model import MemoryChest as MemoryChestModel MemoryChestModel.create( - title=title.strip(), + title=title, content=content ) - logger.info(f"[海马体转换] 已保存到记忆仓库,标题: {title.strip()}") + logger.info(f"[海马体转换] 第 {batch_num} 批:已保存到记忆仓库,标题: {title}") return True else: logger.warning("[海马体转换] 生成标题失败,跳过保存") @@ -122,6 +164,34 @@ class HippocampusToMemoryChestTask(AsyncTask): logger.error(f"[海马体转换] 保存到记忆仓库时发生错误: {e}", exc_info=True) return False + def _generate_title_from_content(self, content: str, batch_num: int = 1) -> str: + """从内容中提取节点名称生成标题 + + Args: + content: 拼接的内容 + batch_num: 批次号 + + Returns: + str: 生成的标题 + """ + try: + # 提取所有【节点名称】中的节点名称 + node_pattern = r'【([^】]+)】' + nodes = re.findall(node_pattern, content) + + if nodes: + # 去重并限制数量(最多显示前5个) + unique_nodes = list(dict.fromkeys(nodes))[:5] + title = f"关于{','.join(unique_nodes)}的记忆" + return title + else: + logger.warning("[海马体转换] 无法从内容中提取节点名称") + return "" + + except Exception as e: + logger.error(f"[海马体转换] 生成标题时发生错误: {e}", exc_info=True) + return "" + async def _remove_converted_nodes(self, nodes_to_remove: List[str]): """删除已转换的海马体节点 diff --git a/src/chat/memory_system/memory_management_task.py b/src/chat/memory_system/memory_management_task.py new file mode 100644 index 00000000..df534f27 --- /dev/null +++ b/src/chat/memory_system/memory_management_task.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +import asyncio +import random +from typing import List + +from src.manager.async_task_manager import AsyncTask +from src.chat.memory_system.Memory_chest import global_memory_chest +from src.common.logger import get_logger +from src.common.database.database_model import MemoryChest as MemoryChestModel +from src.config.config import global_config + +logger = get_logger("memory_management") + + +class MemoryManagementTask(AsyncTask): + """记忆管理定时任务 + + 根据Memory_chest中的记忆数量与MAX_MEMORY_NUMBER的比例来决定执行频率: + - 小于50%:每600秒执行一次 + - 大于等于50%:每300秒执行一次 + + 每次执行时随机选择一个title,执行choose_merge_target和merge_memory, + 然后删除原始记忆 + """ + + def __init__(self): + super().__init__( + task_name="Memory Management Task", + wait_before_start=10, # 启动后等待10秒再开始 + run_interval=300 # 默认300秒间隔,会根据记忆数量动态调整 + ) + self.max_memory_number = global_config.memory.max_memory_number + + async def start_task(self, abort_flag: asyncio.Event): + """重写start_task方法,支持动态调整执行间隔""" + if self.wait_before_start > 0: + # 等待指定时间后开始任务 + await asyncio.sleep(self.wait_before_start) + + while not abort_flag.is_set(): + await self.run() + + # 动态调整执行间隔 + current_interval = self._calculate_interval() + logger.info(f"[记忆管理] 下次执行间隔: {current_interval}秒") + + if current_interval > 0: + await asyncio.sleep(current_interval) + else: + break + + def _calculate_interval(self) -> int: + """根据当前记忆数量计算执行间隔""" + try: + current_count = self._get_memory_count() + percentage = current_count / self.max_memory_number + + if percentage < 0.5: + # 小于50%,每600秒执行一次 + return 3600 + elif percentage < 0.7: + # 大于等于50%,每300秒执行一次 + return 600 + else: + # 大于等于70%,每120秒执行一次 + return 120 + + except Exception as e: + logger.error(f"[记忆管理] 计算执行间隔时出错: {e}") + return 300 # 默认300秒 + + def _get_memory_count(self) -> int: + """获取当前记忆数量""" + try: + count = MemoryChestModel.select().count() + logger.debug(f"[记忆管理] 当前记忆数量: {count}") + return count + except Exception as e: + logger.error(f"[记忆管理] 获取记忆数量时出错: {e}") + return 0 + + async def run(self): + """执行记忆管理任务""" + try: + logger.info("[记忆管理] 开始执行记忆管理任务") + + # 获取当前记忆数量 + current_count = self._get_memory_count() + percentage = current_count / self.max_memory_number + logger.info(f"[记忆管理] 当前记忆数量: {current_count}/{self.max_memory_number} ({percentage:.1%})") + + # 如果记忆数量为0,跳过执行 + if current_count < 10: + logger.info("[记忆管理] 没有太多记忆,跳过执行") + return + + # 随机选择一个记忆标题 + selected_title = self._get_random_memory_title() + if not selected_title: + logger.warning("[记忆管理] 无法获取随机记忆标题,跳过执行") + return + + logger.info(f"[记忆管理] 随机选择的记忆标题: {selected_title}") + + # 执行choose_merge_target获取相关记忆内容 + related_contents = await global_memory_chest.choose_merge_target(selected_title) + if not related_contents: + logger.warning("[记忆管理] 未找到相关记忆内容,跳过合并") + return + + logger.info(f"[记忆管理] 找到 {len(related_contents)} 条相关记忆") + + # 执行merge_memory合并记忆 + merged_title, merged_content = await global_memory_chest.merge_memory(related_contents) + if not merged_title or not merged_content: + logger.warning("[记忆管理] 记忆合并失败,跳过删除") + return + + logger.info(f"[记忆管理] 记忆合并成功,新标题: {merged_title}") + + # 删除原始记忆(包括选中的标题和相关的记忆) + deleted_count = self._delete_original_memories(selected_title, related_contents) + logger.info(f"[记忆管理] 已删除 {deleted_count} 条原始记忆") + + logger.info("[记忆管理] 记忆管理任务完成") + + except Exception as e: + logger.error(f"[记忆管理] 执行记忆管理任务时发生错误: {e}", exc_info=True) + + def _get_random_memory_title(self) -> str: + """随机获取一个记忆标题""" + try: + # 获取所有记忆标题 + all_titles = global_memory_chest.get_all_titles() + if not all_titles: + return "" + + # 随机选择一个标题 + selected_title = random.choice(all_titles) + return selected_title + + except Exception as e: + logger.error(f"[记忆管理] 获取随机记忆标题时发生错误: {e}") + return "" + + def _delete_original_memories(self, selected_title: str, related_contents: List[str]) -> int: + """删除原始记忆""" + try: + deleted_count = 0 + + # 删除选中的标题对应的记忆 + try: + deleted = MemoryChestModel.delete().where(MemoryChestModel.title == selected_title).execute() + if deleted > 0: + deleted_count += deleted + logger.debug(f"[记忆管理] 删除选中记忆: {selected_title}") + except Exception as e: + logger.error(f"[记忆管理] 删除选中记忆时出错: {e}") + + # 删除相关记忆(通过内容匹配) + for content in related_contents: + try: + # 通过内容查找并删除对应的记忆 + memories_to_delete = MemoryChestModel.select().where(MemoryChestModel.content == content) + for memory in memories_to_delete: + MemoryChestModel.delete().where(MemoryChestModel.id == memory.id).execute() + deleted_count += 1 + logger.debug(f"[记忆管理] 删除相关记忆: {memory.title}") + except Exception as e: + logger.error(f"[记忆管理] 删除相关记忆时出错: {e}") + continue + + return deleted_count + + except Exception as e: + logger.error(f"[记忆管理] 删除原始记忆时发生错误: {e}") + return 0 diff --git a/src/config/config.py b/src/config/config.py index 465b9158..a2759b57 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -30,6 +30,7 @@ from src.config.official_configs import ( RelationshipConfig, ToolConfig, VoiceConfig, + MemoryConfig, DebugConfig, ) @@ -353,6 +354,7 @@ class Config(ConfigBase): maim_message: MaimMessageConfig lpmm_knowledge: LPMMKnowledgeConfig tool: ToolConfig + memory: MemoryConfig debug: DebugConfig voice: VoiceConfig diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 87bb4d97..d8b2cccb 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -101,6 +101,15 @@ class MessageReceiveConfig(ConfigBase): ban_msgs_regex: set[str] = field(default_factory=lambda: set()) """过滤正则表达式列表""" +@dataclass +class MemoryConfig(ConfigBase): + """记忆配置类""" + + max_memory_number: int = 100 + """记忆最大数量""" + + max_memory_size: int = 2048 + """记忆最大大小""" @dataclass class ExpressionConfig(ConfigBase): diff --git a/src/main.py b/src/main.py index 496242bf..c91f23fc 100644 --- a/src/main.py +++ b/src/main.py @@ -15,6 +15,7 @@ from src.mood.mood_manager import mood_manager from src.chat.knowledge import lpmm_start_up from src.chat.memory_system.Hippocampus import hippocampus_manager from src.chat.memory_system.hippocampus_to_memory_chest_task import HippocampusToMemoryChestTask +from src.chat.memory_system.memory_management_task import MemoryManagementTask from rich.traceback import install from src.migrate_helper.migrate import check_and_run_migrations # from src.api.main import start_api_server @@ -101,6 +102,10 @@ class MainSystem: # 添加海马体到记忆仓库的转换任务 await async_task_manager.add_task(HippocampusToMemoryChestTask()) logger.info("海马体到记忆仓库转换任务已启动") + + # 添加记忆管理任务 + await async_task_manager.add_task(MemoryManagementTask()) + logger.info("记忆管理任务已启动") # await asyncio.sleep(0.5) #防止logger输出飞了 diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 8c8cf09f..bc4ddd17 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "6.15.1" +version = "6.16.0" #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -80,6 +80,10 @@ mentioned_bot_reply = true # 是否启用提及必回复 max_context_size = 30 # 上下文长度 planner_smooth = 5 #规划器平滑,增大数值会减小planner负荷,略微降低反应速度,推荐2-8,0为关闭,必须大于等于0 +[memory] +max_memory_number = 100 # 记忆最大数量 +max_memory_size = 2048 # 记忆最大大小 + [relationship] enable_relationship = true # 是否启用关系系统