diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 9a0dd36b..8d3f7d39 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -5,11 +5,14 @@ import datetime # from .message_storage import MongoDBMessageStorage from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat -# from ...config.config import global_config +from ...config.config import global_config # 确保导入 global_config from typing import Dict, Any, Optional from ..chat.message import Message from .pfc_types import ConversationState -from .pfc import ChatObserver, GoalAnalyzer +# 确保导入 ChatObserver 和 GoalAnalyzer (如果 pfc.py 中定义了它们) +# from .pfc import ChatObserver, GoalAnalyzer # 可能需要调整导入路径 +from .chat_observer import ChatObserver # 导入 ChatObserver +from .pfc import GoalAnalyzer # 导入 GoalAnalyzer from .message_sender import DirectMessageSender from src.common.logger_manager import get_logger from .action_planner import ActionPlanner @@ -48,6 +51,9 @@ class Conversation: # 回复相关 self.generated_reply = "" + # 初始化 bot_id + self.bot_id = str(global_config.BOT_QQ) # 从配置中获取 + async def _initialize(self): """初始化实例,注册所有组件""" @@ -74,6 +80,9 @@ class Conversation: self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name) self.chat_observer.start() self.observation_info = ObservationInfo(self.private_name) + # --- 在绑定前设置 bot_id --- + self.observation_info.bot_id = self.bot_id + # --- 设置结束 --- self.observation_info.bind_to_chat_observer(self.chat_observer) # print(self.chat_observer.get_cached_messages(limit=) @@ -106,7 +115,7 @@ class Conversation: last_msg = initial_messages[-1] self.observation_info.last_message_time = last_msg.get("time") last_user_info = UserInfo.from_dict(last_msg.get("user_info", {})) - self.observation_info.last_message_sender = last_user_info.user_id + self.observation_info.last_message_sender = str(last_user_info.user_id) # 确保是字符串 self.observation_info.last_message_content = last_msg.get("processed_plain_text", "") logger.info( @@ -148,14 +157,11 @@ class Conversation: self.should_continue = False continue try: - # --- 在规划前记录当前新消息数量 --- - initial_new_message_count = 0 - if hasattr(self.observation_info, "new_messages_count"): - initial_new_message_count = self.observation_info.new_messages_count + 1 # 算上麦麦自己发的那一条 - else: - logger.warning( - f"[私聊][{self.private_name}]ObservationInfo missing 'new_messages_count' before planning." - ) + # --- 修改:记录规划开始时的时间戳和未处理消息数 --- + planning_marker_time = self.observation_info.last_message_time or time.time() + initial_unprocessed_count = self.observation_info.new_messages_count + # logger.debug(f"[私聊][{self.private_name}]规划开始标记时间: {planning_marker_time}, 初始未处理: {initial_unprocessed_count}") + # --- 调用 Action Planner --- # 传递 self.conversation_info.last_successful_reply_action @@ -163,46 +169,37 @@ class Conversation: self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action ) - # --- 规划后检查是否有 *更多* 新消息到达 --- - current_new_message_count = 0 - if hasattr(self.observation_info, "new_messages_count"): - current_new_message_count = self.observation_info.new_messages_count - else: - logger.warning( - f"[私聊][{self.private_name}]ObservationInfo missing 'new_messages_count' after planning." - ) - - if current_new_message_count > initial_new_message_count + 2: + # --- 修改:规划后检查是否有 *显著增多* 的新消息到达 --- + current_unprocessed_count = self.observation_info.new_messages_count + # 增加一个缓冲值,例如允许规划期间到达1-2条新消息不打断 + planning_buffer = 2 + if current_unprocessed_count > initial_unprocessed_count + planning_buffer: logger.info( - f"[私聊][{self.private_name}]规划期间发现新增消息 ({initial_new_message_count} -> {current_new_message_count}),跳过本次行动,重新规划" + f"[私聊][{self.private_name}]规划期间收到较多新消息 ({initial_unprocessed_count} -> {current_unprocessed_count}),超过缓冲 {planning_buffer},跳过本次行动,立即重新规划" ) # 如果规划期间有新消息,也应该重置上次回复状态,因为现在要响应新消息了 self.conversation_info.last_successful_reply_action = None - await asyncio.sleep(0.1) - continue + await asyncio.sleep(0.1) # 短暂等待,避免CPU空转 + continue # 重新进入循环进行规划 - # 包含 send_new_message - if initial_new_message_count > 0 and action in ["direct_reply", "send_new_message"]: - if hasattr(self.observation_info, "clear_unprocessed_messages"): - logger.debug( - f"[私聊][{self.private_name}]准备执行 {action},清理 {initial_new_message_count} 条规划时已知的新消息。" - ) - await self.observation_info.clear_unprocessed_messages() - if hasattr(self.observation_info, "new_messages_count"): - self.observation_info.new_messages_count = 0 - else: - logger.error( - f"[私聊][{self.private_name}]无法清理未处理消息: ObservationInfo 缺少 clear_unprocessed_messages 方法!" - ) + # --- 修改:移除旧的清理逻辑 --- + # 旧逻辑: 在执行动作前清理所有已知的新消息 + # if initial_new_message_count > 0 and action in ["direct_reply", "send_new_message"]: + # # ... (旧的清理代码) ... + # 新逻辑: 不在这里清理,交给 _handle_action 成功发送后处理 - await self._handle_action(action, reason, self.observation_info, self.conversation_info) + # --- 传递 planning_marker_time 给 _handle_action --- + await self._handle_action(action, reason, self.observation_info, self.conversation_info, planning_marker_time) # 检查是否需要结束对话 (逻辑不变) goal_ended = False if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list: for goal_item in self.conversation_info.goal_list: + current_goal = None # 初始化 if isinstance(goal_item, dict): current_goal = goal_item.get("goal") + elif isinstance(goal_item, str): # 处理直接是字符串的情况 + current_goal = goal_item if current_goal == "结束对话": goal_ended = True @@ -218,31 +215,36 @@ class Conversation: await asyncio.sleep(1) if self.should_continue: - await asyncio.sleep(0.1) + await asyncio.sleep(0.1) # 保持循环间的短暂间隔 logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}") - def _check_new_messages_after_planning(self): - """检查在规划后是否有新消息""" - # 检查 ObservationInfo 是否已初始化并且有 new_messages_count 属性 - if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "new_messages_count"): + # --- 修改:_check_new_messages_after_planning 调整为接收标记时间 --- + def _check_new_messages_during_action(self, planning_marker_time: float, buffer: int = 0) -> bool: + """检查在规划开始后是否有新消息到达(考虑缓冲)""" + if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "last_message_time"): logger.warning( - f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'new_messages_count' 属性,无法检查新消息。" + f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'last_message_time' 属性,无法检查新消息。" ) - return False # 或者根据需要抛出错误 + return False - if self.observation_info.new_messages_count > 2: - logger.info( - f"[私聊][{self.private_name}]生成/执行动作期间收到 {self.observation_info.new_messages_count} 条新消息,取消当前动作并重新规划" - ) - # 如果有新消息,也应该重置上次回复状态 - if hasattr(self, "conversation_info"): # 确保 conversation_info 已初始化 - self.conversation_info.last_successful_reply_action = None - else: - logger.warning( - f"[私聊][{self.private_name}]ConversationInfo 未初始化,无法重置 last_successful_reply_action。" - ) - return True + # 检查最后一条消息的时间是否晚于规划开始的时间点 + if self.observation_info.last_message_time and self.observation_info.last_message_time > planning_marker_time: + # 这里可以进一步检查新消息的数量是否超过 buffer,但目前仅检查是否有新消息即可打断 + # current_unprocessed_count = self.observation_info.new_messages_count + # initial_count_at_planning # 这个需要传递进来或重新设计获取方式 + # if current_unprocessed_count > initial_count_at_planning + buffer: + logger.info( + f"[私聊][{self.private_name}]动作执行期间(生成/检查/发送前)检测到新消息 (晚于 {planning_marker_time}),取消当前动作并重新规划" + ) + # 重置上次成功回复状态 + if hasattr(self, "conversation_info"): + self.conversation_info.last_successful_reply_action = None + else: + logger.warning( + f"[私聊][{self.private_name}]ConversationInfo 未初始化,无法重置 last_successful_reply_action。" + ) + return True return False def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message: @@ -274,8 +276,9 @@ class Conversation: # 可以选择返回 None 或重新抛出异常,这里选择重新抛出以指示问题 raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e + # --- 修改:_handle_action 接收 planning_marker_time --- async def _handle_action( - self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo + self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo, planning_marker_time: float ): """处理规划的行动""" @@ -315,6 +318,14 @@ class Conversation: ) self.state = ConversationState.GENERATING + # --- 修改:生成前检查新消息 --- + if self._check_new_messages_during_action(planning_marker_time): + logger.info(f"[私聊][{self.private_name}]生成追问回复前检测到新消息,取消动作") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "生成追问前收到新消息,取消"} + ) + return # 直接返回,主循环会重新规划 + # 1. 生成回复 (调用 generate 时传入 action_type) self.generated_reply = await self.reply_generator.generate( observation_info, conversation_info, action_type="send_new_message" @@ -323,10 +334,25 @@ class Conversation: f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的追问回复: {self.generated_reply}" ) + # --- 修改:检查前再次检查新消息 --- + if self._check_new_messages_during_action(planning_marker_time): + logger.info(f"[私聊][{self.private_name}]检查追问回复前检测到新消息,取消动作") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"检查追问前收到新消息,取消发送: {self.generated_reply}"} + ) + return + # 2. 检查回复 (逻辑不变) self.state = ConversationState.CHECKING try: - current_goal_str = conversation_info.goal_list[0]["goal"] if conversation_info.goal_list else "" + current_goal_str = "" # 初始化 + if conversation_info.goal_list: + first_goal = conversation_info.goal_list[0] + if isinstance(first_goal, dict): + current_goal_str = first_goal.get("goal", "") + elif isinstance(first_goal, str): + current_goal_str = first_goal + is_suitable, check_reason, need_replan = await self.reply_generator.check_reply( reply=self.generated_reply, goal=current_goal_str, @@ -337,6 +363,17 @@ class Conversation: logger.info( f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" ) + + # --- 修改:记录失败原因和内容到 conversation_info --- + if not is_suitable: + conversation_info.last_reply_rejection_reason = check_reason + conversation_info.last_rejected_reply_content = self.generated_reply + else: + # 清除上次失败记录 + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + # --- 记录结束 --- + if is_suitable: final_reply_to_send = self.generated_reply break @@ -344,32 +381,45 @@ class Conversation: logger.warning( f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查建议重新规划,停止尝试。原因: {check_reason}" ) - break + break # 跳出循环,后续会处理 need_replan except Exception as check_err: logger.error( f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}" ) check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - break + # 记录失败 + conversation_info.last_reply_rejection_reason = check_reason + conversation_info.last_rejected_reply_content = self.generated_reply + break # 出错也跳出循环 # 循环结束,处理最终结果 if is_suitable: - # 检查是否有新消息 - if self._check_new_messages_after_planning(): - logger.info(f"[私聊][{self.private_name}]生成追问回复期间收到新消息,取消发送,重新规划行动") + # --- 修改:发送前最终检查新消息 --- + if self._check_new_messages_during_action(planning_marker_time): + logger.info(f"[私聊][{self.private_name}]发送追问回复前检测到新消息,取消发送") conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"有新消息,取消发送追问: {final_reply_to_send}"} + {"status": "recall", "final_reason": f"发送追问前收到新消息,取消发送: {final_reply_to_send}"} ) return # 直接返回,重新规划 # 发送合适的回复 self.generated_reply = final_reply_to_send - # --- 在这里调用 _send_reply --- - await self._send_reply() # <--- 调用恢复后的函数 + send_success = await self._send_reply() # 调用发送函数,获取发送结果 + + if send_success: + # --- 修改:发送成功后,标记处理过的消息 --- + await observation_info.mark_messages_processed_up_to(planning_marker_time) + # 更新状态: 标记上次成功是 send_new_message + self.conversation_info.last_successful_reply_action = "send_new_message" + action_successful = True # 标记动作成功 + else: + # 发送失败处理 + logger.error(f"[私聊][{self.private_name}]发送追问回复失败") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"发送追问回复失败: {final_reply_to_send}"} + ) + self.conversation_info.last_successful_reply_action = None # 发送失败,重置状态 - # 更新状态: 标记上次成功是 send_new_message - self.conversation_info.last_successful_reply_action = "send_new_message" - action_successful = True # 标记动作成功 elif need_replan: # 打回动作决策 @@ -379,6 +429,8 @@ class Conversation: conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"} ) + # 打回时不清空失败记录,让 planner 看到 + self.conversation_info.last_successful_reply_action = None # 重置成功状态 else: # 追问失败 @@ -390,10 +442,13 @@ class Conversation: ) # 重置状态: 追问失败,下次用初始 prompt self.conversation_info.last_successful_reply_action = None + # 失败时不清空失败记录 # 执行 Wait 操作 logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...") self.state = ConversationState.WAITING + # --- Wait 操作也需要标记处理过的消息 --- + await observation_info.mark_messages_processed_up_to(planning_marker_time) await self.waiter.wait(self.conversation_info) wait_action_record = { "action": "wait", @@ -403,6 +458,7 @@ class Conversation: "final_reason": None, } conversation_info.done_action.append(wait_action_record) + action_successful = True # Wait 本身算成功完成 elif action == "direct_reply": max_reply_attempts = 3 @@ -419,6 +475,14 @@ class Conversation: ) self.state = ConversationState.GENERATING + # --- 修改:生成前检查新消息 --- + if self._check_new_messages_during_action(planning_marker_time): + logger.info(f"[私聊][{self.private_name}]生成首次回复前检测到新消息,取消动作") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "生成首次回复前收到新消息,取消"} + ) + return + # 1. 生成回复 self.generated_reply = await self.reply_generator.generate( observation_info, conversation_info, action_type="direct_reply" @@ -427,10 +491,25 @@ class Conversation: f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的首次回复: {self.generated_reply}" ) + # --- 修改:检查前再次检查新消息 --- + if self._check_new_messages_during_action(planning_marker_time): + logger.info(f"[私聊][{self.private_name}]检查首次回复前检测到新消息,取消动作") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"检查首次回复前收到新消息,取消发送: {self.generated_reply}"} + ) + return + # 2. 检查回复 self.state = ConversationState.CHECKING try: - current_goal_str = conversation_info.goal_list[0]["goal"] if conversation_info.goal_list else "" + current_goal_str = "" # 初始化 + if conversation_info.goal_list: + first_goal = conversation_info.goal_list[0] + if isinstance(first_goal, dict): + current_goal_str = first_goal.get("goal", "") + elif isinstance(first_goal, str): + current_goal_str = first_goal + is_suitable, check_reason, need_replan = await self.reply_generator.check_reply( reply=self.generated_reply, goal=current_goal_str, @@ -441,6 +520,17 @@ class Conversation: logger.info( f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" ) + + # --- 修改:记录失败原因和内容到 conversation_info --- + if not is_suitable: + conversation_info.last_reply_rejection_reason = check_reason + conversation_info.last_rejected_reply_content = self.generated_reply + else: + # 清除上次失败记录 + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + # --- 记录结束 --- + if is_suitable: final_reply_to_send = self.generated_reply break @@ -448,32 +538,45 @@ class Conversation: logger.warning( f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查建议重新规划,停止尝试。原因: {check_reason}" ) - break + break # 跳出循环 except Exception as check_err: logger.error( f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (首次回复) 时出错: {check_err}" ) check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - break + # 记录失败 + conversation_info.last_reply_rejection_reason = check_reason + conversation_info.last_rejected_reply_content = self.generated_reply + break # 出错也跳出循环 # 循环结束,处理最终结果 if is_suitable: - # 检查是否有新消息 - if self._check_new_messages_after_planning(): - logger.info(f"[私聊][{self.private_name}]生成首次回复期间收到新消息,取消发送,重新规划行动") + # --- 修改:发送前最终检查新消息 --- + if self._check_new_messages_during_action(planning_marker_time): + logger.info(f"[私聊][{self.private_name}]发送首次回复前检测到新消息,取消发送") conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"有新消息,取消发送首次回复: {final_reply_to_send}"} + {"status": "recall", "final_reason": f"发送首次回复前收到新消息,取消发送: {final_reply_to_send}"} ) return # 直接返回,重新规划 # 发送合适的回复 self.generated_reply = final_reply_to_send - # --- 在这里调用 _send_reply --- - await self._send_reply() # <--- 调用恢复后的函数 + send_success = await self._send_reply() # 调用发送函数 + + if send_success: + # --- 修改:发送成功后,标记处理过的消息 --- + await observation_info.mark_messages_processed_up_to(planning_marker_time) + # 更新状态: 标记上次成功是 direct_reply + self.conversation_info.last_successful_reply_action = "direct_reply" + action_successful = True # 标记动作成功 + else: + # 发送失败处理 + logger.error(f"[私聊][{self.private_name}]发送首次回复失败") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"发送首次回复失败: {final_reply_to_send}"} + ) + self.conversation_info.last_successful_reply_action = None # 发送失败,重置状态 - # 更新状态: 标记上次成功是 direct_reply - self.conversation_info.last_successful_reply_action = "direct_reply" - action_successful = True # 标记动作成功 elif need_replan: # 打回动作决策 @@ -483,6 +586,8 @@ class Conversation: conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"} ) + # 打回时不清空失败记录 + self.conversation_info.last_successful_reply_action = None # 重置成功状态 else: # 首次回复失败 @@ -494,10 +599,13 @@ class Conversation: ) # 重置状态: 首次回复失败,下次还是用初始 prompt self.conversation_info.last_successful_reply_action = None + # 失败时不清空失败记录 # 执行 Wait 操作 (保持原有逻辑) logger.info(f"[私聊][{self.private_name}]由于无法生成合适首次回复,执行 'wait' 操作...") self.state = ConversationState.WAITING + # --- Wait 操作也需要标记处理过的消息 --- + await observation_info.mark_messages_processed_up_to(planning_marker_time) await self.waiter.wait(self.conversation_info) wait_action_record = { "action": "wait", @@ -507,32 +615,7 @@ class Conversation: "final_reason": None, } conversation_info.done_action.append(wait_action_record) - - # elif action == "fetch_knowledge": - # self.state = ConversationState.FETCHING - # knowledge_query = reason - # try: - # 检查 knowledge_fetcher 是否存在 - # if not hasattr(self, "knowledge_fetcher"): - # logger.error(f"[私聊][{self.private_name}]KnowledgeFetcher 未初始化,无法获取知识。") - # raise AttributeError("KnowledgeFetcher not initialized") - - # knowledge, source = await self.knowledge_fetcher.fetch(knowledge_query, observation_info.chat_history) - # logger.info(f"[私聊][{self.private_name}]获取到知识: {knowledge[:100]}..., 来源: {source}") - # if knowledge: - # 确保 knowledge_list 存在 - # if not hasattr(conversation_info, "knowledge_list"): - # conversation_info.knowledge_list = [] - # conversation_info.knowledge_list.append( - # {"query": knowledge_query, "knowledge": knowledge, "source": source} - # ) - # action_successful = True - # except Exception as fetch_err: - # logger.error(f"[私聊][{self.private_name}]获取知识时出错: {str(fetch_err)}") - # conversation_info.done_action[action_index].update( - # {"status": "recall", "final_reason": f"获取知识失败: {str(fetch_err)}"} - # ) - # self.conversation_info.last_successful_reply_action = None # 重置状态 + action_successful = True # Wait 本身算成功 elif action == "rethink_goal": self.state = ConversationState.RETHINKING @@ -541,7 +624,18 @@ class Conversation: if not hasattr(self, "goal_analyzer"): logger.error(f"[私聊][{self.private_name}]GoalAnalyzer 未初始化,无法重新思考目标。") raise AttributeError("GoalAnalyzer not initialized") + + # --- rethink_goal 前检查新消息 --- + if self._check_new_messages_during_action(planning_marker_time): + logger.info(f"[私聊][{self.private_name}]重新思考目标前检测到新消息,取消动作") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "重新思考目标前收到新消息,取消"} + ) + return + await self.goal_analyzer.analyze_goal(conversation_info, observation_info) + # --- rethink_goal 后标记处理过的消息 --- + await observation_info.mark_messages_processed_up_to(planning_marker_time) action_successful = True except Exception as rethink_err: logger.error(f"[私聊][{self.private_name}]重新思考目标时出错: {rethink_err}") @@ -558,6 +652,14 @@ class Conversation: if not hasattr(self, "waiter"): logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法倾听。") raise AttributeError("Waiter not initialized") + + # --- listening 前检查新消息 --- + # 倾听时如果收到新消息,通常应该继续倾听或转为回复,而不是取消 + # 所以这里不检查新消息打断 + + # --- listening 后标记处理过的消息 --- + # 倾听是等待行为,也需要标记之前的消息已处理 + await observation_info.mark_messages_processed_up_to(planning_marker_time) await self.waiter.wait_listening(conversation_info) action_successful = True # Listening 完成就算成功 except Exception as listen_err: @@ -571,6 +673,15 @@ class Conversation: self.state = ConversationState.GENERATING # 也可以定义一个新的状态,如 ENDING logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...") try: + # --- 告别前检查新消息 --- + # 如果有新消息,可能不适合告别了 + if self._check_new_messages_during_action(planning_marker_time): + logger.info(f"[私聊][{self.private_name}]发送告别语前检测到新消息,取消告别") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送告别语前收到新消息,取消"} + ) + return + # 1. 生成告别语 (使用 'say_goodbye' action_type) self.generated_reply = await self.reply_generator.generate( observation_info, conversation_info, action_type="say_goodbye" @@ -579,10 +690,18 @@ class Conversation: # 2. 直接发送告别语 (不经过检查) if self.generated_reply: # 确保生成了内容 - await self._send_reply() # 调用发送方法 - # 发送成功后,标记动作成功 - action_successful = True - logger.info(f"[私聊][{self.private_name}]告别语已发送。") + send_success = await self._send_reply() # 调用发送方法 + if send_success: + # --- 发送成功后标记处理过的消息 --- + await observation_info.mark_messages_processed_up_to(planning_marker_time) + action_successful = True + logger.info(f"[私聊][{self.private_name}]告别语已发送。") + else: + logger.warning(f"[私聊][{self.private_name}]发送告别语失败。") + action_successful = False + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送告别语失败"} + ) else: logger.warning(f"[私聊][{self.private_name}]未能生成告别语内容,无法发送。") action_successful = False # 标记动作失败 @@ -608,6 +727,8 @@ class Conversation: # 这个分支现在只会在 action_planner 最终决定不告别时被调用 self.should_continue = False logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...") + # --- 结束对话前标记处理过的消息 --- + await observation_info.mark_messages_processed_up_to(planning_marker_time) action_successful = True # 标记这个指令本身是成功的 elif action == "block_and_ignore": @@ -618,6 +739,8 @@ class Conversation: f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}" ) self.state = ConversationState.IGNORED + # --- 屏蔽前标记处理过的消息 --- + await observation_info.mark_messages_processed_up_to(planning_marker_time) action_successful = True # 标记动作成功 else: # 对应 'wait' 动作 @@ -628,6 +751,12 @@ class Conversation: if not hasattr(self, "waiter"): logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法等待。") raise AttributeError("Waiter not initialized") + + # --- Wait 前检查新消息 --- + # Wait 时如果收到新消息,wait 逻辑内部会处理并退出,所以这里不打断 + + # --- Wait 开始前标记处理过的消息 --- + await observation_info.mark_messages_processed_up_to(planning_marker_time) _timeout_occurred = await self.waiter.wait(self.conversation_info) action_successful = True # Wait 完成就算成功 except Exception as wait_err: @@ -640,23 +769,29 @@ class Conversation: # --- 更新 Action History 状态 --- # 只有当动作本身成功时,才更新状态为 done if action_successful: - conversation_info.done_action[action_index].update( - { - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - } - ) - # 重置状态: 对于非回复类动作的成功,清除上次回复状态 - if action not in ["direct_reply", "send_new_message"]: - self.conversation_info.last_successful_reply_action = None - logger.debug(f"[私聊][{self.private_name}]动作 {action} 成功完成,重置 last_successful_reply_action") + # 确保 action_index 在有效范围内 + if action_index < len(conversation_info.done_action): + conversation_info.done_action[action_index].update( + { + "status": "done", + "time": datetime.datetime.now().strftime("%H:%M:%S"), + } + ) + # 重置状态: 对于非回复类动作的成功,清除上次回复状态 + if action not in ["direct_reply", "send_new_message"]: + self.conversation_info.last_successful_reply_action = None + # logger.debug(f"[私聊][{self.private_name}]动作 {action} 成功完成,重置 last_successful_reply_action") + else: + logger.error(f"[私聊][{self.private_name}]尝试更新无效的 action_index: {action_index},当前 done_action 长度: {len(conversation_info.done_action)}") + # 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action - async def _send_reply(self): - """发送回复""" + # --- 修改:_send_reply 返回发送是否成功 --- + async def _send_reply(self) -> bool: + """发送回复,并返回发送是否成功""" if not self.generated_reply: logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。") - return + return False # 发送失败 try: _current_time = time.time() @@ -665,10 +800,10 @@ class Conversation: # 发送消息 (确保 direct_sender 和 chat_stream 有效) if not hasattr(self, "direct_sender") or not self.direct_sender: logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。") - return + return False # 发送失败 if not self.chat_stream: logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。") - return + return False # 发送失败 await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content) @@ -680,22 +815,37 @@ class Conversation: # logger.warning(f"[私聊][{self.private_name}]等待 ChatObserver 更新完成超时") self.state = ConversationState.ANALYZING # 更新状态 + return True # 发送成功 except Exception as e: logger.error(f"[私聊][{self.private_name}]发送消息或更新状态时失败: {str(e)}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") self.state = ConversationState.ANALYZING + return False # 发送失败 async def _send_timeout_message(self): """发送超时结束消息""" try: - messages = self.chat_observer.get_cached_messages(limit=1) + # 尝试从 observation_info 获取历史记录 + if not hasattr(self, 'observation_info') or not self.observation_info.chat_history: + logger.warning(f"[私聊][{self.private_name}]无法获取聊天历史,无法发送超时消息。") + return + + messages = self.observation_info.chat_history[-1:] # 获取最后一条 if not messages: return - latest_message = self._convert_to_message(messages[0]) + latest_message_dict = messages[0] + # 确保 chat_stream 存在 + if not self.chat_stream: + logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送超时消息。") + return + # 将字典转换为 Message 对象 + latest_message = self._convert_to_message(latest_message_dict) + await self.direct_sender.send_message( - chat_stream=self.chat_stream, content="TODO:超时消息", reply_to_message=latest_message + chat_stream=self.chat_stream, content="[自动消息] 对方长时间未响应,对话已超时。", reply_to_message=latest_message ) except Exception as e: logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}") + diff --git a/src/plugins/PFC/observation_info.py b/src/plugins/PFC/observation_info.py index c7572955..3f4cfb62 100644 --- a/src/plugins/PFC/observation_info.py +++ b/src/plugins/PFC/observation_info.py @@ -2,11 +2,18 @@ from typing import List, Optional, Dict, Any, Set from maim_message import UserInfo import time from src.common.logger import get_module_logger -from .chat_observer import ChatObserver +# 移除旧的 ChatObserver 导入,因为它现在通过类型提示和方法参数传入 +# from .chat_observer import ChatObserver from .chat_states import NotificationHandler, NotificationType, Notification from src.plugins.utils.chat_message_builder import build_readable_messages import traceback # 导入 traceback 用于调试 +# 确保 ChatObserver 类型可用,即使不直接导入 +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from .chat_observer import ChatObserver + + logger = get_module_logger("observation_info") @@ -75,7 +82,7 @@ class ObservationInfoHandler(NotificationHandler): elif notification_type == NotificationType.ACTIVE_CHAT: # 处理活跃通知 (通常由 COLD_CHAT 的反向状态处理) is_active = data.get("is_active", False) - self.observation_info.is_cold = not is_active + self.observation_info.is_cold_chat = not is_active # Corrected variable name elif notification_type == NotificationType.BOT_SPEAKING: # 处理机器人说话通知 (按需实现) @@ -97,6 +104,9 @@ class ObservationInfoHandler(NotificationHandler): ] if len(self.observation_info.unprocessed_messages) < original_count: logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id})") + # 更新未处理消息计数 + self.observation_info.new_messages_count = len(self.observation_info.unprocessed_messages) + elif notification_type == NotificationType.USER_JOINED: # 处理用户加入通知 (如果适用私聊场景) @@ -142,9 +152,9 @@ class ObservationInfo: cold_chat_start_time: Optional[float] cold_chat_duration: float is_typing: bool - is_cold_chat: bool + is_cold_chat: bool # Corrected variable name changed: bool - chat_observer: Optional[ChatObserver] + chat_observer: Optional['ChatObserver'] # Use forward reference handler: Optional[ObservationInfoHandler] def __init__(self, private_name: str): @@ -168,7 +178,7 @@ class ObservationInfo: self.last_message_id: Optional[str] = None self.last_message_content: str = "" self.last_message_sender: Optional[str] = None - self.bot_id: Optional[str] = None + self.bot_id: Optional[str] = None # Consider initializing from config self.chat_history_count: int = 0 self.new_messages_count: int = 0 self.cold_chat_start_time: Optional[float] = None @@ -176,15 +186,15 @@ class ObservationInfo: # state self.is_typing: bool = False - self.is_cold_chat: bool = False + self.is_cold_chat: bool = False # Corrected variable name self.changed: bool = False # 关联对象 - self.chat_observer: Optional[ChatObserver] = None + self.chat_observer: Optional['ChatObserver'] = None # Use forward reference self.handler: ObservationInfoHandler = ObservationInfoHandler(self, self.private_name) - def bind_to_chat_observer(self, chat_observer: ChatObserver): + def bind_to_chat_observer(self, chat_observer: 'ChatObserver'): # Use forward reference """绑定到指定的chat_observer Args: @@ -208,10 +218,30 @@ class ObservationInfo: self.chat_observer.notification_manager.register_handler( target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler ) - # 可以根据需要注册更多通知类型 - # self.chat_observer.notification_manager.register_handler( - # target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler - # ) + # --- 新增:注册其他必要的通知类型 --- + self.chat_observer.notification_manager.register_handler( + target="observation_info", notification_type=NotificationType.ACTIVE_CHAT, handler=self.handler + ) + self.chat_observer.notification_manager.register_handler( + target="observation_info", notification_type=NotificationType.BOT_SPEAKING, handler=self.handler + ) + self.chat_observer.notification_manager.register_handler( + target="observation_info", notification_type=NotificationType.USER_SPEAKING, handler=self.handler + ) + self.chat_observer.notification_manager.register_handler( + target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler + ) + self.chat_observer.notification_manager.register_handler( + target="observation_info", notification_type=NotificationType.USER_JOINED, handler=self.handler + ) + self.chat_observer.notification_manager.register_handler( + target="observation_info", notification_type=NotificationType.USER_LEFT, handler=self.handler + ) + self.chat_observer.notification_manager.register_handler( + target="observation_info", notification_type=NotificationType.ERROR, handler=self.handler + ) + # --- 注册结束 --- + logger.info(f"[私聊][{self.private_name}]成功绑定到 ChatObserver") except Exception as e: logger.error(f"[私聊][{self.private_name}]绑定到 ChatObserver 时出错: {e}") @@ -223,16 +253,23 @@ class ObservationInfo: self.chat_observer and hasattr(self.chat_observer, "notification_manager") and self.handler ): # 增加 handler 检查 try: - self.chat_observer.notification_manager.unregister_handler( - target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler - ) - self.chat_observer.notification_manager.unregister_handler( - target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler - ) - # 如果注册了其他类型,也要在这里注销 - # self.chat_observer.notification_manager.unregister_handler( - # target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler - # ) + # --- 注销所有注册过的通知类型 --- + notification_types_to_unregister = [ + NotificationType.NEW_MESSAGE, + NotificationType.COLD_CHAT, + NotificationType.ACTIVE_CHAT, + NotificationType.BOT_SPEAKING, + NotificationType.USER_SPEAKING, + NotificationType.MESSAGE_DELETED, + NotificationType.USER_JOINED, + NotificationType.USER_LEFT, + NotificationType.ERROR, + ] + for nt in notification_types_to_unregister: + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", notification_type=nt, handler=self.handler + ) + # --- 注销结束 --- logger.info(f"[私聊][{self.private_name}]成功从 ChatObserver 解绑") except Exception as e: logger.error(f"[私聊][{self.private_name}]从 ChatObserver 解绑时出错: {e}") @@ -253,13 +290,18 @@ class ObservationInfo: message_id = message.get("message_id") processed_text = message.get("processed_plain_text", "") + # 检查消息是否已存在于未处理列表中 (避免重复添加) + if any(msg.get("message_id") == message_id for msg in self.unprocessed_messages): + # logger.debug(f"[私聊][{self.private_name}]消息 {message_id} 已存在于未处理列表,跳过") + return + # 只有在新消息到达时才更新 last_message 相关信息 if message_time and message_time > (self.last_message_time or 0): self.last_message_time = message_time self.last_message_id = message_id self.last_message_content = processed_text # 重置冷场计时器 - self.is_cold_chat = False + self.is_cold_chat = False # Corrected variable name self.cold_chat_start_time = None self.cold_chat_duration = 0.0 @@ -267,7 +309,8 @@ class ObservationInfo: sender_id = str(user_info.user_id) # 确保是字符串 self.last_message_sender = sender_id # 更新发言时间 - if sender_id == self.bot_id: + # 假设 self.bot_id 已经正确初始化 (例如从 global_config) + if self.bot_id and sender_id == str(self.bot_id): self.last_bot_speak_time = message_time else: self.last_user_speak_time = message_time @@ -286,8 +329,11 @@ class ObservationInfo: self.update_changed() # 标记状态已改变 else: # 如果消息时间戳不是最新的,可能不需要处理,或者记录一个警告 - pass # logger.warning(f"[私聊][{self.private_name}]收到过时或无效时间戳的消息: ID={message_id}, time={message_time}") + # 即使时间戳旧,也可能需要加入未处理列表(如果它是之前漏掉的) + # 但为了避免复杂化,暂时按原逻辑处理:只处理时间更新的消息 + pass + def update_changed(self): """标记状态已改变,并重置标记""" @@ -301,8 +347,8 @@ class ObservationInfo: is_cold: 是否处于冷场状态 current_time: 当前时间戳 """ - if is_cold != self.is_cold_chat: # 仅在状态变化时更新 - self.is_cold_chat = is_cold + if is_cold != self.is_cold_chat: # 仅在状态变化时更新 # Corrected variable name + self.is_cold_chat = is_cold # Corrected variable name if is_cold: # 进入冷场状态 self.cold_chat_start_time = ( @@ -318,7 +364,7 @@ class ObservationInfo: self.update_changed() # 状态变化,标记改变 # 即使状态没变,如果是冷场状态,也更新持续时间 - if self.is_cold_chat and self.cold_chat_start_time: + if self.is_cold_chat and self.cold_chat_start_time: # Corrected variable name self.cold_chat_duration = current_time - self.cold_chat_start_time def get_active_duration(self) -> float: @@ -351,15 +397,27 @@ class ObservationInfo: return None return time.time() - self.last_bot_speak_time - async def clear_unprocessed_messages(self): - """将未处理消息移入历史记录,并更新相关状态""" - if not self.unprocessed_messages: - return # 没有未处理消息,直接返回 + # --- 新增方法 --- + async def mark_messages_processed_up_to(self, marker_timestamp: float): + """ + 将指定时间戳之前(包括等于)的未处理消息移入历史记录。 - # logger.debug(f"[私聊][{self.private_name}]处理 {len(self.unprocessed_messages)} 条未处理消息...") - # 将未处理消息添加到历史记录中 (确保历史记录有长度限制,避免无限增长) + Args: + marker_timestamp: 时间戳标记。 + """ + messages_to_process = [ + msg for msg in self.unprocessed_messages if msg.get("time", 0) <= marker_timestamp + ] + + if not messages_to_process: + # logger.debug(f"[私聊][{self.private_name}]没有在 {marker_timestamp} 之前的未处理消息。") + return + + # logger.debug(f"[私聊][{self.private_name}]处理 {len(messages_to_process)} 条直到 {marker_timestamp} 的未处理消息...") + + # 将要处理的消息添加到历史记录 max_history_len = 100 # 示例:最多保留100条历史记录 - self.chat_history.extend(self.unprocessed_messages) + self.chat_history.extend(messages_to_process) if len(self.chat_history) > max_history_len: self.chat_history = self.chat_history[-max_history_len:] @@ -377,13 +435,25 @@ class ObservationInfo: logger.error(f"[私聊][{self.private_name}]构建聊天记录字符串时出错: {e}") self.chat_history_str = "[构建聊天记录出错]" # 提供错误提示 - # 清空未处理消息列表和计数 - # cleared_count = len(self.unprocessed_messages) - self.unprocessed_messages.clear() - self.new_messages_count = 0 - # self.has_unread_messages = False # 这个状态可以通过 new_messages_count 判断 + # 从未处理列表中移除已处理的消息 + processed_ids = {msg.get("message_id") for msg in messages_to_process} + self.unprocessed_messages = [ + msg for msg in self.unprocessed_messages if msg.get("message_id") not in processed_ids + ] - self.chat_history_count = len(self.chat_history) # 更新历史记录总数 - # logger.debug(f"[私聊][{self.private_name}]已处理 {cleared_count} 条消息,当前历史记录 {self.chat_history_count} 条。") + # 更新未处理消息计数和历史记录总数 + self.new_messages_count = len(self.unprocessed_messages) + self.chat_history_count = len(self.chat_history) + # logger.debug(f"[私聊][{self.private_name}]已处理 {len(messages_to_process)} 条消息,剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。") self.update_changed() # 状态改变 + + # --- 移除或注释掉旧的 clear_unprocessed_messages 方法 --- + # async def clear_unprocessed_messages(self): + # """将未处理消息移入历史记录,并更新相关状态 (此方法将被 mark_messages_processed_up_to 替代)""" + # # ... (旧代码) ... + # logger.warning(f"[私聊][{self.private_name}] 调用了已弃用的 clear_unprocessed_messages 方法。请使用 mark_messages_processed_up_to。") + # # 为了兼容性,可以暂时调用新方法处理所有消息,但不推荐 + # # await self.mark_messages_processed_up_to(time.time()) + # pass # 或者直接留空 +