diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 66dbbf7b..9f4bcfa0 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# File: conversation.py import time import asyncio import datetime @@ -9,7 +11,7 @@ from src.plugins.utils.chat_message_builder import build_readable_messages, get_ from typing import Dict, Any, Optional from ..chat.message import Message from .pfc_types import ConversationState -from .pfc import ChatObserver, GoalAnalyzer +from .pfc import ChatObserver, GoalAnalyzer # pfc.py 包含了 GoalAnalyzer,无需重复导入 from .message_sender import DirectMessageSender from src.common.logger_manager import get_logger from .action_planner import ActionPlanner @@ -19,7 +21,7 @@ from .reply_generator import ReplyGenerator from ..chat.chat_stream import ChatStream from maim_message import UserInfo from src.plugins.chat.chat_stream import chat_manager -from .pfc_KnowledgeFetcher import KnowledgeFetcher +from .pfc_KnowledgeFetcher import KnowledgeFetcher # 注意:这里是 PFC_KnowledgeFetcher.py from .waiter import Waiter import traceback @@ -114,6 +116,10 @@ class Conversation: ) # 让 ChatObserver 从加载的最后一条消息之后开始同步 + # **** 注意:这里的 last_message_time 设置可能需要 review **** + # 如果数据库消息时间戳可能不完全连续,直接设置 last_message_time 可能导致 observer 错过消息 + # 更稳妥的方式是让 observer 自己管理其内部的 last_message_time 或 last_message_id + # 暂时保留,但标记为潜在问题点。如果 observer 逻辑是可靠的,则此行 OK。 self.chat_observer.last_message_time = self.observation_info.last_message_time self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录 else: @@ -138,7 +144,7 @@ class Conversation: async def _plan_and_action_loop(self): """思考步,PFC核心循环模块""" while self.should_continue: - # 忽略逻辑 + # 忽略逻辑 (保持不变) if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp: await asyncio.sleep(30) continue @@ -147,64 +153,92 @@ class Conversation: self.ignore_until_timestamp = None self.should_continue = False continue + try: - # --- 在规划前记录当前新消息数量 --- - initial_new_message_count = 0 - if hasattr(self.observation_info, "new_messages_count"): - initial_new_message_count = self.observation_info.new_messages_count + 1 # 算上麦麦自己发的那一条 + # --- [修改点 1] 在规划前记录新消息状态 --- + # 记录规划开始时未处理消息的 ID 集合,用于后续判断哪些是新来的 + message_ids_before_planning = set() + initial_unprocessed_message_count = 0 # <-- 新增:记录规划前的未处理消息数 + if hasattr(self.observation_info, "unprocessed_messages"): + message_ids_before_planning = {msg.get("message_id") for msg in self.observation_info.unprocessed_messages if msg.get("message_id")} + initial_unprocessed_message_count = len(self.observation_info.unprocessed_messages) # <-- 获取初始数量 + logger.debug(f"[私聊][{self.private_name}]规划开始,当前未处理消息数: {initial_unprocessed_message_count}, IDs: {message_ids_before_planning}") else: logger.warning( - f"[私聊][{self.private_name}]ObservationInfo missing 'new_messages_count' before planning." + f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' before planning." ) - # --- 调用 Action Planner --- - # 传递 self.conversation_info.last_successful_reply_action + + # --- 调用 Action Planner (保持不变) --- action, reason = await self.action_planner.plan( self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action ) - # --- 规划后检查是否有 *更多* 新消息到达 --- - current_new_message_count = 0 - if hasattr(self.observation_info, "new_messages_count"): - current_new_message_count = self.observation_info.new_messages_count + # --- [修改点 2] 规划后检查是否有 *过多* 新消息到达 --- + current_unprocessed_messages = [] + current_unprocessed_message_count = 0 + if hasattr(self.observation_info, "unprocessed_messages"): + current_unprocessed_messages = self.observation_info.unprocessed_messages + current_unprocessed_message_count = len(current_unprocessed_messages) # <-- 获取当前数量 else: logger.warning( - f"[私聊][{self.private_name}]ObservationInfo missing 'new_messages_count' after planning." + f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' after planning." ) - if current_new_message_count > initial_new_message_count + 2: + # 计算规划期间实际新增的消息数量 + new_messages_during_planning_count = 0 + new_message_ids_during_planning = set() + for msg in current_unprocessed_messages: + msg_id = msg.get("message_id") + if msg_id and msg_id not in message_ids_before_planning: + new_messages_during_planning_count += 1 + new_message_ids_during_planning.add(msg_id) + + logger.debug(f"[私聊][{self.private_name}]规划结束,当前未处理消息数: {current_unprocessed_message_count}, 规划期间新增: {new_messages_during_planning_count}") + + # **核心逻辑:判断是否中断** + # 这里的 +2 是根据你的需求来的,代表允许的缓冲 + # 我们比较的是 *规划期间新增的消息数* 是否超过阈值 + if new_messages_during_planning_count > 2: logger.info( - f"[私聊][{self.private_name}]规划期间发现新增消息 ({initial_new_message_count} -> {current_new_message_count}),跳过本次行动,重新规划" + f"[私聊][{self.private_name}]规划期间新增消息数 ({new_messages_during_planning_count}) 超过阈值(2),取消本次行动 '{action}',重新规划" ) - # 如果规划期间有新消息,也应该重置上次回复状态,因为现在要响应新消息了 + # 中断时,重置上次回复状态,因为需要基于最新消息重新决策 self.conversation_info.last_successful_reply_action = None - await asyncio.sleep(0.1) - continue + # **重要**: 中断时不清空未处理消息,留给下一轮规划处理 + await asyncio.sleep(0.1) # 短暂暂停避免CPU空转 + continue # 跳过本轮后续处理,直接进入下一轮循环重新规划 - # 包含 send_new_message - if initial_new_message_count > 0 and action in ["direct_reply", "send_new_message"]: - if hasattr(self.observation_info, "clear_unprocessed_messages"): - logger.debug( - f"[私聊][{self.private_name}]准备执行 {action},清理 {initial_new_message_count} 条规划时已知的新消息。" - ) - await self.observation_info.clear_unprocessed_messages() - if hasattr(self.observation_info, "new_messages_count"): - self.observation_info.new_messages_count = 0 - else: - logger.error( - f"[私聊][{self.private_name}]无法清理未处理消息: ObservationInfo 缺少 clear_unprocessed_messages 方法!" - ) + # --- [修改点 3] 准备执行动作,处理规划时已知的消息 --- + # 如果决定要回复 (direct_reply 或 send_new_message),并且规划开始时就有未处理消息 + # 这表示 LLM 规划时已经看到了这些消息 + # 我们需要在发送回复 *后* 清理掉这些规划时已知的消息 + # 注意:这里不再立即清理,清理逻辑移到 _handle_action 成功发送后 + messages_known_during_planning = [] + if action in ["direct_reply", "send_new_message"] and initial_unprocessed_message_count > 0: + messages_known_during_planning = [ + msg for msg_id in message_ids_before_planning + if (msg := next((m for m in self.observation_info.unprocessed_messages if m.get("message_id") == msg_id), None)) is not None + ] + logger.debug(f"[私聊][{self.private_name}]规划时已知 {len(messages_known_during_planning)} 条消息,将在回复成功后清理。") - await self._handle_action(action, reason, self.observation_info, self.conversation_info) - # 检查是否需要结束对话 (逻辑不变) + # --- 执行动作 --- + # 将规划时已知需要清理的消息ID集合传递给 _handle_action + await self._handle_action(action, reason, self.observation_info, self.conversation_info, message_ids_before_planning) + + # --- 检查是否需要结束对话 (逻辑保持不变) --- goal_ended = False if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list: for goal_item in self.conversation_info.goal_list: + current_goal = None # 初始化 current_goal if isinstance(goal_item, dict): current_goal = goal_item.get("goal") + elif isinstance(goal_item, str): # 处理直接是字符串的情况 + current_goal = goal_item - if current_goal == "结束对话": + # 确保 current_goal 是字符串再比较 + if isinstance(current_goal, str) and current_goal == "结束对话": goal_ended = True break @@ -215,38 +249,49 @@ class Conversation: except Exception as loop_err: logger.error(f"[私聊][{self.private_name}]PFC主循环出错: {loop_err}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - await asyncio.sleep(1) + await asyncio.sleep(1) # 出错时暂停一下 + # 循环间隔 if self.should_continue: - await asyncio.sleep(0.1) + await asyncio.sleep(0.1) # 非阻塞的短暂停 logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}") - def _check_new_messages_after_planning(self): - """检查在规划后是否有新消息""" - # 检查 ObservationInfo 是否已初始化并且有 new_messages_count 属性 - if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "new_messages_count"): - logger.warning( - f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'new_messages_count' 属性,无法检查新消息。" - ) - return False # 或者根据需要抛出错误 - if self.observation_info.new_messages_count > 2: - logger.info( - f"[私聊][{self.private_name}]生成/执行动作期间收到 {self.observation_info.new_messages_count} 条新消息,取消当前动作并重新规划" + # --- [修改点 4] 修改 _check_new_messages_after_planning --- + # 重命名并修改逻辑,用于在 *发送前* 检查是否有过多新消息(兜底检查) + def _check_interrupt_before_sending(self, message_ids_before_planning: set) -> bool: + """在发送回复前,最后检查一次是否有过多新消息导致需要中断""" + if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "unprocessed_messages"): + logger.warning( + f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'unprocessed_messages' 属性,无法检查新消息。" ) - # 如果有新消息,也应该重置上次回复状态 - if hasattr(self, "conversation_info"): # 确保 conversation_info 已初始化 + return False + + current_unprocessed_messages = self.observation_info.unprocessed_messages + new_messages_count = 0 + for msg in current_unprocessed_messages: + msg_id = msg.get("message_id") + if msg_id and msg_id not in message_ids_before_planning: + new_messages_count += 1 + + # 使用与规划后检查相同的阈值 + if new_messages_count > 2: + logger.info( + f"[私聊][{self.private_name}]准备发送时发现新增消息数 ({new_messages_count}) 超过阈值(2),取消发送并重新规划" + ) + if hasattr(self, "conversation_info"): self.conversation_info.last_successful_reply_action = None else: logger.warning( f"[私聊][{self.private_name}]ConversationInfo 未初始化,无法重置 last_successful_reply_action。" ) - return True - return False + return True # 需要中断 + return False # 不需要中断 + def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message: - """将消息字典转换为Message对象""" + """将消息字典转换为Message对象 (保持不变)""" try: # 尝试从 msg_dict 直接获取 chat_stream,如果失败则从全局 chat_manager 获取 chat_info = msg_dict.get("chat_info") @@ -274,8 +319,14 @@ class Conversation: # 可以选择返回 None 或重新抛出异常,这里选择重新抛出以指示问题 raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e + # --- [修改点 5] 修改 _handle_action 签名并调整内部逻辑 --- async def _handle_action( - self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo + self, + action: str, + reason: str, + observation_info: ObservationInfo, + conversation_info: ConversationInfo, + message_ids_before_planning: set # <-- 接收规划前的消息ID集合 ): """处理规划的行动""" @@ -289,18 +340,17 @@ class Conversation: "time": datetime.datetime.now().strftime("%H:%M:%S"), "final_reason": None, } - # 确保 done_action 列表存在 if not hasattr(conversation_info, "done_action"): conversation_info.done_action = [] conversation_info.done_action.append(current_action_record) action_index = len(conversation_info.done_action) - 1 - action_successful = False # 用于标记动作是否成功完成 + action_successful = False + reply_sent = False # <-- 新增:标记是否成功发送了回复 # --- 根据不同的 action 执行 --- - - # send_new_message 失败后执行 wait - if action == "send_new_message": + if action == "direct_reply" or action == "send_new_message": + # 合并 direct_reply 和 send_new_message 的大部分逻辑 max_reply_attempts = 3 reply_attempt_count = 0 is_suitable = False @@ -310,248 +360,170 @@ class Conversation: while reply_attempt_count < max_reply_attempts and not is_suitable: reply_attempt_count += 1 - logger.info( - f"[私聊][{self.private_name}]尝试生成追问回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." - ) + log_prefix = f"[私聊][{self.private_name}]尝试生成 '{action}' 回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." + logger.info(log_prefix) self.state = ConversationState.GENERATING - # 1. 生成回复 (调用 generate 时传入 action_type) + # 1. 生成回复 (传入 action_type) self.generated_reply = await self.reply_generator.generate( - observation_info, conversation_info, action_type="send_new_message" - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的追问回复: {self.generated_reply}" + observation_info, conversation_info, action_type=action ) + logger.info(f"{log_prefix} 生成内容: {self.generated_reply}") # 2. 检查回复 (逻辑不变) self.state = ConversationState.CHECKING try: - current_goal_str = conversation_info.goal_list[0]["goal"] if conversation_info.goal_list else "" + current_goal_str = "" # 初始化 + if hasattr(conversation_info, 'goal_list') and conversation_info.goal_list: + goal_item = conversation_info.goal_list[0] # 取第一个目标 + if isinstance(goal_item, dict): + current_goal_str = goal_item.get('goal', '') + elif isinstance(goal_item, str): + current_goal_str = goal_item + + # 确保 chat_history 和 chat_history_str 存在 + chat_history_for_check = getattr(observation_info, 'chat_history', []) + chat_history_str_for_check = getattr(observation_info, 'chat_history_str', '') + is_suitable, check_reason, need_replan = await self.reply_generator.check_reply( reply=self.generated_reply, goal=current_goal_str, - chat_history=observation_info.chat_history, - chat_history_str=observation_info.chat_history_str, + chat_history=chat_history_for_check, + chat_history_str=chat_history_str_for_check, retry_count=reply_attempt_count - 1, ) logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" + f"{log_prefix} 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" ) + + # 更新拒绝原因和内容 (仅在不合适或需要重规划时) if not is_suitable or need_replan: conversation_info.last_reply_rejection_reason = check_reason conversation_info.last_rejected_reply_content = self.generated_reply else: - # 如果检查通过,清空上次的拒绝原因 + # 检查通过,清空上次拒绝记录 conversation_info.last_reply_rejection_reason = None conversation_info.last_rejected_reply_content = None if is_suitable: final_reply_to_send = self.generated_reply - break + break # 检查通过,跳出循环 elif need_replan: logger.warning( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查建议重新规划,停止尝试。原因: {check_reason}" + f"{log_prefix} 检查建议重新规划,停止尝试。原因: {check_reason}" ) - break + break # 需要重新规划,跳出循环 except Exception as check_err: logger.error( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}" + f"{log_prefix} 调用 ReplyChecker 时出错: {check_err}" ) check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}" # 出错也记录原因 - conversation_info.last_rejected_reply_content = self.generated_reply - break + conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}" + conversation_info.last_rejected_reply_content = self.generated_reply # 记录出错时尝试的内容 + break # 检查出错,跳出循环 - # 循环结束,处理最终结果 + # --- 处理生成和检查的结果 --- if is_suitable: - # 检查是否有新消息 - if self._check_new_messages_after_planning(): - logger.info(f"[私聊][{self.private_name}]生成追问回复期间收到新消息,取消发送,重新规划行动") + # --- [修改点 6] 发送前最后检查是否需要中断 --- + if self._check_interrupt_before_sending(message_ids_before_planning): + logger.info(f"[私聊][{self.private_name}]生成回复后、发送前发现过多新消息,取消发送,重新规划") conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"有新消息,取消发送追问: {final_reply_to_send}"} + {"status": "recall", "final_reason": f"发送前发现过多新消息,取消发送: {final_reply_to_send}"} ) - return # 直接返回,重新规划 + self.conversation_info.last_successful_reply_action = None # 重置状态 + return # 直接返回,主循环会重新规划 - # 发送合适的回复 + # 确认发送 self.generated_reply = final_reply_to_send - # --- 在这里调用 _send_reply --- - await self._send_reply() # <--- 调用恢复后的函数 - # --- 新增:回复成功,清除拒绝原因 --- - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None - # 更新状态: 标记上次成功是 send_new_message - self.conversation_info.last_successful_reply_action = "send_new_message" - action_successful = True # 标记动作成功 + send_success = await self._send_reply() # 调用发送函数 + + if send_success: + action_successful = True + reply_sent = True # 标记回复已发送 + logger.info(f"[私聊][{self.private_name}]成功发送 '{action}' 回复.") + # 清空上次拒绝记录 (再次确保) + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + + # --- [修改点 7] 发送成功后,处理新消息并决定下一轮 prompt 类型 --- + # 获取发送后的最新未处理消息列表 + final_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) + final_unprocessed_count = len(final_unprocessed_messages) + + # 计算在生成和发送期间新增的消息数 + new_messages_during_generation_count = 0 + for msg in final_unprocessed_messages: + msg_id = msg.get("message_id") + # 如果消息 ID 不在规划前的集合中,说明是新来的 + if msg_id and msg_id not in message_ids_before_planning: + new_messages_during_generation_count += 1 + + logger.debug(f"[私聊][{self.private_name}]回复发送后,当前未处理消息数: {final_unprocessed_count}, 其中生成/发送期间新增: {new_messages_during_generation_count}") + + # 根据生成期间是否有新消息,决定下次规划用哪个 prompt + if new_messages_during_generation_count > 0: + # 有 1 条或更多新消息在生成期间到达 + logger.info(f"[私聊][{self.private_name}]检测到 {new_messages_during_generation_count} 条在生成/发送期间到达的新消息,下一轮将使用首次回复逻辑处理。") + self.conversation_info.last_successful_reply_action = None # 强制下一轮用 PROMPT_INITIAL_REPLY + else: + # 没有新消息在生成期间到达 + logger.info(f"[私聊][{self.private_name}]生成/发送期间无新消息,下一轮将根据 '{action}' 使用追问逻辑。") + self.conversation_info.last_successful_reply_action = action # 保持状态,下一轮可能用 PROMPT_FOLLOW_UP + + # --- [修改点 8] 清理规划时已知的消息 --- + # 只有在回复成功发送后,才清理掉那些在规划时就已经看到的消息 + if message_ids_before_planning: + await observation_info.clear_processed_messages(message_ids_before_planning) + + + else: # 发送失败 + logger.error(f"[私聊][{self.private_name}]发送 '{action}' 回复失败。") + # 发送失败,也认为动作未成功,重置状态 + action_successful = False + self.conversation_info.last_successful_reply_action = None + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送回复时失败"} + ) elif need_replan: - # 打回动作决策 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,追问回复决定打回动作决策。打回原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"} - ) + # 检查后决定打回动作决策 + logger.warning( + f"[私聊][{self.private_name}]'{action}' 回复检查后决定打回动作决策 (尝试 {reply_attempt_count} 次)。打回原因: {check_reason}" + ) + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后打回: {check_reason}"} + ) + self.conversation_info.last_successful_reply_action = None # 重置状态 - else: - # 追问失败 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的追问回复。最终原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后失败: {check_reason}"} - ) - # 重置状态: 追问失败,下次用初始 prompt - self.conversation_info.last_successful_reply_action = None + else: # 多次尝试后仍然不合适 (is_suitable is False and not need_replan) + logger.warning( + f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的 '{action}' 回复。最终原因: {check_reason}" + ) + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后失败: {check_reason}"} + ) + self.conversation_info.last_successful_reply_action = None # 重置状态 - # 执行 Wait 操作 - logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...") - self.state = ConversationState.WAITING - await self.waiter.wait(self.conversation_info) - wait_action_record = { - "action": "wait", - "plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待", - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - "final_reason": None, - } - conversation_info.done_action.append(wait_action_record) - - elif action == "direct_reply": - max_reply_attempts = 3 - reply_attempt_count = 0 - is_suitable = False - need_replan = False - check_reason = "未进行尝试" - final_reply_to_send = "" - - while reply_attempt_count < max_reply_attempts and not is_suitable: - reply_attempt_count += 1 - logger.info( - f"[私聊][{self.private_name}]尝试生成首次回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." - ) - self.state = ConversationState.GENERATING - - # 1. 生成回复 - self.generated_reply = await self.reply_generator.generate( - observation_info, conversation_info, action_type="direct_reply" - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的首次回复: {self.generated_reply}" - ) - - # 2. 检查回复 - self.state = ConversationState.CHECKING - try: - current_goal_str = conversation_info.goal_list[0]["goal"] if conversation_info.goal_list else "" - is_suitable, check_reason, need_replan = await self.reply_generator.check_reply( - reply=self.generated_reply, - goal=current_goal_str, - chat_history=observation_info.chat_history, - chat_history_str=observation_info.chat_history_str, - retry_count=reply_attempt_count - 1, - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" - ) - if is_suitable: - final_reply_to_send = self.generated_reply - break - elif need_replan: - logger.warning( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查建议重新规划,停止尝试。原因: {check_reason}" - ) - break - except Exception as check_err: - logger.error( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (首次回复) 时出错: {check_err}" - ) - check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - break - - # 循环结束,处理最终结果 - if is_suitable: - # 检查是否有新消息 - if self._check_new_messages_after_planning(): - logger.info(f"[私聊][{self.private_name}]生成首次回复期间收到新消息,取消发送,重新规划行动") - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"有新消息,取消发送首次回复: {final_reply_to_send}"} - ) - return # 直接返回,重新规划 - - # 发送合适的回复 - self.generated_reply = final_reply_to_send - # --- 在这里调用 _send_reply --- - await self._send_reply() # <--- 调用恢复后的函数 - # --- 新增:回复成功,清除拒绝原因 --- - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None # <-- 新增清空内容 - # 更新状态: 标记上次成功是 direct_reply - self.conversation_info.last_successful_reply_action = "direct_reply" - action_successful = True # 标记动作成功 - - elif need_replan: - # 打回动作决策 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,首次回复决定打回动作决策。打回原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"} - ) - - else: - # 首次回复失败 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的首次回复。最终原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后失败: {check_reason}"} - ) - # 重置状态: 首次回复失败,下次还是用初始 prompt - self.conversation_info.last_successful_reply_action = None - - # 执行 Wait 操作 (保持原有逻辑) - logger.info(f"[私聊][{self.private_name}]由于无法生成合适首次回复,执行 'wait' 操作...") - self.state = ConversationState.WAITING - await self.waiter.wait(self.conversation_info) - wait_action_record = { - "action": "wait", - "plan_reason": "因 direct_reply 多次尝试失败而执行的后备等待", - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - "final_reason": None, - } - conversation_info.done_action.append(wait_action_record) - - # elif action == "fetch_knowledge": - # self.state = ConversationState.FETCHING - # knowledge_query = reason - # try: - # 检查 knowledge_fetcher 是否存在 - # if not hasattr(self, "knowledge_fetcher"): - # logger.error(f"[私聊][{self.private_name}]KnowledgeFetcher 未初始化,无法获取知识。") - # raise AttributeError("KnowledgeFetcher not initialized") - - # knowledge, source = await self.knowledge_fetcher.fetch(knowledge_query, observation_info.chat_history) - # logger.info(f"[私聊][{self.private_name}]获取到知识: {knowledge[:100]}..., 来源: {source}") - # if knowledge: - # 确保 knowledge_list 存在 - # if not hasattr(conversation_info, "knowledge_list"): - # conversation_info.knowledge_list = [] - # conversation_info.knowledge_list.append( - # {"query": knowledge_query, "knowledge": knowledge, "source": source} - # ) - # action_successful = True - # except Exception as fetch_err: - # logger.error(f"[私聊][{self.private_name}]获取知识时出错: {str(fetch_err)}") - # conversation_info.done_action[action_index].update( - # {"status": "recall", "final_reason": f"获取知识失败: {str(fetch_err)}"} - # ) - # self.conversation_info.last_successful_reply_action = None # 重置状态 + # 如果是 send_new_message 失败,则执行 wait (保持原 fallback 逻辑) + if action == "send_new_message": + logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...") + self.state = ConversationState.WAITING + await self.waiter.wait(self.conversation_info) + wait_action_record = { + "action": "wait", + "plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待", + "status": "done", # wait 本身算完成 + "time": datetime.datetime.now().strftime("%H:%M:%S"), + "final_reason": None, + } + conversation_info.done_action.append(wait_action_record) + action_successful = True # fallback wait 成功 + # 注意: fallback wait 成功后,last_successful_reply_action 仍然是 None + # --- 处理其他动作 (保持大部分不变,主要是确保状态重置) --- elif action == "rethink_goal": self.state = ConversationState.RETHINKING try: - # 检查 goal_analyzer 是否存在 if not hasattr(self, "goal_analyzer"): logger.error(f"[私聊][{self.private_name}]GoalAnalyzer 未初始化,无法重新思考目标。") raise AttributeError("GoalAnalyzer not initialized") @@ -562,67 +534,99 @@ class Conversation: conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"} ) - self.conversation_info.last_successful_reply_action = None # 重置状态 + # 无论成功失败,非回复动作都重置 last_successful_reply_action + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None # 清除拒绝原因 + conversation_info.last_rejected_reply_content = None + elif action == "listening": self.state = ConversationState.LISTENING logger.info(f"[私聊][{self.private_name}]倾听对方发言...") try: - # 检查 waiter 是否存在 if not hasattr(self, "waiter"): logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法倾听。") raise AttributeError("Waiter not initialized") await self.waiter.wait_listening(conversation_info) - action_successful = True # Listening 完成就算成功 + action_successful = True except Exception as listen_err: logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}") conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"倾听失败: {listen_err}"} ) - self.conversation_info.last_successful_reply_action = None # 重置状态 + # 无论成功失败,非回复动作都重置 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None elif action == "say_goodbye": - self.state = ConversationState.GENERATING # 也可以定义一个新的状态,如 ENDING + self.state = ConversationState.GENERATING logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...") try: - # 1. 生成告别语 (使用 'say_goodbye' action_type) + # 1. 生成告别语 self.generated_reply = await self.reply_generator.generate( observation_info, conversation_info, action_type="say_goodbye" ) logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}") - # 2. 直接发送告别语 (不经过检查) - if self.generated_reply: # 确保生成了内容 - await self._send_reply() # 调用发送方法 - # 发送成功后,标记动作成功 - action_successful = True - logger.info(f"[私聊][{self.private_name}]告别语已发送。") + # 2. 发送告别语 + if self.generated_reply: + # --- [修改点 9] 告别前也检查中断 --- + if self._check_interrupt_before_sending(message_ids_before_planning): + logger.info(f"[私聊][{self.private_name}]发送告别语前发现过多新消息,取消发送,重新规划") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送告别语前发现过多新消息"} + ) + self.should_continue = True # 不能结束,需要重规划 + self.conversation_info.last_successful_reply_action = None # 重置状态 + return + + send_success = await self._send_reply() + if send_success: + action_successful = True + reply_sent = True # 标记发送成功 + logger.info(f"[私聊][{self.private_name}]告别语已发送。") + # 发送告别语成功后,通常意味着对话结束 + self.should_continue = False + logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。") + else: + logger.warning(f"[私聊][{self.private_name}]发送告别语失败。") + action_successful = False + # 发送失败不应结束对话,可能需要重试或做其他事 + self.should_continue = True + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送告别语失败"} + ) + self.conversation_info.last_successful_reply_action = None # 重置状态 + else: logger.warning(f"[私聊][{self.private_name}]未能生成告别语内容,无法发送。") - action_successful = False # 标记动作失败 + action_successful = False + self.should_continue = True # 未能生成也不能结束 conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": "未能生成告别语内容"} ) - - # 3. 无论是否发送成功,都准备结束对话 - self.should_continue = False - logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。") + self.conversation_info.last_successful_reply_action = None except Exception as goodbye_err: logger.error(f"[私聊][{self.private_name}]生成或发送告别语时出错: {goodbye_err}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - # 即使出错,也结束对话 - self.should_continue = False - action_successful = False # 标记动作失败 + action_successful = False + self.should_continue = True # 出错也不能结束 conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"} ) + self.conversation_info.last_successful_reply_action = None elif action == "end_conversation": - # 这个分支现在只会在 action_planner 最终决定不告别时被调用 self.should_continue = False logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...") - action_successful = True # 标记这个指令本身是成功的 + action_successful = True + # 结束对话也重置状态 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + elif action == "block_and_ignore": logger.info(f"[私聊][{self.private_name}]不想再理你了...") @@ -632,27 +636,34 @@ class Conversation: f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}" ) self.state = ConversationState.IGNORED - action_successful = True # 标记动作成功 + action_successful = True + # 忽略也重置状态 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + else: # 对应 'wait' 动作 self.state = ConversationState.WAITING logger.info(f"[私聊][{self.private_name}]等待更多信息...") try: - # 检查 waiter 是否存在 if not hasattr(self, "waiter"): logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法等待。") raise AttributeError("Waiter not initialized") _timeout_occurred = await self.waiter.wait(self.conversation_info) - action_successful = True # Wait 完成就算成功 + action_successful = True except Exception as wait_err: logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}") conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"等待失败: {wait_err}"} ) - self.conversation_info.last_successful_reply_action = None # 重置状态 + # 无论成功失败,非回复动作都重置 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + # --- 更新 Action History 状态 --- - # 只有当动作本身成功时,才更新状态为 done if action_successful: conversation_info.done_action[action_index].update( { @@ -660,51 +671,50 @@ class Conversation: "time": datetime.datetime.now().strftime("%H:%M:%S"), } ) - # 重置状态: 对于非回复类动作的成功,清除上次回复状态 - if action not in ["direct_reply", "send_new_message"]: - self.conversation_info.last_successful_reply_action = None - # --- 新增:非回复动作成功,也清除拒绝原因 --- - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None # <-- 新增清空内容 - logger.debug(f"[私聊][{self.private_name}]动作 {action} 成功完成,重置 last_successful_reply_action") - # 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action + # **注意**: last_successful_reply_action 的更新逻辑已经移到各自的动作处理中 + logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'done'") + else: + # 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action 的 final_reason + logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'recall' 或失败") - async def _send_reply(self): - """发送回复""" + # --- [修改点 10] _send_reply 返回布尔值表示成功与否 --- + async def _send_reply(self) -> bool: + """发送回复,并返回是否发送成功""" if not self.generated_reply: logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。") - return + return False # 发送失败 try: - _current_time = time.time() reply_content = self.generated_reply - # 发送消息 (确保 direct_sender 和 chat_stream 有效) + # 检查依赖项 if not hasattr(self, "direct_sender") or not self.direct_sender: logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。") - return + return False # 发送失败 if not self.chat_stream: logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。") - return + return False # 发送失败 + # 发送消息 await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content) - # 发送成功后,手动触发 observer 更新可能导致重复处理自己发送的消息 - # 更好的做法是依赖 observer 的自动轮询或数据库触发器(如果支持) - # 暂时注释掉,观察是否影响 ObservationInfo 的更新 + # 发送成功后,可以考虑触发 observer 更新,但需谨慎避免竞争条件或重复处理 + # 暂时注释掉,依赖 observer 的自然更新周期 # self.chat_observer.trigger_update() - # if not await self.chat_observer.wait_for_update(): - # logger.warning(f"[私聊][{self.private_name}]等待 ChatObserver 更新完成超时") + # await self.chat_observer.wait_for_update() - self.state = ConversationState.ANALYZING # 更新状态 + self.state = ConversationState.ANALYZING # 更新状态 (例如,可以改为 IDLE 或 WAITING) + return True # 发送成功 except Exception as e: - logger.error(f"[私聊][{self.private_name}]发送消息或更新状态时失败: {str(e)}") + logger.error(f"[私聊][{self.private_name}]发送消息时失败: {str(e)}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - self.state = ConversationState.ANALYZING + self.state = ConversationState.ANALYZING # 或者设置为 ERROR 状态? + return False # 发送失败 + async def _send_timeout_message(self): - """发送超时结束消息""" + """发送超时结束消息 (保持不变)""" try: messages = self.chat_observer.get_cached_messages(limit=1) if not messages: @@ -715,4 +725,4 @@ class Conversation: chat_stream=self.chat_stream, content="TODO:超时消息", reply_to_message=latest_message ) except Exception as e: - logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}") + logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}") \ No newline at end of file diff --git a/src/plugins/PFC/observation_info.py b/src/plugins/PFC/observation_info.py index c7572955..35c63741 100644 --- a/src/plugins/PFC/observation_info.py +++ b/src/plugins/PFC/observation_info.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# File: observation_info.py from typing import List, Optional, Dict, Any, Set from maim_message import UserInfo import time @@ -95,8 +97,11 @@ class ObservationInfoHandler(NotificationHandler): self.observation_info.unprocessed_messages = [ msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id ] - if len(self.observation_info.unprocessed_messages) < original_count: - logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id})") + # --- [修改点 11] 更新 new_messages_count --- + self.observation_info.new_messages_count = len(self.observation_info.unprocessed_messages) + if self.observation_info.new_messages_count < original_count: + logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id}), 当前未处理数: {self.observation_info.new_messages_count}") + elif notification_type == NotificationType.USER_JOINED: # 处理用户加入通知 (如果适用私聊场景) @@ -168,7 +173,7 @@ class ObservationInfo: self.last_message_id: Optional[str] = None self.last_message_content: str = "" self.last_message_sender: Optional[str] = None - self.bot_id: Optional[str] = None + self.bot_id: Optional[str] = None # 需要在某个地方设置 bot_id,例如从 global_config 获取 self.chat_history_count: int = 0 self.new_messages_count: int = 0 self.cold_chat_start_time: Optional[float] = None @@ -184,44 +189,43 @@ class ObservationInfo: self.handler: ObservationInfoHandler = ObservationInfoHandler(self, self.private_name) - def bind_to_chat_observer(self, chat_observer: ChatObserver): - """绑定到指定的chat_observer + # --- 初始化 bot_id --- + from ...config.config import global_config # 移动到 __init__ 内部以避免循环导入问题 + self.bot_id = str(global_config.BOT_QQ) if global_config.BOT_QQ else None - Args: - chat_observer: 要绑定的 ChatObserver 实例 - """ + def bind_to_chat_observer(self, chat_observer: ChatObserver): + """绑定到指定的chat_observer (保持不变)""" if self.chat_observer: logger.warning(f"[私聊][{self.private_name}]尝试重复绑定 ChatObserver") return self.chat_observer = chat_observer try: - if not self.handler: # 确保 handler 已经被创建 + if not self.handler: logger.error(f"[私聊][{self.private_name}] 尝试绑定时 handler 未初始化!") - self.chat_observer = None # 重置,防止后续错误 + self.chat_observer = None return - # 注册关心的通知类型 self.chat_observer.notification_manager.register_handler( target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler ) self.chat_observer.notification_manager.register_handler( target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler ) - # 可以根据需要注册更多通知类型 - # self.chat_observer.notification_manager.register_handler( - # target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler - # ) + # --- [修改点 12] 注册 MESSAGE_DELETED --- + self.chat_observer.notification_manager.register_handler( + target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler + ) logger.info(f"[私聊][{self.private_name}]成功绑定到 ChatObserver") except Exception as e: logger.error(f"[私聊][{self.private_name}]绑定到 ChatObserver 时出错: {e}") - self.chat_observer = None # 绑定失败,重置 + self.chat_observer = None def unbind_from_chat_observer(self): - """解除与chat_observer的绑定""" + """解除与chat_observer的绑定 (保持不变)""" if ( self.chat_observer and hasattr(self.chat_observer, "notification_manager") and self.handler - ): # 增加 handler 检查 + ): try: self.chat_observer.notification_manager.unregister_handler( target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler @@ -229,161 +233,152 @@ class ObservationInfo: self.chat_observer.notification_manager.unregister_handler( target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler ) - # 如果注册了其他类型,也要在这里注销 - # self.chat_observer.notification_manager.unregister_handler( - # target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler - # ) + # --- [修改点 13] 注销 MESSAGE_DELETED --- + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler + ) logger.info(f"[私聊][{self.private_name}]成功从 ChatObserver 解绑") except Exception as e: logger.error(f"[私聊][{self.private_name}]从 ChatObserver 解绑时出错: {e}") - finally: # 确保 chat_observer 被重置 + finally: self.chat_observer = None else: logger.warning(f"[私聊][{self.private_name}]尝试解绑时 ChatObserver 不存在、无效或 handler 未设置") - # 修改:update_from_message 接收 UserInfo 对象 async def update_from_message(self, message: Dict[str, Any], user_info: Optional[UserInfo]): - """从消息更新信息 - - Args: - message: 消息数据字典 - user_info: 解析后的 UserInfo 对象 (可能为 None) - """ + """从消息更新信息 (保持不变)""" message_time = message.get("time") message_id = message.get("message_id") processed_text = message.get("processed_plain_text", "") - # 只有在新消息到达时才更新 last_message 相关信息 if message_time and message_time > (self.last_message_time or 0): self.last_message_time = message_time self.last_message_id = message_id self.last_message_content = processed_text - # 重置冷场计时器 self.is_cold_chat = False self.cold_chat_start_time = None self.cold_chat_duration = 0.0 if user_info: - sender_id = str(user_info.user_id) # 确保是字符串 + sender_id = str(user_info.user_id) self.last_message_sender = sender_id - # 更新发言时间 if sender_id == self.bot_id: self.last_bot_speak_time = message_time else: self.last_user_speak_time = message_time - self.active_users.add(sender_id) # 用户发言则认为其活跃 + self.active_users.add(sender_id) else: logger.warning( f"[私聊][{self.private_name}]处理消息更新时缺少有效的 UserInfo 对象, message_id: {message_id}" ) - self.last_message_sender = None # 发送者未知 + self.last_message_sender = None - # 将原始消息字典添加到未处理列表 - self.unprocessed_messages.append(message) - self.new_messages_count = len(self.unprocessed_messages) # 直接用列表长度 + # --- [修改点 14] 添加到未处理列表,并更新计数 --- + # 检查消息是否已存在于未处理列表中,避免重复添加 + if not any(msg.get("message_id") == message_id for msg in self.unprocessed_messages): + self.unprocessed_messages.append(message) + self.new_messages_count = len(self.unprocessed_messages) + logger.debug(f"[私聊][{self.private_name}]添加新未处理消息 ID: {message_id}, 当前未处理数: {self.new_messages_count}") + self.update_changed() + else: + logger.warning(f"[私聊][{self.private_name}]尝试重复添加未处理消息 ID: {message_id}") - # logger.debug(f"[私聊][{self.private_name}]消息更新: last_time={self.last_message_time}, new_count={self.new_messages_count}") - self.update_changed() # 标记状态已改变 else: - # 如果消息时间戳不是最新的,可能不需要处理,或者记录一个警告 pass - # logger.warning(f"[私聊][{self.private_name}]收到过时或无效时间戳的消息: ID={message_id}, time={message_time}") + def update_changed(self): - """标记状态已改变,并重置标记""" - # logger.debug(f"[私聊][{self.private_name}]状态标记为已改变 (changed=True)") + """标记状态已改变,并重置标记 (保持不变)""" self.changed = True async def update_cold_chat_status(self, is_cold: bool, current_time: float): - """更新冷场状态 - - Args: - is_cold: 是否处于冷场状态 - current_time: 当前时间戳 - """ - if is_cold != self.is_cold_chat: # 仅在状态变化时更新 + """更新冷场状态 (保持不变)""" + if is_cold != self.is_cold_chat: self.is_cold_chat = is_cold if is_cold: - # 进入冷场状态 self.cold_chat_start_time = ( self.last_message_time or current_time - ) # 从最后消息时间开始算,或从当前时间开始 + ) logger.info(f"[私聊][{self.private_name}]进入冷场状态,开始时间: {self.cold_chat_start_time}") else: - # 结束冷场状态 if self.cold_chat_start_time: self.cold_chat_duration = current_time - self.cold_chat_start_time logger.info(f"[私聊][{self.private_name}]结束冷场状态,持续时间: {self.cold_chat_duration:.2f} 秒") - self.cold_chat_start_time = None # 重置开始时间 - self.update_changed() # 状态变化,标记改变 + self.cold_chat_start_time = None + self.update_changed() - # 即使状态没变,如果是冷场状态,也更新持续时间 if self.is_cold_chat and self.cold_chat_start_time: self.cold_chat_duration = current_time - self.cold_chat_start_time def get_active_duration(self) -> float: - """获取当前活跃时长 (距离最后一条消息的时间) - - Returns: - float: 最后一条消息到现在的时长(秒) - """ + """获取当前活跃时长 (保持不变)""" if not self.last_message_time: return 0.0 return time.time() - self.last_message_time def get_user_response_time(self) -> Optional[float]: - """获取用户最后响应时间 (距离用户最后发言的时间) - - Returns: - Optional[float]: 用户最后发言到现在的时长(秒),如果没有用户发言则返回None - """ + """获取用户最后响应时间 (保持不变)""" if not self.last_user_speak_time: return None return time.time() - self.last_user_speak_time def get_bot_response_time(self) -> Optional[float]: - """获取机器人最后响应时间 (距离机器人最后发言的时间) - - Returns: - Optional[float]: 机器人最后发言到现在的时长(秒),如果没有机器人发言则返回None - """ + """获取机器人最后响应时间 (保持不变)""" if not self.last_bot_speak_time: return None return time.time() - self.last_bot_speak_time - async def clear_unprocessed_messages(self): - """将未处理消息移入历史记录,并更新相关状态""" - if not self.unprocessed_messages: - return # 没有未处理消息,直接返回 + # --- [修改点 15] 重命名并修改 clear_unprocessed_messages --- + # async def clear_unprocessed_messages(self): <-- 旧方法注释掉或删除 + async def clear_processed_messages(self, message_ids_to_clear: Set[str]): + """将指定ID的未处理消息移入历史记录,并更新相关状态""" + if not message_ids_to_clear: + logger.debug(f"[私聊][{self.private_name}]没有需要清理的消息 ID。") + return - # logger.debug(f"[私聊][{self.private_name}]处理 {len(self.unprocessed_messages)} 条未处理消息...") - # 将未处理消息添加到历史记录中 (确保历史记录有长度限制,避免无限增长) - max_history_len = 100 # 示例:最多保留100条历史记录 - self.chat_history.extend(self.unprocessed_messages) + messages_to_move = [] + remaining_messages = [] + cleared_count = 0 + + # 分离要清理和要保留的消息 + for msg in self.unprocessed_messages: + if msg.get("message_id") in message_ids_to_clear: + messages_to_move.append(msg) + cleared_count += 1 + else: + remaining_messages.append(msg) + + if not messages_to_move: + logger.debug(f"[私聊][{self.private_name}]未找到与 ID 列表匹配的未处理消息进行清理。") + return + + logger.debug(f"[私聊][{self.private_name}]准备清理 {cleared_count} 条已处理消息...") + + # 将要移动的消息添加到历史记录 + max_history_len = 100 + self.chat_history.extend(messages_to_move) if len(self.chat_history) > max_history_len: self.chat_history = self.chat_history[-max_history_len:] - # 更新历史记录字符串 (只使用最近一部分生成,例如20条) - history_slice_for_str = self.chat_history[-20:] + # 更新历史记录字符串 (仅使用最近一部分生成) + history_slice_for_str = self.chat_history[-20:] # 例如最近20条 try: self.chat_history_str = await build_readable_messages( history_slice_for_str, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", - read_mark=0.0, # read_mark 可能需要根据逻辑调整 + read_mark=0.0, ) except Exception as e: logger.error(f"[私聊][{self.private_name}]构建聊天记录字符串时出错: {e}") - self.chat_history_str = "[构建聊天记录出错]" # 提供错误提示 + self.chat_history_str = "[构建聊天记录出错]" - # 清空未处理消息列表和计数 - # cleared_count = len(self.unprocessed_messages) - self.unprocessed_messages.clear() - self.new_messages_count = 0 - # self.has_unread_messages = False # 这个状态可以通过 new_messages_count 判断 + # 更新未处理消息列表和计数 + self.unprocessed_messages = remaining_messages + self.new_messages_count = len(self.unprocessed_messages) + self.chat_history_count = len(self.chat_history) - self.chat_history_count = len(self.chat_history) # 更新历史记录总数 - # logger.debug(f"[私聊][{self.private_name}]已处理 {cleared_count} 条消息,当前历史记录 {self.chat_history_count} 条。") + logger.info(f"[私聊][{self.private_name}]已清理 {cleared_count} 条消息,剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。") - self.update_changed() # 状态改变 + self.update_changed() # 状态改变 \ No newline at end of file