diff --git a/src/memory_system/Memory_chest.py b/src/memory_system/Memory_chest.py index 9404cf21..dac3995b 100644 --- a/src/memory_system/Memory_chest.py +++ b/src/memory_system/Memory_chest.py @@ -14,11 +14,13 @@ from src.plugin_system.apis.message_api import get_raw_msg_by_timestamp_with_cha from json_repair import repair_json from src.memory_system.questions import global_conflict_tracker + from .memory_utils import ( find_best_matching_memory, check_title_exists_fuzzy, get_all_titles, find_most_similar_memory_by_chat_id, + compute_merge_similarity_threshold ) @@ -82,218 +84,6 @@ class MemoryChest: except Exception as e: logger.error(f"[记忆管理] 按年龄权重删除记忆时出错: {e}") return False - - def _compute_merge_similarity_threshold(self) -> float: - """ - 根据当前记忆数量占比动态计算合并相似度阈值。 - - 规则:占比越高,阈值越低。 - - < 60%: 0.80(更严格,避免早期误合并) - - < 80%: 0.70 - - < 100%: 0.60 - - < 120%: 0.50 - - >= 120%: 0.45(最宽松,加速收敛) - """ - try: - current_count = MemoryChestModel.select().count() - max_count = max(1, int(global_config.memory.max_memory_number)) - percentage = current_count / max_count - - if percentage < 0.6: - return 0.70 - elif percentage < 0.8: - return 0.60 - elif percentage < 1.0: - return 0.50 - elif percentage < 1.5: - return 0.40 - elif percentage < 2: - return 0.30 - else: - return 0.25 - except Exception: - # 发生异常时使用保守阈值 - return 0.70 - - async def build_running_content(self, chat_id: str = None) -> str: - """ - 构建记忆仓库的运行内容 - - Args: - message_str: 消息内容 - chat_id: 聊天ID,用于提取对应的运行内容 - - Returns: - str: 构建后的运行内容 - """ - # 检查是否需要更新:基于消息数量和最新消息时间差的智能更新机制 - # - # 更新机制说明: - # 1. 消息数量 > 100:直接触发更新(高频消息场景) - # 2. 消息数量 > 70 且最新消息时间差 > 30秒:触发更新(中高频消息场景) - # 3. 消息数量 > 50 且最新消息时间差 > 60秒:触发更新(中频消息场景) - # 4. 消息数量 > 30 且最新消息时间差 > 300秒:触发更新(低频消息场景) - # - # 设计理念: - # - 消息越密集,时间阈值越短,确保及时更新记忆 - # - 消息越稀疏,时间阈值越长,避免频繁无意义的更新 - # - 通过最新消息时间差判断消息活跃度,而非简单的总时间差 - # - 平衡更新频率与性能,在保证记忆及时性的同时减少计算开销 - if chat_id not in self.running_content_list: - self.running_content_list[chat_id] = { - "content": "", - "last_update_time": time.time(), - "create_time": time.time() - } - - should_update = True - if chat_id and chat_id in self.running_content_list: - last_update_time = self.running_content_list[chat_id]["last_update_time"] - current_time = time.time() - # 使用message_api获取消息数量 - message_list = get_raw_msg_by_timestamp_with_chat( - timestamp_start=last_update_time, - timestamp_end=current_time, - chat_id=chat_id, - limit=global_config.chat.max_context_size * 2, - ) - - new_messages_count = len(message_list) - - # 获取最新消息的时间戳 - latest_message_time = last_update_time - if message_list: - # 假设消息列表按时间排序,取最后一条消息的时间戳 - latest_message = message_list[-1] - if hasattr(latest_message, 'timestamp'): - latest_message_time = latest_message.timestamp - elif isinstance(latest_message, dict) and 'timestamp' in latest_message: - latest_message_time = latest_message['timestamp'] - - # 计算最新消息时间与现在时间的差(秒) - latest_message_time_diff = current_time - latest_message_time - - # 智能更新条件判断 - 按优先级从高到低检查 - should_update = False - update_reason = "" - - if global_config.memory.memory_build_frequency > 0: - if new_messages_count > 100/global_config.memory.memory_build_frequency: - # 条件1:消息数量 > 100,直接触发更新 - # 适用场景:群聊刷屏、高频讨论等消息密集场景 - # 无需时间限制,确保重要信息不被遗漏 - should_update = True - update_reason = f"消息数量 {new_messages_count} > 100,直接触发更新" - elif new_messages_count > 70/global_config.memory.memory_build_frequency and latest_message_time_diff > 30: - # 条件2:消息数量 > 70 且最新消息时间差 > 30秒 - # 适用场景:中高频讨论,但需要确保消息流已稳定 - # 30秒的时间差确保不是正在进行的实时对话 - should_update = True - update_reason = f"消息数量 {new_messages_count} > 70 且最新消息时间差 {latest_message_time_diff:.1f}s > 30s" - elif new_messages_count > 50/global_config.memory.memory_build_frequency and latest_message_time_diff > 60: - # 条件3:消息数量 > 50 且最新消息时间差 > 60秒 - # 适用场景:中等频率讨论,等待1分钟确保对话告一段落 - # 平衡及时性与稳定性 - should_update = True - update_reason = f"消息数量 {new_messages_count} > 50 且最新消息时间差 {latest_message_time_diff:.1f}s > 60s" - elif new_messages_count > 30/global_config.memory.memory_build_frequency and latest_message_time_diff > 300: - # 条件4:消息数量 > 30 且最新消息时间差 > 300秒(5分钟) - # 适用场景:低频但有一定信息量的讨论 - # 5分钟的时间差确保对话完全结束,避免频繁更新 - should_update = True - update_reason = f"消息数量 {new_messages_count} > 30 且最新消息时间差 {latest_message_time_diff:.1f}s > 300s" - - logger.debug(f"chat_id {chat_id} 更新检查: {update_reason if should_update else f'消息数量 {new_messages_count},最新消息时间差 {latest_message_time_diff:.1f}s,不满足更新条件'}") - - - if should_update: - # 如果有chat_id,先提取对应的running_content - message_str = build_readable_messages( - message_list, - replace_bot_name=True, - timestamp_mode="relative", - read_mark=0.0, - show_actions=False, - remove_emoji_stickers=True, - ) - - # 随机从格式示例列表中选取若干行用于提示 - format_candidates = [ - "[概念] 是 [概念的含义(简短描述,不超过十个字)]", - "[概念] 不是 [对概念的负面含义(简短描述,不超过十个字)]", - "[概念1] 与 [概念2] 是 [概念1和概念2的关联(简短描述,不超过二十个字)]", - "[概念1] 包含 [概念2] 和 [概念3]", - "[概念1] 属于 [概念2]", - "[概念1] 的例子是 [例子1] 和 [例子2]", - "[概念] 的特征是 [特征1]、[特征2]", - "[概念1] 导致 [概念2]", - "[概念1] 需要 [条件1] 和 [条件2]", - "[概念1] 的用途是 [用途1] 和 [用途2]", - "[概念1] 与 [概念2] 的区别是 [区别点]", - "[概念] 的别名是 [别名]", - "[概念1] 包括但不限于 [概念2]、[概念3]", - "[概念] 的反义是 [反义概念]", - "[概念] 的组成有 [部分1]、[部分2]", - "[概念] 出现于 [时间或场景]", - "[概念] 的方法有 [方法1]、[方法2]", - ] - - selected_count = random.randint(3, 6) - selected_lines = random.sample(format_candidates, selected_count) - format_section = "\n".join(selected_lines) + "\n......(不要包含中括号)" - - prompt = f""" -以下是一段你参与的聊天记录,请你在其中总结出记忆: - -<聊天记录> -{message_str} - -聊天记录中可能包含有效信息,也可能信息密度很低,请你根据聊天记录中的信息,总结出记忆内容 --------------------------------- -对[图片]的处理: -1.除非与文本有关,不要将[图片]的内容整合到记忆中 -2.如果图片与某个概念相关,将图片中的关键内容也整合到记忆中,不要写入图片原文,例如: - -聊天记录(与图片有关): -用户说:[图片1:这是一个黄色的龙形状玩偶,被一只手拿着。] -用户说:这个玩偶看起来很可爱,是我新买的奶龙 -总结的记忆内容: -黄色的龙形状玩偶 是 奶龙 - -聊天记录(概念与图片无关): -用户说:[图片1:这是一个台电脑,屏幕上显示了某种游戏。] -用户说:使命召唤今天发售了新一代,有没有人玩 -总结的记忆内容: -使命召唤新一代 是 最新发售的游戏 - -请主要关注概念和知识或者时效性较强的信息!!,而不是聊天的琐事 -1.不要关注诸如某个用户做了什么,说了什么,不要关注某个用户的行为,而是关注其中的概念性信息 -2.概念要求精确,不啰嗦,像科普读物或教育课本那样 -3.记忆为一段纯文本,逻辑清晰,指出概念的含义,并说明关系 - - 记忆内容的格式,你必须仿照下面的格式,但不一定全部使用: -{format_section} - -请仿照上述格式输出,每个知识点一句话。输出成一段平文本 -现在请你输出,不要输出其他内容,注意一定要直白,白话,口语化不要浮夸,修辞。: -""" - - if global_config.debug.show_prompt: - logger.info(f"记忆仓库构建运行内容 prompt: {prompt}") - else: - logger.debug(f"记忆仓库构建运行内容 prompt: {prompt}") - - running_content, (reasoning_content, model_name, tool_calls) = await self.LLMRequest_build.generate_response_async(prompt) - - print(f"prompt: {prompt}\n记忆仓库构建运行内容: {running_content}") - - # 直接保存:每次构建后立即入库,并刷新时间戳窗口 - if chat_id and running_content: - await self._save_to_database_and_clear(chat_id, running_content) - - - return running_content - async def get_answer_by_question(self, chat_id: str = "", question: str = "") -> str: """ @@ -521,7 +311,7 @@ class MemoryChest: return [], [] # 动态计算相似度阈值(占比越高阈值越低) - dynamic_threshold = self._compute_merge_similarity_threshold() + dynamic_threshold = compute_merge_similarity_threshold() # 使用相似度匹配查找最相似的记忆(基于动态阈值) similar_memory = find_most_similar_memory_by_chat_id( diff --git a/src/memory_system/curious.py b/src/memory_system/curious.py index 80bffdae..c10d6b84 100644 --- a/src/memory_system/curious.py +++ b/src/memory_system/curious.py @@ -62,11 +62,7 @@ class CuriousDetector: show_actions=True, ) - # 检查是否已经有问题在跟踪中 - existing_questions = global_conflict_tracker.get_questions_by_chat_id(self.chat_id) - if len(existing_questions) > 0: - logger.debug(f"当前已有{len(existing_questions)}个问题在跟踪中,跳过检测") - return None + # 问题跟踪功能已移除,不再检查已有问题 # 构建检测提示词 prompt = f"""你是一个严谨的聊天内容分析器。请分析以下聊天记录,检测是否存在需要提问的内容。 @@ -154,11 +150,10 @@ class CuriousDetector: if not question or not question.strip(): return False - # 记录问题到冲突追踪器,并开始跟踪 - await global_conflict_tracker.track_conflict( - question=question.strip(), + # 记录问题到冲突追踪器 + await global_conflict_tracker.record_conflict( + conflict_content=question.strip(), context=context, - start_following=False, chat_id=self.chat_id ) diff --git a/src/memory_system/memory_management_task.py b/src/memory_system/memory_management_task.py deleted file mode 100644 index edfd875c..00000000 --- a/src/memory_system/memory_management_task.py +++ /dev/null @@ -1,241 +0,0 @@ -# -*- coding: utf-8 -*- -import asyncio -import random -import time -from typing import List - -from src.manager.async_task_manager import AsyncTask -from src.memory_system.Memory_chest import global_memory_chest -from src.common.logger import get_logger -from src.common.database.database_model import MemoryChest as MemoryChestModel, MemoryConflict -from src.config.config import global_config - -logger = get_logger("memory") - - -class MemoryManagementTask(AsyncTask): - """记忆管理定时任务 - - 根据Memory_chest中的记忆数量与MAX_MEMORY_NUMBER的比例来决定执行频率: - - 小于50%:每600秒执行一次 - - 大于等于50%:每300秒执行一次 - - 每次执行时随机选择一个title,执行choose_merge_target和merge_memory, - 然后删除原始记忆 - """ - - def __init__(self): - super().__init__( - task_name="Memory Management Task", - wait_before_start=10, # 启动后等待10秒再开始 - run_interval=300 # 默认300秒间隔,会根据记忆数量动态调整 - ) - self.max_memory_number = global_config.memory.max_memory_number - - async def start_task(self, abort_flag: asyncio.Event): - """重写start_task方法,支持动态调整执行间隔""" - if self.wait_before_start > 0: - # 等待指定时间后开始任务 - await asyncio.sleep(self.wait_before_start) - - while not abort_flag.is_set(): - await self.run() - - # 动态调整执行间隔 - current_interval = self._calculate_interval() - logger.info(f"[记忆管理] 下次执行间隔: {current_interval}秒") - - if current_interval > 0: - await asyncio.sleep(current_interval) - else: - break - - def _calculate_interval(self) -> int: - """根据当前记忆数量计算执行间隔""" - try: - current_count = self._get_memory_count() - percentage = current_count / self.max_memory_number - - if percentage < 0.6: - # 小于50%,每600秒执行一次 - return 3600 - elif percentage < 1: - # 大于等于50%,每300秒执行一次 - return 1800 - elif percentage < 1.5: - # 大于等于100%,每120秒执行一次 - return 600 - elif percentage < 1.8: - return 120 - else: - return 30 - - except Exception as e: - logger.error(f"[记忆管理] 计算执行间隔时出错: {e}") - return 300 # 默认300秒 - - def _get_memory_count(self) -> int: - """获取当前记忆数量""" - try: - count = MemoryChestModel.select().count() - logger.debug(f"[记忆管理] 当前记忆数量: {count}") - return count - except Exception as e: - logger.error(f"[记忆管理] 获取记忆数量时出错: {e}") - return 0 - - async def run(self): - """执行记忆管理任务""" - try: - - # 获取当前记忆数量 - current_count = self._get_memory_count() - percentage = current_count / self.max_memory_number - logger.info(f"当前记忆数量: {current_count}/{self.max_memory_number} ({percentage:.1%})") - - # 当占比 > 1.6 时,持续删除直到占比 <= 1.6(越老/越新更易被删) - if percentage > 2: - logger.info("记忆过多,开始遗忘记忆") - while True: - if percentage <= 1.8: - break - removed = global_memory_chest.remove_one_memory_by_age_weight() - if not removed: - logger.warning("没有可删除的记忆,停止连续删除") - break - # 重新计算占比 - current_count = self._get_memory_count() - percentage = current_count / self.max_memory_number - logger.info(f"遗忘进度: 当前 {current_count}/{self.max_memory_number} ({percentage:.1%})") - logger.info("遗忘记忆结束") - - # 如果记忆数量为0,跳过执行 - if current_count < 10: - return - - # 随机选择一个记忆标题和chat_id - selected_title, selected_chat_id = self._get_random_memory_title() - if not selected_title: - logger.warning("无法获取随机记忆标题,跳过执行") - return - - # 执行choose_merge_target获取相关记忆(标题与内容) - related_titles, related_contents = await global_memory_chest.choose_merge_target(selected_title, selected_chat_id) - if not related_titles or not related_contents: - logger.info("无合适合并内容,跳过本次合并") - return - - logger.info(f"{selected_chat_id} 为 [{selected_title}] 找到 {len(related_contents)} 条相关记忆:{related_titles}") - - # 执行merge_memory合并记忆 - merged_title, merged_content = await global_memory_chest.merge_memory(related_contents,selected_chat_id) - if not merged_title or not merged_content: - logger.warning("[记忆管理] 记忆合并失败,跳过删除") - return - - logger.info(f"记忆合并成功,新标题: {merged_title}") - - # 删除原始记忆(包括选中的标题和相关的记忆标题) - titles_to_delete = [selected_title] + related_titles - deleted_count = self._delete_original_memories(titles_to_delete) - logger.info(f"已删除 {deleted_count} 条原始记忆") - - except Exception as e: - logger.error(f"[记忆管理] 执行记忆管理任务时发生错误: {e}", exc_info=True) - - def _get_random_memory_title(self) -> tuple[str, str]: - """随机获取一个记忆标题和对应的chat_id""" - try: - # 获取所有记忆记录 - all_memories = MemoryChestModel.select() - if not all_memories: - return "", "" - - # 随机选择一个记忆 - selected_memory = random.choice(list(all_memories)) - return selected_memory.title, selected_memory.chat_id or "" - - except Exception as e: - logger.error(f"[记忆管理] 获取随机记忆标题时发生错误: {e}") - return "", "" - - def _delete_original_memories(self, related_titles: List[str]) -> int: - """按标题删除原始记忆""" - try: - deleted_count = 0 - # 删除相关记忆(通过标题匹配) - for title in related_titles: - try: - # 通过标题查找并删除对应的记忆 - memories_to_delete = MemoryChestModel.select().where(MemoryChestModel.title == title) - for memory in memories_to_delete: - MemoryChestModel.delete().where(MemoryChestModel.id == memory.id).execute() - deleted_count += 1 - logger.debug(f"[记忆管理] 删除相关记忆: {memory.title}") - except Exception as e: - logger.error(f"[记忆管理] 删除相关记忆时出错: {e}") - continue - - return deleted_count - - except Exception as e: - logger.error(f"[记忆管理] 删除原始记忆时发生错误: {e}") - return 0 - - -class MemoryConflictCleanupTask(AsyncTask): - """记忆冲突清理定时任务 - - 定期清理 memory_conflicts 表中 create_time 较早(7天前)且 answer 为空的项目 - 默认每小时执行一次 - """ - - def __init__(self, cleanup_days: int = 7, run_interval: int = 3600): - """ - 初始化清理任务 - - Args: - cleanup_days: 清理多少天前的记录,默认7天 - run_interval: 执行间隔(秒),默认3600秒(1小时) - """ - super().__init__( - task_name="Memory Conflict Cleanup Task", - wait_before_start=60, # 启动后等待60秒再开始 - run_interval=run_interval - ) - self.cleanup_days = cleanup_days - - async def run(self): - """执行清理任务""" - try: - current_time = time.time() - # 计算7天前的时间戳 - cutoff_time = current_time - (self.cleanup_days * 24 * 60 * 60) - - logger.info(f"[冲突清理] 开始清理 {self.cleanup_days} 天前且 answer 为空的冲突记录(截止时间: {cutoff_time})") - - # 查询需要清理的记录:create_time < cutoff_time 且 answer 为空 - # answer 为空的条件:answer IS NULL 或 answer == '' - query = MemoryConflict.select().where( - (MemoryConflict.create_time < cutoff_time) & - ((MemoryConflict.answer.is_null()) | (MemoryConflict.answer == '')) - ) - - # 先统计要删除的数量 - deleted_count = query.count() - - # 批量删除 - if deleted_count > 0: - deleted = MemoryConflict.delete().where( - (MemoryConflict.create_time < cutoff_time) & - ((MemoryConflict.answer.is_null()) | (MemoryConflict.answer == '')) - ).execute() - deleted_count = deleted - - if deleted_count > 0: - logger.info(f"[冲突清理] 成功清理 {deleted_count} 条过期且未回答的冲突记录") - else: - logger.debug("[冲突清理] 没有需要清理的记录") - - except Exception as e: - logger.error(f"[冲突清理] 执行清理任务时发生错误: {e}", exc_info=True) diff --git a/src/memory_system/questions.py b/src/memory_system/questions.py index c578af90..af4fb344 100644 --- a/src/memory_system/questions.py +++ b/src/memory_system/questions.py @@ -1,120 +1,12 @@ import time -import asyncio from src.common.logger import get_logger from src.common.database.database_model import MemoryConflict -from src.chat.utils.chat_message_builder import ( - get_raw_msg_by_timestamp_with_chat, - build_readable_messages, -) from src.llm_models.utils_model import LLMRequest -from src.config.config import model_config, global_config -from typing import List +from src.config.config import model_config from src.memory_system.memory_utils import parse_md_json logger = get_logger("conflict_tracker") -class QuestionTracker: - """ - 用于跟踪一个问题在后续聊天中的解答情况 - """ - - def __init__(self, question: str, chat_id: str, context: str = "") -> None: - self.question = question - self.chat_id = chat_id - now = time.time() - self.context = context - self.start_time = now - self.last_read_time = now - self.last_judge_time = now # 上次判定的时间 - self.judge_debounce_interval = 10.0 # 判定防抖间隔:10秒 - self.consecutive_end_count = 0 # 连续END计数 - self.active = True - # 将 LLM 实例作为类属性,使用 utils 模型 - self.llm_request = LLMRequest(model_set=model_config.model_task_config.utils, request_type="conflict.judge") - - def stop(self) -> None: - self.active = False - - def should_judge_now(self) -> bool: - """ - 检查是否应该进行判定(防抖检查) - - Returns: - bool: 是否可以判定 - """ - now = time.time() - # 检查是否已经过了10秒的防抖间隔 - return (now - self.last_judge_time) >= self.judge_debounce_interval - - def __eq__(self, other) -> bool: - """比较两个追踪器是否相等(基于问题内容和聊天ID)""" - if not isinstance(other, QuestionTracker): - return False - return self.question == other.question and self.chat_id == other.chat_id - - def __hash__(self) -> int: - """为对象提供哈希值,支持集合操作""" - return hash((self.question, self.chat_id)) - - async def judge_answer(self, conversation_text: str,chat_len: int) -> tuple[bool, str, str]: - """ - 使用模型判定问题是否已得到解答。 - - Returns: - tuple[bool, str, str]: (是否结束跟踪, 结束原因或答案, 判定类型) - - True: 结束跟踪(已解答、话题转向等) - - False: 继续跟踪 - 判定类型: "ANSWERED", "END", "CONTINUE" - """ - - end_prompt = "" - if chat_len > 20: - end_prompt = "\n- 如果最新20条聊天记录内容与问题无关,话题已转向其他方向,请只输出:END" - - prompt = f"""你是一个严谨的判定器。下面给出聊天记录以及一个问题。 -任务:判断在这段聊天中,该问题是否已经得到明确解答。 -**你必须严格按照聊天记录的内容,不要添加额外的信息** - -输出规则: -- 如果聊天记录内容的信息已解答问题,请只输出:YES: <简短答案>{end_prompt} -- 如果问题尚未解答但聊天仍在相关话题上,请只输出:NO - -**问题** -{self.question} - - -**聊天记录** -{conversation_text} -""" - - if global_config.debug.show_prompt: - logger.info(f"判定提示词: {prompt}") - else: - logger.debug("已发送判定提示词") - - result_text, _ = await self.llm_request.generate_response_async(prompt, temperature=0.5) - - logger.info(f"判定结果: {prompt}\n{result_text}") - - # 更新上次判定时间 - self.last_judge_time = time.time() - - if not result_text: - return False, "", "CONTINUE" - - text = result_text.strip() - if text.upper().startswith("YES:"): - answer = text[4:].strip() - return True, answer, "ANSWERED" - if text.upper().startswith("YES"): - # 兼容仅输出 YES 或 YES - answer = text[3:].strip().lstrip(":").strip() - return True, answer, "ANSWERED" - if text.upper().startswith("END"): - # 聊天内容与问题无关,放弃该问题思考 - return True, "话题已转向其他方向,放弃该问题思考", "END" - return False, "", "CONTINUE" - class ConflictTracker: """ 记忆整合冲突追踪器 @@ -122,31 +14,19 @@ class ConflictTracker: 用于记录和存储记忆整合过程中的冲突内容 """ def __init__(self): - self.question_tracker_list:List[QuestionTracker] = [] - self.LLMRequest_tracker = LLMRequest( model_set=model_config.model_task_config.utils, request_type="conflict_tracker", ) - - def get_questions_by_chat_id(self, chat_id: str) -> List[QuestionTracker]: - return [tracker for tracker in self.question_tracker_list if tracker.chat_id == chat_id] - - async def track_conflict(self, question: str, context: str = "",start_following: bool = False,chat_id: str = "") -> bool: - """ - 跟踪冲突内容 - """ - tracker = QuestionTracker(question.strip(), chat_id, context) - self.question_tracker_list.append(tracker) - asyncio.create_task(self._follow_and_record(tracker, question.strip())) - return True - async def record_conflict(self, conflict_content: str, context: str = "",start_following: bool = False,chat_id: str = "") -> bool: + async def record_conflict(self, conflict_content: str, context: str = "", chat_id: str = "") -> bool: """ 记录冲突内容 - Args:k + Args: conflict_content: 冲突内容 + context: 上下文 + chat_id: 聊天ID Returns: bool: 是否成功记录 @@ -155,15 +35,7 @@ class ConflictTracker: if not conflict_content or conflict_content.strip() == "": return False - # 若需要跟随后续消息以判断是否得到解答,则进入跟踪流程 - if start_following and chat_id: - tracker = QuestionTracker(conflict_content.strip(), chat_id, context) - self.question_tracker_list.append(tracker) - # 后台启动跟踪任务,避免阻塞 - asyncio.create_task(self._follow_and_record(tracker, conflict_content.strip())) - return True - - # 默认:直接记录,不进行跟踪 + # 直接记录,不进行跟踪 MemoryConflict.create( conflict_content=conflict_content, create_time=time.time(), @@ -179,164 +51,6 @@ class ConflictTracker: logger.error(f"记录冲突内容时出错: {e}") return False - async def _follow_and_record(self, tracker: QuestionTracker, original_question: str) -> None: - """ - 后台任务:跟踪问题是否被解答,并写入数据库。 - """ - try: - max_duration = 10 * 60 # 30 分钟 - max_messages = 50 # 最多 100 条消息 - poll_interval = 2.0 # 秒 - logger.info(f"开始跟踪问题: {original_question}") - while tracker.active: - now_ts = time.time() - # 终止条件:时长达到上限 - if now_ts - tracker.start_time >= max_duration: - logger.info("问题跟踪达到10分钟上限,判定为未解答") - break - - # 统计最近一段是否有新消息(不过滤机器人,过滤命令) - recent_msgs = get_raw_msg_by_timestamp_with_chat( - chat_id=tracker.chat_id, - timestamp_start=tracker.last_read_time, - timestamp_end=now_ts, - limit=30, - limit_mode="latest", - filter_bot=False, - filter_command=True, - ) - - if len(recent_msgs) > 0: - tracker.last_read_time = now_ts - - # 统计从开始到现在的总消息数(用于触发100条上限) - all_msgs = get_raw_msg_by_timestamp_with_chat( - chat_id=tracker.chat_id, - timestamp_start=tracker.start_time, - timestamp_end=now_ts, - limit=0, - limit_mode="latest", - filter_bot=False, - filter_command=True, - ) - - # 检查是否应该进行判定(防抖检查) - if not tracker.should_judge_now(): - logger.debug(f"判定防抖中,跳过本次判定: {tracker.question}") - await asyncio.sleep(poll_interval) - continue - - # 构建可读聊天文本 - chat_text = build_readable_messages( - all_msgs, - replace_bot_name=True, - timestamp_mode="relative", - read_mark=0.0, - truncate=False, - show_actions=False, - show_pic=False, - remove_emoji_stickers=True, - ) - chat_len = len(all_msgs) - # 让小模型判断是否有答案 - answered, answer_text, judge_type = await tracker.judge_answer(chat_text,chat_len) - - if judge_type == "ANSWERED": - # 问题已解答,直接结束跟踪 - logger.info("问题已得到解答,结束跟踪并写入答案") - await self.add_or_update_conflict( - conflict_content=tracker.question, - create_time=tracker.start_time, - update_time=time.time(), - answer=answer_text or "", - chat_id=tracker.chat_id, - ) - return - elif judge_type == "END": - # 话题转向,增加END计数 - tracker.consecutive_end_count += 1 - logger.info(f"话题已转向,连续END次数: {tracker.consecutive_end_count}") - - if tracker.consecutive_end_count >= 2: - # 连续两次END,结束跟踪 - logger.info("连续两次END,结束跟踪") - break - else: - # 第一次END,重置计数器并继续跟踪 - logger.info("第一次END,继续跟踪") - continue - elif judge_type == "CONTINUE": - # 继续跟踪,重置END计数器 - tracker.consecutive_end_count = 0 - continue - - if len(all_msgs) >= max_messages: - logger.info("问题跟踪达到100条消息上限,判定为未解答") - logger.info(f"追踪结束:{tracker.question}") - break - - # 无新消息时稍作等待 - await asyncio.sleep(poll_interval) - - # 未获取到答案,检查是否需要删除记录 - # 查找现有的冲突记录 - existing_conflict = MemoryConflict.get_or_none( - MemoryConflict.conflict_content == original_question, - MemoryConflict.chat_id == tracker.chat_id - ) - - if existing_conflict: - # 检查raise_time是否大于3且没有答案 - current_raise_time = getattr(existing_conflict, "raise_time", 0) or 0 - if current_raise_time > 0 and not existing_conflict.answer: - # 删除该条目 - await self.delete_conflict(original_question, tracker.chat_id) - logger.info(f"追踪结束后删除条目(raise_time={current_raise_time}且无答案): {original_question}") - else: - # 更新记录但不删除 - await self.add_or_update_conflict( - conflict_content=original_question, - create_time=existing_conflict.create_time, - update_time=time.time(), - answer="", - chat_id=tracker.chat_id, - ) - logger.info(f"记录冲突内容(未解答): {len(original_question)} 字符") - else: - # 如果没有现有记录,创建新记录 - await self.add_or_update_conflict( - conflict_content=original_question, - create_time=time.time(), - update_time=time.time(), - answer="", - chat_id=tracker.chat_id, - ) - logger.info(f"记录冲突内容(未解答): {len(original_question)} 字符") - - logger.info(f"问题跟踪结束:{original_question}") - except Exception as e: - logger.error(f"后台问题跟踪任务异常: {e}") - finally: - # 无论任务成功还是失败,都要从追踪列表中移除 - tracker.stop() - self.remove_tracker(tracker) - - def remove_tracker(self, tracker: QuestionTracker) -> None: - """ - 从追踪列表中移除指定的追踪器 - - Args: - tracker: 要移除的追踪器对象 - """ - try: - if tracker in self.question_tracker_list: - self.question_tracker_list.remove(tracker) - logger.info(f"已从追踪列表中移除追踪器: {tracker.question}") - else: - logger.warning(f"尝试移除不存在的追踪器: {tracker.question}") - except Exception as e: - logger.error(f"移除追踪器时出错: {e}") - async def add_or_update_conflict( self, conflict_content: str, @@ -429,7 +143,6 @@ class ConflictTracker: await self.record_conflict( conflict_content=question["question"], context=reasoning_content, - start_following=False, chat_id=chat_id, ) return True