diff --git a/src/plugins/PFC/action_planner.py b/src/plugins/PFC/action_planner.py index 5cba1bb1..faf13bd0 100644 --- a/src/plugins/PFC/action_planner.py +++ b/src/plugins/PFC/action_planner.py @@ -1,6 +1,7 @@ import time from typing import Tuple, Optional from .pfc_utils import retrieve_contextual_info + # import jieba # 如果需要旧版知识库的回退,可能需要 # import re # 如果需要旧版知识库的回退,可能需要 from src.common.logger_manager import get_logger @@ -336,10 +337,13 @@ class ActionPlanner: last_action_context += f"- 该行动当前状态: {status}\n" # self.last_successful_action_type = None # 非完成状态,清除记录 - retrieved_memory_str_planner, retrieved_knowledge_str_planner = await retrieve_contextual_info(chat_history_text, self.private_name) + retrieved_memory_str_planner, retrieved_knowledge_str_planner = await retrieve_contextual_info( + chat_history_text, self.private_name + ) # Optional: 可以加一行日志确认结果,方便调试 - logger.info(f"[私聊][{self.private_name}] (ActionPlanner) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str_planner else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str_planner and '无相关知识' not in retrieved_knowledge_str_planner else '无'}") - + logger.info( + f"[私聊][{self.private_name}] (ActionPlanner) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str_planner else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str_planner and '无相关知识' not in retrieved_knowledge_str_planner else '无'}" + ) # --- 选择 Prompt --- if last_successful_reply_action in ["direct_reply", "send_new_message"]: diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index bcaf72d3..62266378 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# File: conversation.py import time import asyncio import datetime @@ -9,10 +11,7 @@ from ...config.config import global_config # 确保导入 global_config from typing import Dict, Any, Optional, Set # 引入 Set from ..chat.message import Message from .pfc_types import ConversationState -# 确保导入 ChatObserver 和 GoalAnalyzer (如果 pfc.py 中定义了它们) -# from .pfc import ChatObserver, GoalAnalyzer # 可能需要调整导入路径 -from .chat_observer import ChatObserver # 导入 ChatObserver -from .pfc import GoalAnalyzer # 导入 GoalAnalyzer +from .pfc import ChatObserver, GoalAnalyzer # pfc.py 包含了 GoalAnalyzer,无需重复导入 from .message_sender import DirectMessageSender from src.common.logger_manager import get_logger from .action_planner import ActionPlanner @@ -22,7 +21,7 @@ from .reply_generator import ReplyGenerator from ..chat.chat_stream import ChatStream from maim_message import UserInfo from src.plugins.chat.chat_stream import chat_manager -from .pfc_KnowledgeFetcher import KnowledgeFetcher +from .pfc_KnowledgeFetcher import KnowledgeFetcher # 注意:这里是 PFC_KnowledgeFetcher.py from .waiter import Waiter import traceback @@ -133,6 +132,10 @@ class Conversation: ) # 让 ChatObserver 从加载的最后一条消息之后开始同步 + # **** 注意:这里的 last_message_time 设置可能需要 review **** + # 如果数据库消息时间戳可能不完全连续,直接设置 last_message_time 可能导致 observer 错过消息 + # 更稳妥的方式是让 observer 自己管理其内部的 last_message_time 或 last_message_id + # 暂时保留,但标记为潜在问题点。如果 observer 逻辑是可靠的,则此行 OK。 self.chat_observer.last_message_time = self.observation_info.last_message_time self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录 else: @@ -157,7 +160,7 @@ class Conversation: async def _plan_and_action_loop(self): """思考步,PFC核心循环模块""" while self.should_continue: - # 忽略逻辑 + # 忽略逻辑 (保持不变) if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp: await asyncio.sleep(30) continue @@ -166,67 +169,92 @@ class Conversation: self.ignore_until_timestamp = None self.should_continue = False continue - try: - # --- 记录规划开始时的时间戳和未处理消息的 ID 集合 --- - planning_marker_time = time.time() - # 获取规划开始时未处理消息的 ID 集合 - initial_unprocessed_ids: Set[str] = { - msg.get("message_id") for msg in self.observation_info.unprocessed_messages if msg.get("message_id") - } - logger.debug(f"[私聊][{self.private_name}]规划开始标记时间: {planning_marker_time}, 初始未处理消息ID数: {len(initial_unprocessed_ids)}") - # --- 调用 Action Planner --- + try: + # --- [修改点 1] 在规划前记录新消息状态 --- + # 记录规划开始时未处理消息的 ID 集合,用于后续判断哪些是新来的 + message_ids_before_planning = set() + initial_unprocessed_message_count = 0 # <-- 新增:记录规划前的未处理消息数 + if hasattr(self.observation_info, "unprocessed_messages"): + message_ids_before_planning = {msg.get("message_id") for msg in self.observation_info.unprocessed_messages if msg.get("message_id")} + initial_unprocessed_message_count = len(self.observation_info.unprocessed_messages) # <-- 获取初始数量 + logger.debug(f"[私聊][{self.private_name}]规划开始,当前未处理消息数: {initial_unprocessed_message_count}, IDs: {message_ids_before_planning}") + else: + logger.warning( + f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' before planning." + ) + + + # --- 调用 Action Planner (保持不变) --- action, reason = await self.action_planner.plan( self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action ) - # --- 规划后,精确计算规划期间收到的“用户”新消息数 --- - current_unprocessed_messages = self.observation_info.unprocessed_messages - new_messages_during_planning = [] + # --- [修改点 2] 规划后检查是否有 *过多* 新消息到达 --- + current_unprocessed_messages = [] + current_unprocessed_message_count = 0 + if hasattr(self.observation_info, "unprocessed_messages"): + current_unprocessed_messages = self.observation_info.unprocessed_messages + current_unprocessed_message_count = len(current_unprocessed_messages) # <-- 获取当前数量 + else: + logger.warning( + f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' after planning." + ) + + # 计算规划期间实际新增的消息数量 + new_messages_during_planning_count = 0 + new_message_ids_during_planning = set() for msg in current_unprocessed_messages: msg_id = msg.get("message_id") - # 检查消息ID是否不在初始集合中,且消息时间戳晚于规划开始时间(增加时间判断以防万一) - if msg_id and msg_id not in initial_unprocessed_ids and msg.get("time", 0) >= planning_marker_time: - new_messages_during_planning.append(msg) + if msg_id and msg_id not in message_ids_before_planning: + new_messages_during_planning_count += 1 + new_message_ids_during_planning.add(msg_id) - # 计算这些新消息中来自用户的数量 - new_user_messages_count = 0 - for msg in new_messages_during_planning: - user_info_dict = msg.get("user_info", {}) - sender_id = None - if isinstance(user_info_dict, dict): - sender_id = str(user_info_dict.get("user_id")) # 确保是字符串 - # 检查发送者ID是否不是机器人ID - if sender_id and sender_id != self.bot_id: - new_user_messages_count += 1 + logger.debug(f"[私聊][{self.private_name}]规划结束,当前未处理消息数: {current_unprocessed_message_count}, 规划期间新增: {new_messages_during_planning_count}") - logger.debug(f"[私聊][{self.private_name}]规划期间共收到新消息: {len(new_messages_during_planning)} 条, 其中用户消息: {new_user_messages_count} 条") - - # --- 根据用户新消息数决定是否重新规划 --- - planning_buffer = 2 # 用户指定的缓冲值 - if new_user_messages_count > planning_buffer: + # **核心逻辑:判断是否中断** + # 这里的 +2 是根据你的需求来的,代表允许的缓冲 + # 我们比较的是 *规划期间新增的消息数* 是否超过阈值 + if new_messages_during_planning_count > 2: logger.info( - f"[私聊][{self.private_name}]规划期间收到 {new_user_messages_count} 条用户新消息 (超过缓冲 {planning_buffer}),放弃当前计划 '{action}',立即重新规划" + f"[私聊][{self.private_name}]规划期间新增消息数 ({new_messages_during_planning_count}) 超过阈值(2),取消本次行动 '{action}',重新规划" ) + # 中断时,重置上次回复状态,因为需要基于最新消息重新决策 self.conversation_info.last_successful_reply_action = None - await asyncio.sleep(0.1) - continue # 重新进入循环进行规划 + # **重要**: 中断时不清空未处理消息,留给下一轮规划处理 + await asyncio.sleep(0.1) # 短暂暂停避免CPU空转 + continue # 跳过本轮后续处理,直接进入下一轮循环重新规划 - # --- 如果规划期间用户新消息未超限,则继续执行规划的动作 --- - # 将 planning_marker_time 和 new_user_messages_count 传递给 _handle_action - await self._handle_action(action, reason, self.observation_info, self.conversation_info, planning_marker_time, new_user_messages_count) + # --- [修改点 3] 准备执行动作,处理规划时已知的消息 --- + # 如果决定要回复 (direct_reply 或 send_new_message),并且规划开始时就有未处理消息 + # 这表示 LLM 规划时已经看到了这些消息 + # 我们需要在发送回复 *后* 清理掉这些规划时已知的消息 + # 注意:这里不再立即清理,清理逻辑移到 _handle_action 成功发送后 + messages_known_during_planning = [] + if action in ["direct_reply", "send_new_message"] and initial_unprocessed_message_count > 0: + messages_known_during_planning = [ + msg for msg_id in message_ids_before_planning + if (msg := next((m for m in self.observation_info.unprocessed_messages if m.get("message_id") == msg_id), None)) is not None + ] + logger.debug(f"[私聊][{self.private_name}]规划时已知 {len(messages_known_during_planning)} 条消息,将在回复成功后清理。") - # 检查是否需要结束对话 (逻辑不变) + + # --- 执行动作 --- + # 将规划时已知需要清理的消息ID集合传递给 _handle_action + await self._handle_action(action, reason, self.observation_info, self.conversation_info, message_ids_before_planning) + + # --- 检查是否需要结束对话 (逻辑保持不变) --- goal_ended = False if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list: for goal_item in self.conversation_info.goal_list: - current_goal = None # 初始化 + current_goal = None # 初始化 current_goal if isinstance(goal_item, dict): current_goal = goal_item.get("goal") elif isinstance(goal_item, str): # 处理直接是字符串的情况 - current_goal = goal_item + current_goal = goal_item - if current_goal == "结束对话": + # 确保 current_goal 是字符串再比较 + if isinstance(current_goal, str) and current_goal == "结束对话": goal_ended = True break @@ -237,16 +265,49 @@ class Conversation: except Exception as loop_err: logger.error(f"[私聊][{self.private_name}]PFC主循环出错: {loop_err}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - await asyncio.sleep(1) + await asyncio.sleep(1) # 出错时暂停一下 + # 循环间隔 if self.should_continue: - await asyncio.sleep(0.1) + await asyncio.sleep(0.1) # 非阻塞的短暂停 logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}") + # --- [修改点 4] 修改 _check_new_messages_after_planning --- + # 重命名并修改逻辑,用于在 *发送前* 检查是否有过多新消息(兜底检查) + def _check_interrupt_before_sending(self, message_ids_before_planning: set) -> bool: + """在发送回复前,最后检查一次是否有过多新消息导致需要中断""" + if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "unprocessed_messages"): + logger.warning( + f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'unprocessed_messages' 属性,无法检查新消息。" + ) + return False + + current_unprocessed_messages = self.observation_info.unprocessed_messages + new_messages_count = 0 + for msg in current_unprocessed_messages: + msg_id = msg.get("message_id") + if msg_id and msg_id not in message_ids_before_planning: + new_messages_count += 1 + + # 使用与规划后检查相同的阈值 + if new_messages_count > 2: + logger.info( + f"[私聊][{self.private_name}]准备发送时发现新增消息数 ({new_messages_count}) 超过阈值(2),取消发送并重新规划" + ) + if hasattr(self, "conversation_info"): + self.conversation_info.last_successful_reply_action = None + else: + logger.warning( + f"[私聊][{self.private_name}]ConversationInfo 未初始化,无法重置 last_successful_reply_action。" + ) + return True # 需要中断 + return False # 不需要中断 + + def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message: - """将消息字典转换为Message对象""" + """将消息字典转换为Message对象 (保持不变)""" try: chat_info = msg_dict.get("chat_info") if chat_info and isinstance(chat_info, dict): @@ -277,9 +338,14 @@ class Conversation: logger.warning(f"[私聊][{self.private_name}]转换消息时出错: {e}") raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e - # --- 修改:_handle_action 接收 planning_marker_time 和 new_user_messages_count --- + # --- [修改点 5] 修改 _handle_action 签名并调整内部逻辑 --- async def _handle_action( - self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo, planning_marker_time: float, new_user_messages_during_planning: int + self, + action: str, + reason: str, + observation_info: ObservationInfo, + conversation_info: ConversationInfo, + message_ids_before_planning: set # <-- 接收规划前的消息ID集合 ): """处理规划的行动""" @@ -299,10 +365,11 @@ class Conversation: action_index = len(conversation_info.done_action) - 1 action_successful = False + reply_sent = False # <-- 新增:标记是否成功发送了回复 # --- 根据不同的 action 执行 --- - - if action == "send_new_message": + if action == "direct_reply" or action == "send_new_message": + # 合并 direct_reply 和 send_new_message 的大部分逻辑 max_reply_attempts = 3 reply_attempt_count = 0 is_suitable = False @@ -312,263 +379,167 @@ class Conversation: while reply_attempt_count < max_reply_attempts and not is_suitable: reply_attempt_count += 1 - logger.info( - f"[私聊][{self.private_name}]尝试生成追问回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." - ) + log_prefix = f"[私聊][{self.private_name}]尝试生成 '{action}' 回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." + logger.info(log_prefix) self.state = ConversationState.GENERATING - # 1. 生成回复 + # 1. 生成回复 (传入 action_type) self.generated_reply = await self.reply_generator.generate( - observation_info, conversation_info, action_type="send_new_message" - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的追问回复: {self.generated_reply}" + observation_info, conversation_info, action_type=action ) + logger.info(f"{log_prefix} 生成内容: {self.generated_reply}") # 2. 检查回复 self.state = ConversationState.CHECKING try: - current_goal_str = "" - if conversation_info.goal_list: - first_goal = conversation_info.goal_list[0] - if isinstance(first_goal, dict): - current_goal_str = first_goal.get("goal", "") - elif isinstance(first_goal, str): - current_goal_str = first_goal + current_goal_str = "" # 初始化 + if hasattr(conversation_info, 'goal_list') and conversation_info.goal_list: + goal_item = conversation_info.goal_list[0] # 取第一个目标 + if isinstance(goal_item, dict): + current_goal_str = goal_item.get('goal', '') + elif isinstance(goal_item, str): + current_goal_str = goal_item + + # 确保 chat_history 和 chat_history_str 存在 + chat_history_for_check = getattr(observation_info, 'chat_history', []) + chat_history_str_for_check = getattr(observation_info, 'chat_history_str', '') is_suitable, check_reason, need_replan = await self.reply_generator.check_reply( reply=self.generated_reply, goal=current_goal_str, - chat_history=observation_info.chat_history, - chat_history_str=observation_info.chat_history_str, + chat_history=chat_history_for_check, + chat_history_str=chat_history_str_for_check, retry_count=reply_attempt_count - 1, ) logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" + f"{log_prefix} 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" ) - if not is_suitable: - setattr(conversation_info, 'last_reply_rejection_reason', check_reason) - setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply) + # 更新拒绝原因和内容 (仅在不合适或需要重规划时) + if not is_suitable or need_replan: + conversation_info.last_reply_rejection_reason = check_reason + conversation_info.last_rejected_reply_content = self.generated_reply else: - setattr(conversation_info, 'last_reply_rejection_reason', None) - setattr(conversation_info, 'last_rejected_reply_content', None) + # 检查通过,清空上次拒绝记录 + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None if is_suitable: final_reply_to_send = self.generated_reply - break + break # 检查通过,跳出循环 elif need_replan: logger.warning( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查建议重新规划,停止尝试。原因: {check_reason}" + f"{log_prefix} 检查建议重新规划,停止尝试。原因: {check_reason}" ) - break + break # 需要重新规划,跳出循环 except Exception as check_err: logger.error( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}" + f"{log_prefix} 调用 ReplyChecker 时出错: {check_err}" ) check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - setattr(conversation_info, 'last_reply_rejection_reason', check_reason) - setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply) - break + conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}" + conversation_info.last_rejected_reply_content = self.generated_reply # 记录出错时尝试的内容 + break # 检查出错,跳出循环 - # 循环结束,处理最终结果 + # --- 处理生成和检查的结果 --- if is_suitable: - # 发送合适的回复 + # --- [修改点 6] 发送前最后检查是否需要中断 --- + if self._check_interrupt_before_sending(message_ids_before_planning): + logger.info(f"[私聊][{self.private_name}]生成回复后、发送前发现过多新消息,取消发送,重新规划") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"发送前发现过多新消息,取消发送: {final_reply_to_send}"} + ) + self.conversation_info.last_successful_reply_action = None # 重置状态 + return # 直接返回,主循环会重新规划 + + # 确认发送 self.generated_reply = final_reply_to_send - send_success = await self._send_reply() + send_success = await self._send_reply() # 调用发送函数 if send_success: - # 发送成功后,标记处理过的消息 - await observation_info.mark_messages_processed_up_to(planning_marker_time) - - # --- 核心逻辑修改:根据规划期间收到的“用户”新消息数决定下一步状态 --- - if new_user_messages_during_planning > 0: - logger.info(f"[私聊][{self.private_name}] 发送追问成功后,检测到规划期间有 {new_user_messages_during_planning} 条用户新消息,强制重置回复状态以进行新规划。") - self.conversation_info.last_successful_reply_action = None # 强制重新规划 - else: - # 只有在规划期间没有用户新消息时,才设置追问状态 - logger.info(f"[私聊][{self.private_name}] 发送追问成功,规划期间无用户新消息,允许下次进入追问状态。") - self.conversation_info.last_successful_reply_action = "send_new_message" - # --- 核心逻辑修改结束 --- - action_successful = True - else: - logger.error(f"[私聊][{self.private_name}]发送追问回复失败") - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"发送追问回复失败: {final_reply_to_send}"} - ) - self.conversation_info.last_successful_reply_action = None + reply_sent = True # 标记回复已发送 + logger.info(f"[私聊][{self.private_name}]成功发送 '{action}' 回复.") + # 清空上次拒绝记录 (再次确保) + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + + # --- [修改点 7] 发送成功后,处理新消息并决定下一轮 prompt 类型 --- + # 获取发送后的最新未处理消息列表 + final_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) + final_unprocessed_count = len(final_unprocessed_messages) + + # 计算在生成和发送期间新增的消息数 + new_messages_during_generation_count = 0 + for msg in final_unprocessed_messages: + msg_id = msg.get("message_id") + # 如果消息 ID 不在规划前的集合中,说明是新来的 + if msg_id and msg_id not in message_ids_before_planning: + new_messages_during_generation_count += 1 + + logger.debug(f"[私聊][{self.private_name}]回复发送后,当前未处理消息数: {final_unprocessed_count}, 其中生成/发送期间新增: {new_messages_during_generation_count}") + + # 根据生成期间是否有新消息,决定下次规划用哪个 prompt + if new_messages_during_generation_count > 0: + # 有 1 条或更多新消息在生成期间到达 + logger.info(f"[私聊][{self.private_name}]检测到 {new_messages_during_generation_count} 条在生成/发送期间到达的新消息,下一轮将使用首次回复逻辑处理。") + self.conversation_info.last_successful_reply_action = None # 强制下一轮用 PROMPT_INITIAL_REPLY + else: + # 没有新消息在生成期间到达 + logger.info(f"[私聊][{self.private_name}]生成/发送期间无新消息,下一轮将根据 '{action}' 使用追问逻辑。") + self.conversation_info.last_successful_reply_action = action # 保持状态,下一轮可能用 PROMPT_FOLLOW_UP + + # --- [修改点 8] 清理规划时已知的消息 --- + # 只有在回复成功发送后,才清理掉那些在规划时就已经看到的消息 + if message_ids_before_planning: + await observation_info.clear_processed_messages(message_ids_before_planning) + + + else: # 发送失败 + logger.error(f"[私聊][{self.private_name}]发送 '{action}' 回复失败。") + # 发送失败,也认为动作未成功,重置状态 + action_successful = False + self.conversation_info.last_successful_reply_action = None + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送回复时失败"} + ) elif need_replan: - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,追问回复决定打回动作决策。打回原因: {check_reason}" - ) - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"} - ) - self.conversation_info.last_successful_reply_action = None + # 检查后决定打回动作决策 + logger.warning( + f"[私聊][{self.private_name}]'{action}' 回复检查后决定打回动作决策 (尝试 {reply_attempt_count} 次)。打回原因: {check_reason}" + ) + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后打回: {check_reason}"} + ) + self.conversation_info.last_successful_reply_action = None # 重置状态 - else: - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的追问回复。最终原因: {check_reason}" - ) - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后失败: {check_reason}"} - ) - self.conversation_info.last_successful_reply_action = None + else: # 多次尝试后仍然不合适 (is_suitable is False and not need_replan) + logger.warning( + f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的 '{action}' 回复。最终原因: {check_reason}" + ) + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后失败: {check_reason}"} + ) + self.conversation_info.last_successful_reply_action = None # 重置状态 - # 执行 Wait 操作 - logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...") - self.state = ConversationState.WAITING - await observation_info.mark_messages_processed_up_to(planning_marker_time) - await self.waiter.wait(self.conversation_info) - wait_action_record = { - "action": "wait", - "plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待", - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - "final_reason": None, - } - conversation_info.done_action.append(wait_action_record) - action_successful = True + # 如果是 send_new_message 失败,则执行 wait (保持原 fallback 逻辑) + if action == "send_new_message": + logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...") + self.state = ConversationState.WAITING + await self.waiter.wait(self.conversation_info) + wait_action_record = { + "action": "wait", + "plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待", + "status": "done", # wait 本身算完成 + "time": datetime.datetime.now().strftime("%H:%M:%S"), + "final_reason": None, + } + conversation_info.done_action.append(wait_action_record) + action_successful = True # fallback wait 成功 + # 注意: fallback wait 成功后,last_successful_reply_action 仍然是 None - elif action == "direct_reply": - max_reply_attempts = 3 - reply_attempt_count = 0 - is_suitable = False - need_replan = False - check_reason = "未进行尝试" - final_reply_to_send = "" - - while reply_attempt_count < max_reply_attempts and not is_suitable: - reply_attempt_count += 1 - logger.info( - f"[私聊][{self.private_name}]尝试生成首次回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." - ) - self.state = ConversationState.GENERATING - - # 1. 生成回复 - self.generated_reply = await self.reply_generator.generate( - observation_info, conversation_info, action_type="direct_reply" - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的首次回复: {self.generated_reply}" - ) - - # 2. 检查回复 - self.state = ConversationState.CHECKING - try: - current_goal_str = "" - if conversation_info.goal_list: - first_goal = conversation_info.goal_list[0] - if isinstance(first_goal, dict): - current_goal_str = first_goal.get("goal", "") - elif isinstance(first_goal, str): - current_goal_str = first_goal - - is_suitable, check_reason, need_replan = await self.reply_generator.check_reply( - reply=self.generated_reply, - goal=current_goal_str, - chat_history=observation_info.chat_history, - chat_history_str=observation_info.chat_history_str, - retry_count=reply_attempt_count - 1, - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" - ) - - if not is_suitable: - setattr(conversation_info, 'last_reply_rejection_reason', check_reason) - setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply) - else: - setattr(conversation_info, 'last_reply_rejection_reason', None) - setattr(conversation_info, 'last_rejected_reply_content', None) - - if is_suitable: - final_reply_to_send = self.generated_reply - break - elif need_replan: - logger.warning( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查建议重新规划,停止尝试。原因: {check_reason}" - ) - break - except Exception as check_err: - logger.error( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (首次回复) 时出错: {check_err}" - ) - check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - setattr(conversation_info, 'last_reply_rejection_reason', check_reason) - setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply) - break - - # 循环结束,处理最终结果 - if is_suitable: - # 发送合适的回复 - self.generated_reply = final_reply_to_send - send_success = await self._send_reply() - - if send_success: - # 发送成功后,标记处理过的消息 - await observation_info.mark_messages_processed_up_to(planning_marker_time) - - # --- 核心逻辑修改:根据规划期间收到的“用户”新消息数决定下一步状态 --- - if new_user_messages_during_planning > 0: - logger.info(f"[私聊][{self.private_name}] 发送首次回复成功后,检测到规划期间有 {new_user_messages_during_planning} 条用户新消息,强制重置回复状态以进行新规划。") - self.conversation_info.last_successful_reply_action = None # 强制重新规划 - else: - # 只有在规划期间没有用户新消息时,才设置追问状态 - logger.info(f"[私聊][{self.private_name}] 发送首次回复成功,规划期间无用户新消息,允许下次进入追问状态。") - self.conversation_info.last_successful_reply_action = "direct_reply" - # --- 核心逻辑修改结束 --- - - action_successful = True - else: - logger.error(f"[私聊][{self.private_name}]发送首次回复失败") - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"发送首次回复失败: {final_reply_to_send}"} - ) - self.conversation_info.last_successful_reply_action = None - - elif need_replan: - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,首次回复决定打回动作决策。打回原因: {check_reason}" - ) - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"} - ) - self.conversation_info.last_successful_reply_action = None - - else: - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的首次回复。最终原因: {check_reason}" - ) - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后失败: {check_reason}"} - ) - self.conversation_info.last_successful_reply_action = None - - # 执行 Wait 操作 - logger.info(f"[私聊][{self.private_name}]由于无法生成合适首次回复,执行 'wait' 操作...") - self.state = ConversationState.WAITING - await observation_info.mark_messages_processed_up_to(planning_marker_time) - await self.waiter.wait(self.conversation_info) - wait_action_record = { - "action": "wait", - "plan_reason": "因 direct_reply 多次尝试失败而执行的后备等待", - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - "final_reason": None, - } - conversation_info.done_action.append(wait_action_record) - action_successful = True - - # --- 其他动作的处理逻辑保持不变,但确保在成功后调用 mark_messages_processed_up_to --- + # --- 处理其他动作 (保持大部分不变,主要是确保状态重置) --- elif action == "rethink_goal": self.state = ConversationState.RETHINKING try: @@ -580,11 +551,14 @@ class Conversation: action_successful = True except Exception as rethink_err: logger.error(f"[私聊][{self.private_name}]重新思考目标时出错: {rethink_err}") - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"} - ) - self.conversation_info.last_successful_reply_action = None + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"} + ) + # 无论成功失败,非回复动作都重置 last_successful_reply_action + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None # 清除拒绝原因 + conversation_info.last_rejected_reply_content = None + elif action == "listening": self.state = ConversationState.LISTENING @@ -598,57 +572,82 @@ class Conversation: action_successful = True except Exception as listen_err: logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}") - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"倾听失败: {listen_err}"} - ) - self.conversation_info.last_successful_reply_action = None + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"倾听失败: {listen_err}"} + ) + # 无论成功失败,非回复动作都重置 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None elif action == "say_goodbye": self.state = ConversationState.GENERATING logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...") try: + # 1. 生成告别语 self.generated_reply = await self.reply_generator.generate( observation_info, conversation_info, action_type="say_goodbye" ) logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}") + + # 2. 发送告别语 if self.generated_reply: + # --- [修改点 9] 告别前也检查中断 --- + if self._check_interrupt_before_sending(message_ids_before_planning): + logger.info(f"[私聊][{self.private_name}]发送告别语前发现过多新消息,取消发送,重新规划") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送告别语前发现过多新消息"} + ) + self.should_continue = True # 不能结束,需要重规划 + self.conversation_info.last_successful_reply_action = None # 重置状态 + return + send_success = await self._send_reply() if send_success: - await observation_info.mark_messages_processed_up_to(planning_marker_time) action_successful = True + reply_sent = True # 标记发送成功 logger.info(f"[私聊][{self.private_name}]告别语已发送。") + # 发送告别语成功后,通常意味着对话结束 + self.should_continue = False + logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。") else: - logger.warning(f"[私聊][{self.private_name}]发送告别语失败。") - action_successful = False - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": "发送告别语失败"} - ) + logger.warning(f"[私聊][{self.private_name}]发送告别语失败。") + action_successful = False + # 发送失败不应结束对话,可能需要重试或做其他事 + self.should_continue = True + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送告别语失败"} + ) + self.conversation_info.last_successful_reply_action = None # 重置状态 + else: logger.warning(f"[私聊][{self.private_name}]未能生成告别语内容,无法发送。") action_successful = False - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": "未能生成告别语内容"} - ) - self.should_continue = False - logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。") + self.should_continue = True # 未能生成也不能结束 + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "未能生成告别语内容"} + ) + self.conversation_info.last_successful_reply_action = None + except Exception as goodbye_err: logger.error(f"[私聊][{self.private_name}]生成或发送告别语时出错: {goodbye_err}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - self.should_continue = False action_successful = False - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"} - ) + self.should_continue = True # 出错也不能结束 + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"} + ) + self.conversation_info.last_successful_reply_action = None elif action == "end_conversation": self.should_continue = False logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...") - await observation_info.mark_messages_processed_up_to(planning_marker_time) action_successful = True + # 结束对话也重置状态 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + elif action == "block_and_ignore": logger.info(f"[私聊][{self.private_name}]不想再理你了...") @@ -658,8 +657,12 @@ class Conversation: f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}" ) self.state = ConversationState.IGNORED - await observation_info.mark_messages_processed_up_to(planning_marker_time) action_successful = True + # 忽略也重置状态 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + else: # 对应 'wait' 动作 self.state = ConversationState.WAITING @@ -673,58 +676,67 @@ class Conversation: action_successful = True except Exception as wait_err: logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}") - if action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"等待失败: {wait_err}"} - ) - self.conversation_info.last_successful_reply_action = None + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"等待失败: {wait_err}"} + ) + # 无论成功失败,非回复动作都重置 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + # --- 更新 Action History 状态 --- if action_successful: - if action_index < len(conversation_info.done_action): - # 只有在明确不需要强制重新规划时,才在非回复动作后重置状态 - # 注意:这里的条件与回复动作后的逻辑略有不同,因为非回复动作本身就不会进入追问 - if action not in ["direct_reply", "send_new_message"]: - self.conversation_info.last_successful_reply_action = None - - conversation_info.done_action[action_index].update( - { - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - } - ) - else: - logger.error(f"[私聊][{self.private_name}]尝试更新无效的 action_index: {action_index},当前 done_action 长度: {len(conversation_info.done_action)}") + conversation_info.done_action[action_index].update( + { + "status": "done", + "time": datetime.datetime.now().strftime("%H:%M:%S"), + } + ) + # **注意**: last_successful_reply_action 的更新逻辑已经移到各自的动作处理中 + logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'done'") + else: + # 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action 的 final_reason + logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'recall' 或失败") + # --- [修改点 10] _send_reply 返回布尔值表示成功与否 --- async def _send_reply(self) -> bool: - """发送回复,并返回发送是否成功""" + """发送回复,并返回是否发送成功""" if not self.generated_reply: logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。") - return False + return False # 发送失败 try: reply_content = self.generated_reply + # 检查依赖项 if not hasattr(self, "direct_sender") or not self.direct_sender: logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。") - return False + return False # 发送失败 if not self.chat_stream: logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。") - return False + return False # 发送失败 + # 发送消息 await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content) - self.state = ConversationState.ANALYZING - return True + # 发送成功后,可以考虑触发 observer 更新,但需谨慎避免竞争条件或重复处理 + # 暂时注释掉,依赖 observer 的自然更新周期 + # self.chat_observer.trigger_update() + # await self.chat_observer.wait_for_update() + + self.state = ConversationState.ANALYZING # 更新状态 (例如,可以改为 IDLE 或 WAITING) + return True # 发送成功 except Exception as e: - logger.error(f"[私聊][{self.private_name}]发送消息或更新状态时失败: {str(e)}") + logger.error(f"[私聊][{self.private_name}]发送消息时失败: {str(e)}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - self.state = ConversationState.ANALYZING - return False + self.state = ConversationState.ANALYZING # 或者设置为 ERROR 状态? + return False # 发送失败 + async def _send_timeout_message(self): - """发送超时结束消息""" + """发送超时结束消息 (保持不变)""" try: if not hasattr(self, 'observation_info') or not self.observation_info.chat_history: logger.warning(f"[私聊][{self.private_name}]无法获取聊天历史,无法发送超时消息。") @@ -744,5 +756,4 @@ class Conversation: chat_stream=self.chat_stream, content="[自动消息] 对方长时间未响应,对话已超时。", reply_to_message=latest_message ) except Exception as e: - logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}") - + logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}") \ No newline at end of file diff --git a/src/plugins/PFC/conversation_info.py b/src/plugins/PFC/conversation_info.py index 04524b69..062a4641 100644 --- a/src/plugins/PFC/conversation_info.py +++ b/src/plugins/PFC/conversation_info.py @@ -8,3 +8,5 @@ class ConversationInfo: self.knowledge_list = [] self.memory_list = [] self.last_successful_reply_action: Optional[str] = None + self.last_reply_rejection_reason: Optional[str] = None # 用于存储上次回复被拒原因 + self.last_rejected_reply_content: Optional[str] = None # 用于存储上次被拒的回复内容 diff --git a/src/plugins/PFC/observation_info.py b/src/plugins/PFC/observation_info.py index 3f4cfb62..8db70281 100644 --- a/src/plugins/PFC/observation_info.py +++ b/src/plugins/PFC/observation_info.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# File: observation_info.py from typing import List, Optional, Dict, Any, Set from maim_message import UserInfo import time @@ -102,10 +104,10 @@ class ObservationInfoHandler(NotificationHandler): self.observation_info.unprocessed_messages = [ msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id ] - if len(self.observation_info.unprocessed_messages) < original_count: - logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id})") - # 更新未处理消息计数 - self.observation_info.new_messages_count = len(self.observation_info.unprocessed_messages) + # --- [修改点 11] 更新 new_messages_count --- + self.observation_info.new_messages_count = len(self.observation_info.unprocessed_messages) + if self.observation_info.new_messages_count < original_count: + logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id}), 当前未处理数: {self.observation_info.new_messages_count}") elif notification_type == NotificationType.USER_JOINED: @@ -178,7 +180,7 @@ class ObservationInfo: self.last_message_id: Optional[str] = None self.last_message_content: str = "" self.last_message_sender: Optional[str] = None - self.bot_id: Optional[str] = None # Consider initializing from config + self.bot_id: Optional[str] = None # 需要在某个地方设置 bot_id,例如从 global_config 获取 self.chat_history_count: int = 0 self.new_messages_count: int = 0 self.cold_chat_start_time: Optional[float] = None @@ -194,266 +196,196 @@ class ObservationInfo: self.handler: ObservationInfoHandler = ObservationInfoHandler(self, self.private_name) - def bind_to_chat_observer(self, chat_observer: 'ChatObserver'): # Use forward reference - """绑定到指定的chat_observer + # --- 初始化 bot_id --- + from ...config.config import global_config # 移动到 __init__ 内部以避免循环导入问题 + self.bot_id = str(global_config.BOT_QQ) if global_config.BOT_QQ else None - Args: - chat_observer: 要绑定的 ChatObserver 实例 - """ + def bind_to_chat_observer(self, chat_observer: ChatObserver): + """绑定到指定的chat_observer (保持不变)""" if self.chat_observer: logger.warning(f"[私聊][{self.private_name}]尝试重复绑定 ChatObserver") return self.chat_observer = chat_observer try: - if not self.handler: # 确保 handler 已经被创建 + if not self.handler: logger.error(f"[私聊][{self.private_name}] 尝试绑定时 handler 未初始化!") - self.chat_observer = None # 重置,防止后续错误 + self.chat_observer = None return - # 注册关心的通知类型 self.chat_observer.notification_manager.register_handler( target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler ) self.chat_observer.notification_manager.register_handler( target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler ) - # --- 新增:注册其他必要的通知类型 --- + # --- [修改点 12] 注册 MESSAGE_DELETED --- self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.ACTIVE_CHAT, handler=self.handler - ) - self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.BOT_SPEAKING, handler=self.handler - ) - self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.USER_SPEAKING, handler=self.handler - ) - self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler - ) - self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.USER_JOINED, handler=self.handler - ) - self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.USER_LEFT, handler=self.handler - ) - self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.ERROR, handler=self.handler - ) - # --- 注册结束 --- - + target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler + ) logger.info(f"[私聊][{self.private_name}]成功绑定到 ChatObserver") except Exception as e: logger.error(f"[私聊][{self.private_name}]绑定到 ChatObserver 时出错: {e}") - self.chat_observer = None # 绑定失败,重置 + self.chat_observer = None def unbind_from_chat_observer(self): - """解除与chat_observer的绑定""" + """解除与chat_observer的绑定 (保持不变)""" if ( self.chat_observer and hasattr(self.chat_observer, "notification_manager") and self.handler - ): # 增加 handler 检查 + ): try: - # --- 注销所有注册过的通知类型 --- - notification_types_to_unregister = [ - NotificationType.NEW_MESSAGE, - NotificationType.COLD_CHAT, - NotificationType.ACTIVE_CHAT, - NotificationType.BOT_SPEAKING, - NotificationType.USER_SPEAKING, - NotificationType.MESSAGE_DELETED, - NotificationType.USER_JOINED, - NotificationType.USER_LEFT, - NotificationType.ERROR, - ] - for nt in notification_types_to_unregister: - self.chat_observer.notification_manager.unregister_handler( - target="observation_info", notification_type=nt, handler=self.handler - ) - # --- 注销结束 --- + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler + ) + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler + ) + # --- [修改点 13] 注销 MESSAGE_DELETED --- + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler + ) logger.info(f"[私聊][{self.private_name}]成功从 ChatObserver 解绑") except Exception as e: logger.error(f"[私聊][{self.private_name}]从 ChatObserver 解绑时出错: {e}") - finally: # 确保 chat_observer 被重置 + finally: self.chat_observer = None else: logger.warning(f"[私聊][{self.private_name}]尝试解绑时 ChatObserver 不存在、无效或 handler 未设置") - # 修改:update_from_message 接收 UserInfo 对象 async def update_from_message(self, message: Dict[str, Any], user_info: Optional[UserInfo]): - """从消息更新信息 - - Args: - message: 消息数据字典 - user_info: 解析后的 UserInfo 对象 (可能为 None) - """ + """从消息更新信息 (保持不变)""" message_time = message.get("time") message_id = message.get("message_id") processed_text = message.get("processed_plain_text", "") - # 检查消息是否已存在于未处理列表中 (避免重复添加) - if any(msg.get("message_id") == message_id for msg in self.unprocessed_messages): - # logger.debug(f"[私聊][{self.private_name}]消息 {message_id} 已存在于未处理列表,跳过") - return - - # 只有在新消息到达时才更新 last_message 相关信息 if message_time and message_time > (self.last_message_time or 0): self.last_message_time = message_time self.last_message_id = message_id self.last_message_content = processed_text - # 重置冷场计时器 - self.is_cold_chat = False # Corrected variable name + self.is_cold_chat = False self.cold_chat_start_time = None self.cold_chat_duration = 0.0 if user_info: - sender_id = str(user_info.user_id) # 确保是字符串 + sender_id = str(user_info.user_id) self.last_message_sender = sender_id - # 更新发言时间 - # 假设 self.bot_id 已经正确初始化 (例如从 global_config) - if self.bot_id and sender_id == str(self.bot_id): + if sender_id == self.bot_id: self.last_bot_speak_time = message_time else: self.last_user_speak_time = message_time - self.active_users.add(sender_id) # 用户发言则认为其活跃 + self.active_users.add(sender_id) else: logger.warning( f"[私聊][{self.private_name}]处理消息更新时缺少有效的 UserInfo 对象, message_id: {message_id}" ) - self.last_message_sender = None # 发送者未知 + self.last_message_sender = None - # 将原始消息字典添加到未处理列表 - self.unprocessed_messages.append(message) - self.new_messages_count = len(self.unprocessed_messages) # 直接用列表长度 + # --- [修改点 14] 添加到未处理列表,并更新计数 --- + # 检查消息是否已存在于未处理列表中,避免重复添加 + if not any(msg.get("message_id") == message_id for msg in self.unprocessed_messages): + self.unprocessed_messages.append(message) + self.new_messages_count = len(self.unprocessed_messages) + logger.debug(f"[私聊][{self.private_name}]添加新未处理消息 ID: {message_id}, 当前未处理数: {self.new_messages_count}") + self.update_changed() + else: + logger.warning(f"[私聊][{self.private_name}]尝试重复添加未处理消息 ID: {message_id}") - # logger.debug(f"[私聊][{self.private_name}]消息更新: last_time={self.last_message_time}, new_count={self.new_messages_count}") - self.update_changed() # 标记状态已改变 else: - # 如果消息时间戳不是最新的,可能不需要处理,或者记录一个警告 - # logger.warning(f"[私聊][{self.private_name}]收到过时或无效时间戳的消息: ID={message_id}, time={message_time}") - # 即使时间戳旧,也可能需要加入未处理列表(如果它是之前漏掉的) - # 但为了避免复杂化,暂时按原逻辑处理:只处理时间更新的消息 pass def update_changed(self): - """标记状态已改变,并重置标记""" - # logger.debug(f"[私聊][{self.private_name}]状态标记为已改变 (changed=True)") + """标记状态已改变,并重置标记 (保持不变)""" self.changed = True async def update_cold_chat_status(self, is_cold: bool, current_time: float): - """更新冷场状态 - - Args: - is_cold: 是否处于冷场状态 - current_time: 当前时间戳 - """ - if is_cold != self.is_cold_chat: # 仅在状态变化时更新 # Corrected variable name - self.is_cold_chat = is_cold # Corrected variable name + """更新冷场状态 (保持不变)""" + if is_cold != self.is_cold_chat: + self.is_cold_chat = is_cold if is_cold: - # 进入冷场状态 self.cold_chat_start_time = ( self.last_message_time or current_time - ) # 从最后消息时间开始算,或从当前时间开始 + ) logger.info(f"[私聊][{self.private_name}]进入冷场状态,开始时间: {self.cold_chat_start_time}") else: - # 结束冷场状态 if self.cold_chat_start_time: self.cold_chat_duration = current_time - self.cold_chat_start_time logger.info(f"[私聊][{self.private_name}]结束冷场状态,持续时间: {self.cold_chat_duration:.2f} 秒") - self.cold_chat_start_time = None # 重置开始时间 - self.update_changed() # 状态变化,标记改变 + self.cold_chat_start_time = None + self.update_changed() - # 即使状态没变,如果是冷场状态,也更新持续时间 - if self.is_cold_chat and self.cold_chat_start_time: # Corrected variable name + if self.is_cold_chat and self.cold_chat_start_time: self.cold_chat_duration = current_time - self.cold_chat_start_time def get_active_duration(self) -> float: - """获取当前活跃时长 (距离最后一条消息的时间) - - Returns: - float: 最后一条消息到现在的时长(秒) - """ + """获取当前活跃时长 (保持不变)""" if not self.last_message_time: return 0.0 return time.time() - self.last_message_time def get_user_response_time(self) -> Optional[float]: - """获取用户最后响应时间 (距离用户最后发言的时间) - - Returns: - Optional[float]: 用户最后发言到现在的时长(秒),如果没有用户发言则返回None - """ + """获取用户最后响应时间 (保持不变)""" if not self.last_user_speak_time: return None return time.time() - self.last_user_speak_time def get_bot_response_time(self) -> Optional[float]: - """获取机器人最后响应时间 (距离机器人最后发言的时间) - - Returns: - Optional[float]: 机器人最后发言到现在的时长(秒),如果没有机器人发言则返回None - """ + """获取机器人最后响应时间 (保持不变)""" if not self.last_bot_speak_time: return None return time.time() - self.last_bot_speak_time - # --- 新增方法 --- - async def mark_messages_processed_up_to(self, marker_timestamp: float): - """ - 将指定时间戳之前(包括等于)的未处理消息移入历史记录。 - - Args: - marker_timestamp: 时间戳标记。 - """ - messages_to_process = [ - msg for msg in self.unprocessed_messages if msg.get("time", 0) <= marker_timestamp - ] - - if not messages_to_process: - # logger.debug(f"[私聊][{self.private_name}]没有在 {marker_timestamp} 之前的未处理消息。") + # --- [修改点 15] 重命名并修改 clear_unprocessed_messages --- + # async def clear_unprocessed_messages(self): <-- 旧方法注释掉或删除 + async def clear_processed_messages(self, message_ids_to_clear: Set[str]): + """将指定ID的未处理消息移入历史记录,并更新相关状态""" + if not message_ids_to_clear: + logger.debug(f"[私聊][{self.private_name}]没有需要清理的消息 ID。") return - # logger.debug(f"[私聊][{self.private_name}]处理 {len(messages_to_process)} 条直到 {marker_timestamp} 的未处理消息...") + messages_to_move = [] + remaining_messages = [] + cleared_count = 0 - # 将要处理的消息添加到历史记录 - max_history_len = 100 # 示例:最多保留100条历史记录 - self.chat_history.extend(messages_to_process) + # 分离要清理和要保留的消息 + for msg in self.unprocessed_messages: + if msg.get("message_id") in message_ids_to_clear: + messages_to_move.append(msg) + cleared_count += 1 + else: + remaining_messages.append(msg) + + if not messages_to_move: + logger.debug(f"[私聊][{self.private_name}]未找到与 ID 列表匹配的未处理消息进行清理。") + return + + logger.debug(f"[私聊][{self.private_name}]准备清理 {cleared_count} 条已处理消息...") + + # 将要移动的消息添加到历史记录 + max_history_len = 100 + self.chat_history.extend(messages_to_move) if len(self.chat_history) > max_history_len: self.chat_history = self.chat_history[-max_history_len:] - # 更新历史记录字符串 (只使用最近一部分生成,例如20条) - history_slice_for_str = self.chat_history[-20:] + # 更新历史记录字符串 (仅使用最近一部分生成) + history_slice_for_str = self.chat_history[-20:] # 例如最近20条 try: self.chat_history_str = await build_readable_messages( history_slice_for_str, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", - read_mark=0.0, # read_mark 可能需要根据逻辑调整 + read_mark=0.0, ) except Exception as e: logger.error(f"[私聊][{self.private_name}]构建聊天记录字符串时出错: {e}") - self.chat_history_str = "[构建聊天记录出错]" # 提供错误提示 + self.chat_history_str = "[构建聊天记录出错]" - # 从未处理列表中移除已处理的消息 - processed_ids = {msg.get("message_id") for msg in messages_to_process} - self.unprocessed_messages = [ - msg for msg in self.unprocessed_messages if msg.get("message_id") not in processed_ids - ] - - # 更新未处理消息计数和历史记录总数 + # 更新未处理消息列表和计数 + self.unprocessed_messages = remaining_messages self.new_messages_count = len(self.unprocessed_messages) self.chat_history_count = len(self.chat_history) - # logger.debug(f"[私聊][{self.private_name}]已处理 {len(messages_to_process)} 条消息,剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。") - self.update_changed() # 状态改变 - - # --- 移除或注释掉旧的 clear_unprocessed_messages 方法 --- - # async def clear_unprocessed_messages(self): - # """将未处理消息移入历史记录,并更新相关状态 (此方法将被 mark_messages_processed_up_to 替代)""" - # # ... (旧代码) ... - # logger.warning(f"[私聊][{self.private_name}] 调用了已弃用的 clear_unprocessed_messages 方法。请使用 mark_messages_processed_up_to。") - # # 为了兼容性,可以暂时调用新方法处理所有消息,但不推荐 - # # await self.mark_messages_processed_up_to(time.time()) - # pass # 或者直接留空 + logger.info(f"[私聊][{self.private_name}]已清理 {cleared_count} 条消息,剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。") + self.update_changed() # 状态改变 \ No newline at end of file diff --git a/src/plugins/PFC/pfc_utils.py b/src/plugins/PFC/pfc_utils.py index a7a412c1..b0f3f841 100644 --- a/src/plugins/PFC/pfc_utils.py +++ b/src/plugins/PFC/pfc_utils.py @@ -2,9 +2,9 @@ import traceback import json import re from typing import Dict, Any, Optional, Tuple, List, Union -from src.common.logger_manager import get_logger # 确认 logger 的导入路径 +from src.common.logger_manager import get_logger # 确认 logger 的导入路径 from src.plugins.memory_system.Hippocampus import HippocampusManager -from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder # 确认 prompt_builder 的导入路径 +from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder # 确认 prompt_builder 的导入路径 logger = get_logger("pfc_utils") @@ -47,33 +47,36 @@ async def retrieve_contextual_info(text: str, private_name: str) -> Tuple[str, s retrieved_memory_str = f"你回忆起:\n{related_memory_info.strip()}\n(以上是你的回忆,供参考)\n" memory_log_msg = f"自动检索到记忆: {related_memory_info.strip()[:100]}..." else: - memory_log_msg = "自动检索记忆返回为空。" + memory_log_msg = "自动检索记忆返回为空。" logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 记忆检索: {memory_log_msg}") except Exception as e: - logger.error(f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索记忆时出错: {e}\n{traceback.format_exc()}") + logger.error( + f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索记忆时出错: {e}\n{traceback.format_exc()}" + ) retrieved_memory_str = "检索记忆时出错。\n" # 2. 检索知识 (逻辑来自原 action_planner 和 reply_generator) try: # 使用导入的 prompt_builder 实例及其方法 knowledge_result = await prompt_builder.get_prompt_info( - message=text, threshold=0.38 # threshold 可以根据需要调整 + message=text, + threshold=0.38, # threshold 可以根据需要调整 ) if knowledge_result: - retrieved_knowledge_str = knowledge_result # 直接使用返回结果 - knowledge_log_msg = "自动检索到相关知识。" + retrieved_knowledge_str = knowledge_result # 直接使用返回结果 + knowledge_log_msg = "自动检索到相关知识。" logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 知识检索: {knowledge_log_msg}") except Exception as e: - logger.error(f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索知识时出错: {e}\n{traceback.format_exc()}") + logger.error( + f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索知识时出错: {e}\n{traceback.format_exc()}" + ) retrieved_knowledge_str = "检索知识时出错。\n" return retrieved_memory_str, retrieved_knowledge_str - - def get_items_from_json( content: str, private_name: str, diff --git a/src/plugins/PFC/reply_generator.py b/src/plugins/PFC/reply_generator.py index 646e0ee9..892e881b 100644 --- a/src/plugins/PFC/reply_generator.py +++ b/src/plugins/PFC/reply_generator.py @@ -1,8 +1,11 @@ from .pfc_utils import retrieve_contextual_info + # 可能用于旧知识库提取主题 (如果需要回退到旧方法) # import jieba # 如果报错说找不到 jieba,可能需要安装: pip install jieba # import re # 正则表达式库,通常 Python 自带 from typing import Tuple, List, Dict, Any + +# from src.common.logger import get_module_logger from src.common.logger_manager import get_logger from ..models.utils_model import LLMRequest from ...config.config import global_config @@ -113,7 +116,6 @@ class ReplyGenerator: self.chat_observer = ChatObserver.get_instance(stream_id, private_name) self.reply_checker = ReplyChecker(stream_id, private_name) - # 修改 generate 方法签名,增加 action_type 参数 async def generate( self, observation_info: ObservationInfo, conversation_info: ConversationInfo, action_type: str @@ -170,29 +172,35 @@ class ReplyGenerator: # 构建 Persona 文本 (persona_text) persona_text = f"你的名字是{self.name},{self.personality_info}。" - retrieval_context = chat_history_text # 使用前面构建好的 chat_history_text + retrieval_context = chat_history_text # 使用前面构建好的 chat_history_text # 调用共享函数进行检索 - retrieved_memory_str, retrieved_knowledge_str = await retrieve_contextual_info(retrieval_context, self.private_name) - logger.info(f"[私聊][{self.private_name}] (ReplyGenerator) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str and '无相关知识' not in retrieved_knowledge_str else '无'}") - + retrieved_memory_str, retrieved_knowledge_str = await retrieve_contextual_info( + retrieval_context, self.private_name + ) + logger.info( + f"[私聊][{self.private_name}] (ReplyGenerator) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str and '无相关知识' not in retrieved_knowledge_str else '无'}" + ) + # --- 修改:构建上次回复失败原因和内容提示 --- last_rejection_info_str = "" # 检查 conversation_info 是否有上次拒绝的原因和内容,并且它们都不是 None - last_reason = getattr(conversation_info, 'last_reply_rejection_reason', None) - last_content = getattr(conversation_info, 'last_rejected_reply_content', None) + last_reason = getattr(conversation_info, "last_reply_rejection_reason", None) + last_content = getattr(conversation_info, "last_rejected_reply_content", None) if last_reason and last_content: last_rejection_info_str = ( f"\n------\n" f"【重要提示:你上一次尝试回复时失败了,以下是详细信息】\n" - f"上次试图发送的消息内容: “{last_content}”\n" # <-- 显示上次内容 + f"上次试图发送的消息内容: “{last_content}”\n" # <-- 显示上次内容 f"失败原因: “{last_reason}”\n" f"请根据【消息内容】和【失败原因】调整你的新回复,避免重复之前的错误。\n" f"------\n" ) - logger.info(f"[私聊][{self.private_name}]检测到上次回复失败信息,将加入 Prompt:\n" - f" 内容: {last_content}\n" - f" 原因: {last_reason}") + logger.info( + f"[私聊][{self.private_name}]检测到上次回复失败信息,将加入 Prompt:\n" + f" 内容: {last_content}\n" + f" 原因: {last_reason}" + ) # --- 选择 Prompt --- if action_type == "send_new_message": @@ -206,22 +214,24 @@ class ReplyGenerator: logger.info(f"[私聊][{self.private_name}]使用 PROMPT_DIRECT_REPLY (首次/非连续回复生成)") # --- 格式化最终的 Prompt --- - try: # <--- 增加 try-except 块处理可能的 format 错误 + try: # <--- 增加 try-except 块处理可能的 format 错误 prompt = prompt_template.format( persona_text=persona_text, goals_str=goals_str, chat_history_text=chat_history_text, retrieved_memory_str=retrieved_memory_str if retrieved_memory_str else "无相关记忆。", retrieved_knowledge_str=retrieved_knowledge_str if retrieved_knowledge_str else "无相关知识。", - last_rejection_info=last_rejection_info_str # <--- 新增传递上次拒绝原因 + last_rejection_info=last_rejection_info_str, # <--- 新增传递上次拒绝原因 ) except KeyError as e: - logger.error(f"[私聊][{self.private_name}]格式化 Prompt 时出错,缺少键: {e}。请检查 Prompt 模板和传递的参数。") - # 返回错误信息或默认回复 - return "抱歉,准备回复时出了点问题,请检查一下我的代码..." + logger.error( + f"[私聊][{self.private_name}]格式化 Prompt 时出错,缺少键: {e}。请检查 Prompt 模板和传递的参数。" + ) + # 返回错误信息或默认回复 + return "抱歉,准备回复时出了点问题,请检查一下我的代码..." except Exception as fmt_err: - logger.error(f"[私聊][{self.private_name}]格式化 Prompt 时发生未知错误: {fmt_err}") - return "抱歉,准备回复时出了点内部错误,请检查一下我的代码..." + logger.error(f"[私聊][{self.private_name}]格式化 Prompt 时发生未知错误: {fmt_err}") + return "抱歉,准备回复时出了点内部错误,请检查一下我的代码..." # --- 调用 LLM 生成 --- logger.debug(f"[私聊][{self.private_name}]发送到LLM的生成提示词:\n------\n{prompt}\n------")