diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index c71988fd..0b668d21 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -3,16 +3,23 @@ import asyncio import datetime import traceback from typing import Dict, Any, Optional, Set, List + +# 导入日志记录器 from src.common.logger_manager import get_logger +# 导入聊天消息构建和获取工具 from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat +# 导入消息相关的数据结构 from maim_message import UserInfo +# 导入聊天流管理器和聊天流类 from src.plugins.chat.chat_stream import chat_manager, ChatStream -from ..chat.message import Message +# 导入聊天消息类 +from ..chat.message import Message # 假设 Message 类在这里 +# 导入全局配置 from ...config.config import global_config # 导入 PFC 内部组件和类型 -from .pfc_types import ConversationState -from .pfc import GoalAnalyzer +from .pfc_types import ConversationState # 导入更新后的 pfc_types +from .pfc import GoalAnalyzer # 假设 GoalAnalyzer 在 pfc.py from .chat_observer import ChatObserver from .message_sender import DirectMessageSender from .action_planner import ActionPlanner @@ -20,9 +27,9 @@ from .observation_info import ObservationInfo from .conversation_info import ConversationInfo from .reply_generator import ReplyGenerator from .idle_conversation_starter import IdleConversationStarter -from .pfc_KnowledgeFetcher import KnowledgeFetcher +from .pfc_KnowledgeFetcher import KnowledgeFetcher # 假设 KnowledgeFetcher 在这里 from .waiter import Waiter -from .reply_checker import ReplyChecker +from .reply_checker import ReplyChecker # 确保 ReplyChecker 被导入 # 导入富文本回溯,用于更好的错误展示 from rich.traceback import install @@ -75,7 +82,6 @@ class Conversation: if not self.bot_qq_str: # 这是一个严重问题,记录错误 logger.error(f"[私聊][{self.private_name}] 严重错误:未能从配置中获取 BOT_QQ ID!PFC 可能无法正常工作。") - # 可以在这里抛出异常或采取其他错误处理措施 async def _initialize(self): """ @@ -94,16 +100,22 @@ class Conversation: # 1. 初始化核心功能组件 logger.debug(f"[私聊][{self.private_name}] 初始化 ActionPlanner...") self.action_planner = ActionPlanner(self.stream_id, self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 GoalAnalyzer...") self.goal_analyzer = GoalAnalyzer(self.stream_id, self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 ReplyGenerator...") self.reply_generator = ReplyGenerator(self.stream_id, self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 KnowledgeFetcher...") self.knowledge_fetcher = KnowledgeFetcher(self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 Waiter...") self.waiter = Waiter(self.stream_id, self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 DirectMessageSender...") self.direct_sender = DirectMessageSender(self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 ReplyChecker...") self.reply_checker = ReplyChecker(self.stream_id, self.private_name) @@ -122,12 +134,14 @@ class Conversation: # 2. 初始化信息存储和观察组件 logger.debug(f"[私聊][{self.private_name}] 获取 ChatObserver 实例...") self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 ObservationInfo...") self.observation_info = ObservationInfo(self.private_name) # 确保 ObservationInfo 知道机器人的 ID if not self.observation_info.bot_id: logger.warning(f"[私聊][{self.private_name}] ObservationInfo 未能自动获取 bot_id,尝试手动设置。") self.observation_info.bot_id = self.bot_qq_str + logger.debug(f"[私聊][{self.private_name}] 初始化 ConversationInfo...") self.conversation_info = ConversationInfo() @@ -268,7 +282,7 @@ class Conversation: try: logger.debug(f"[私聊][{self.private_name}] 正在创建 _plan_and_action_loop 任务...") # 创建任务,但不等待其完成,让它在后台运行 - _loop_task = asyncio.create_task(self._plan_and_action_loop()) + loop_task = asyncio.create_task(self._plan_and_action_loop()) # 可以选择性地添加完成回调来处理任务结束或异常 # loop_task.add_done_callback(self._handle_loop_completion) logger.info(f"[私聊][{self.private_name}] 规划循环任务已创建。") @@ -449,10 +463,8 @@ class Conversation: # 检查最后执行的动作是否是结束类型且成功完成 last_action_record = self.conversation_info.done_action[-1] if self.conversation_info.done_action else {} - action_ended: bool = ( - last_action_record.get("action") in ["end_conversation", "say_goodbye"] and - last_action_record.get("status") == "done" - ) + action_ended: bool = (last_action_record.get("action") in ["end_conversation", "say_goodbye"] and + last_action_record.get("status") == "done") # 如果满足任一结束条件,则停止循环 if goal_ended or action_ended: @@ -517,7 +529,6 @@ class Conversation: user_info=user_info, # 使用解析出的 UserInfo 对象 processed_plain_text=msg_dict.get("processed_plain_text", ""), # 获取处理后的纯文本 detailed_plain_text=msg_dict.get("detailed_plain_text", ""), # 获取详细纯文本 - # 根据 Message 类的定义,可能还需要其他字段 ) except Exception as e: # 捕获转换过程中的任何异常 @@ -535,8 +546,8 @@ class Conversation: ): """ 处理由 ActionPlanner 规划出的具体行动。 - 包括生成回复、调用检查器、发送消息、等待、思考目标等。 - 并根据执行结果和规则更新对话状态。 + 包括生成回复、调用检查器、发送消息、等待、思考目标等,并包含重试逻辑。 + 根据执行结果和规则更新对话状态。 """ # 检查初始化状态 if not self._initialized: @@ -544,9 +555,9 @@ class Conversation: return logger.info(f"[私聊][{self.private_name}] 开始处理动作: {action}, 原因: {reason}") - action_start_time = time.time() # 记录动作开始时间,用于计算耗时 + action_start_time = time.time() # 记录动作开始时间 - # --- 准备动作历史记录 --- + # --- 准备动作历史记录条目 --- current_action_record = { "action": action, "plan_reason": reason, # 记录规划时的原因 @@ -570,31 +581,45 @@ class Conversation: try: # --- 根据不同的 action 类型执行相应的逻辑 --- - # 1. 处理需要生成并可能发送消息的动作 + # 1. 处理需要生成、检查、发送的动作 if action in ["direct_reply", "send_new_message"]: - # --- a. 生成回复 --- - self.state = ConversationState.GENERATING # 更新对话状态 - if not self.reply_generator: - # 检查依赖组件是否存在 - raise RuntimeError("ReplyGenerator 未初始化") - # 调用 ReplyGenerator 生成回复内容 - generated_content = await self.reply_generator.generate( - observation_info, - conversation_info, - action_type=action - ) - logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'") # 日志中截断长内容 + max_reply_attempts: int = 3 # 最多尝试次数 (可配置) + reply_attempt_count: int = 0 + is_suitable: bool = False # 标记回复是否合适 + generated_content: str = "" # 存储生成的回复 + check_reason: str = "未进行检查" # 存储检查结果原因 - # 检查生成内容是否有效 - if not generated_content or generated_content.startswith("抱歉"): - # 如果生成失败或返回错误提示 - logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,标记失败。") - final_reason = "生成内容无效" - final_status = "recall" # 标记为 recall - # 重置追问状态,因为本次回复失败 - conversation_info.last_successful_reply_action = None - else: - # --- b. 检查回复 (如果生成成功) --- + # --- [核心修复] 引入重试循环 --- + while reply_attempt_count < max_reply_attempts and not is_suitable and not need_replan_from_checker: + reply_attempt_count += 1 + log_prefix = f"[私聊][{self.private_name}] 尝试生成/检查 '{action}' 回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." + logger.info(log_prefix) + + # --- a. 生成回复 --- + self.state = ConversationState.GENERATING # 更新对话状态 + if not self.reply_generator: + # 检查依赖组件是否存在 + raise RuntimeError("ReplyGenerator 未初始化") + # 调用 ReplyGenerator 生成回复内容 + generated_content = await self.reply_generator.generate( + observation_info, + conversation_info, + action_type=action + ) + logger.info(f"{log_prefix} 生成内容: '{generated_content}...'") # 日志中截断长内容 + + # 检查生成内容是否有效 + if not generated_content or generated_content.startswith("抱歉"): + # 如果生成失败或返回错误提示 + logger.warning(f"{log_prefix} 生成内容为空或为错误提示,将进行下一次尝试。") + check_reason = "生成内容无效" # 记录原因 + # 记录拒绝信息供下次生成参考 + conversation_info.last_reply_rejection_reason = check_reason + conversation_info.last_rejected_reply_content = generated_content + await asyncio.sleep(0.5) # 短暂等待后重试 + continue # 进入下一次循环尝试 + + # --- b. 检查回复 --- self.state = ConversationState.CHECKING # 更新状态为检查中 if not self.reply_checker: raise RuntimeError("ReplyChecker 未初始化") @@ -611,96 +636,114 @@ class Conversation: # 获取用于检查的聊天记录 (列表和字符串形式) chat_history_for_check: List[Dict[str, Any]] = getattr(observation_info, 'chat_history', []) chat_history_text_for_check: str = getattr(observation_info, 'chat_history_str', '') - # 当前重试次数 (如果未来加入重试逻辑,这里需要传递实际次数) - retry_count: int = 0 + # 当前重试次数 (传递给 checker,可能有用) + # retry_count for checker starts from 0 + current_retry_for_checker = reply_attempt_count - 1 - logger.debug(f"[私聊][{self.private_name}] 调用 ReplyChecker 检查回复...") + logger.debug(f"{log_prefix} 调用 ReplyChecker 检查...") # 调用 ReplyChecker 的 check 方法 is_suitable, check_reason, need_replan_from_checker = await self.reply_checker.check( reply=generated_content, goal=current_goal_str, chat_history=chat_history_for_check, # 传递列表形式的历史记录 chat_history_text=chat_history_text_for_check, # 传递文本形式的历史记录 - retry_count=retry_count + retry_count=current_retry_for_checker ) - logger.info(f"[私聊][{self.private_name}] ReplyChecker 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重规划={need_replan_from_checker}") + logger.info(f"{log_prefix} ReplyChecker 结果: 合适={is_suitable}, 原因='{check_reason}', 需重规划={need_replan_from_checker}") - # --- c. 处理检查结果 --- - if not is_suitable or need_replan_from_checker: - # 如果检查不通过或 Checker 要求重新规划 + # 如果不合适,记录原因并准备下一次尝试(如果还有次数) + if not is_suitable: # 记录拒绝原因和内容,供下次生成时参考 conversation_info.last_reply_rejection_reason = check_reason conversation_info.last_rejected_reply_content = generated_content - # 设置最终状态和原因 - final_reason = f"回复检查不通过: {check_reason}" - final_status = "recall" # 标记为 recall + # 如果不需要重规划且还有尝试次数 + if not need_replan_from_checker and reply_attempt_count < max_reply_attempts: + logger.warning(f"{log_prefix} 回复不合适,原因: {check_reason}。将进行下一次尝试。") + await asyncio.sleep(0.5) # 等待后重试 + # 如果需要重规划或达到最大次数,循环会在下次判断时自动结束 + + # --- 循环结束后,处理最终结果 --- + if is_suitable: + # 如果找到了合适的回复 + logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 找到合适的回复,准备发送。") + # 清除上次的拒绝信息 (因为本次成功了) + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + + # --- c. 发送回复 --- + self.generated_reply = generated_content # 使用最后一次检查通过的内容 + timestamp_before_sending = time.time() # 记录发送前时间戳 + logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}") + self.state = ConversationState.SENDING # 更新状态为发送中 + # 调用内部发送方法 + send_success = await self._send_reply() + send_end_time = time.time() # 记录发送结束时间 + + if send_success: + # 如果发送成功 + action_successful = True # 标记动作成功 + # final_status 和 final_reason 会在 finally 中设置 + logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 成功发送回复.") + # 更新空闲计时器 + if self.idle_conversation_starter: + await self.idle_conversation_starter.update_last_message_time(send_end_time) + + # --- d. 清理已处理消息 --- + current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) + message_ids_to_clear: Set[str] = set() + # 遍历所有未处理消息 + for msg in current_unprocessed_messages: + msg_time = msg.get('time') + msg_id = msg.get('message_id') + sender_id = msg.get("user_info", {}).get("user_id") + # 规则:只清理【发送前】收到的、【来自他人】的消息 + if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: + message_ids_to_clear.add(msg_id) + # 如果有需要清理的消息,调用清理方法 + if message_ids_to_clear: + logger.debug(f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}") + await observation_info.clear_processed_messages(message_ids_to_clear) + else: + logger.debug(f"[私聊][{self.private_name}] 没有需要清理的发送前(他人)消息。") + + # --- e. 决定下一轮规划类型 --- + # 从 conversation_info 获取【规划期间】收到的【他人】新消息数量 + other_new_msg_count_during_planning = getattr(conversation_info, 'other_new_messages_during_planning_count', 0) + + # 规则:如果规划期间收到他人新消息 (0 < count <= 2),则下一轮强制初始回复 + if other_new_msg_count_during_planning > 0: + logger.info(f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。") + conversation_info.last_successful_reply_action = None # 强制初始回复 + else: + # 规则:如果规划期间【没有】收到他人新消息,则允许追问 + logger.info(f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。") + conversation_info.last_successful_reply_action = action # 允许追问 + + else: + # 如果发送失败 + logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送回复失败。") + final_status = "recall" # 发送失败,标记为 recall + final_reason = "发送回复时失败" # 重置追问状态 conversation_info.last_successful_reply_action = None - logger.warning(f"[私聊][{self.private_name}] 动作 '{action}' 因回复检查失败而被拒绝。") - # 注意:如果 need_replan_from_checker 为 True,后续逻辑会因 final_status 为 recall 而可能触发重新规划 - else: - # 如果检查通过 - # 清除上次的拒绝信息 - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None - # --- d. 发送回复 --- - self.generated_reply = generated_content # 存储待发送内容 - timestamp_before_sending = time.time() # 记录发送前时间戳 - logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 回复检查通过,记录发送前时间戳: {timestamp_before_sending:.2f}") - self.state = ConversationState.SENDING # 更新状态为发送中 - # 调用内部发送方法 - send_success = await self._send_reply() - send_end_time = time.time() # 记录发送结束时间 + elif need_replan_from_checker: + # 如果 Checker 要求重新规划 + logger.warning(f"[私聊][{self.private_name}] 动作 '{action}' 因 ReplyChecker 要求而被取消,将重新规划。原因: {check_reason}") + final_status = "recall" # 标记为 recall + final_reason = f"回复检查要求重新规划: {check_reason}" + # # 重置追问状态 + # conversation_info.last_successful_reply_action = None - if send_success: - # 如果发送成功 - action_successful = True # 标记动作成功 - logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 成功发送回复.") - # 更新空闲计时器 - if self.idle_conversation_starter: - await self.idle_conversation_starter.update_last_message_time(send_end_time) + else: + # 达到最大尝试次数仍未找到合适回复 + logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 达到最大尝试次数 ({max_reply_attempts}),未能生成/检查通过合适的回复。最终原因: {check_reason}") + final_status = "recall" # 标记为 recall + final_reason = f"尝试{max_reply_attempts}次后失败: {check_reason}" + # # 重置追问状态 + # conversation_info.last_successful_reply_action = None - # --- e. 清理已处理消息 --- - current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) - message_ids_to_clear: Set[str] = set() - # 遍历所有未处理消息 - for msg in current_unprocessed_messages: - msg_time = msg.get('time') - msg_id = msg.get('message_id') - sender_id = msg.get("user_info", {}).get("user_id") - # 规则:只清理【发送前】收到的、【来自他人】的消息 - if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: - message_ids_to_clear.add(msg_id) - # 如果有需要清理的消息,调用清理方法 - if message_ids_to_clear: - logger.debug(f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}") - await observation_info.clear_processed_messages(message_ids_to_clear) - else: - logger.debug(f"[私聊][{self.private_name}] 没有需要清理的发送前(他人)消息。") - - # --- f. 决定下一轮规划类型 --- - # 从 conversation_info 获取【规划期间】收到的【他人】新消息数量 - other_new_msg_count_during_planning = getattr(conversation_info, 'other_new_messages_during_planning_count', 0) - - # 规则:如果规划期间收到他人新消息 (0 < count <= 2),则下一轮强制初始回复 - if other_new_msg_count_during_planning > 0: - logger.info(f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。") - conversation_info.last_successful_reply_action = None # 强制初始回复 - else: - # 规则:如果规划期间【没有】收到他人新消息,则允许追问 - logger.info(f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。") - conversation_info.last_successful_reply_action = action # 允许追问 - - else: - # 如果发送失败 - logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送回复失败。") - final_status = "recall" # 发送失败,标记为 recall - final_reason = "发送回复时失败" - # 重置追问状态 - conversation_info.last_successful_reply_action = None - - # 2. 处理发送告别语动作 + # 2. 处理发送告别语动作 (保持简单,不加重试) elif action == "say_goodbye": self.state = ConversationState.GENERATING if not self.reply_generator: @@ -718,7 +761,7 @@ class Conversation: logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,取消发送。") final_reason = "生成内容无效" # 即使生成失败,也按计划结束对话 - final_status = "done" + final_status = "done" # 标记为 done,因为目的是结束 self.should_continue = False logger.info(f"[私聊][{self.private_name}] 告别语生成失败,仍按计划结束对话。") else: @@ -732,6 +775,7 @@ class Conversation: if send_success: action_successful = True # 标记成功 + # final_status 和 final_reason 会在 finally 中设置 logger.info(f"[私聊][{self.private_name}] 成功发送告别语,即将停止对话实例。") # 更新空闲计时器 if self.idle_conversation_starter: @@ -798,6 +842,7 @@ class Conversation: raise RuntimeError("Waiter 未初始化") logger.info(f"[私聊][{self.private_name}] 动作 'wait': 进入等待状态...") # 调用 Waiter 的常规等待方法,内部处理超时 + # wait 方法返回是否超时 (True=超时, False=未超时/被新消息中断) timeout_occurred = await self.waiter.wait(self.conversation_info) action_successful = True # wait 动作本身执行即视为成功 # wait 动作完成后不需要清理消息,等待新消息或超时触发重新规划 @@ -865,10 +910,14 @@ class Conversation: elif final_status == "recall" and not action_successful: # 如果最终是 recall 且未成功,且不是因为检查不通过(比如生成失败),确保原因合理 + # 保留之前的逻辑,检查是否已有更具体的失败原因 if not final_reason or final_reason == "动作未成功执行": - # 排除已经被 checker 设置的原因 - if not conversation_info.last_reply_rejection_reason: - final_reason = "动作执行失败或被取消" # 提供一个更通用的失败原因 + # 检查是否有 checker 的原因 + checker_reason = conversation_info.last_reply_rejection_reason + if checker_reason: + final_reason = f"回复检查不通过: {checker_reason}" + else: + final_reason = "动作执行失败或被取消" # 通用失败原因 # 更新历史记录字典 if conversation_info.done_action and action_index < len(conversation_info.done_action): @@ -878,7 +927,7 @@ class Conversation: "status": final_status, # 最终状态 "time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 完成时间 "final_reason": final_reason, # 最终原因 - "duration_ms": int((time.time() - action_start_time) * 1000) # 记录耗时(毫秒) + "duration_ms": int((time.time() - action_start_time) * 1000) # 记录耗时(毫秒) } ) logger.debug(f"[私聊][{self.private_name}] 动作 '{action}' 最终状态: {final_status}, 原因: {final_reason}") @@ -939,4 +988,5 @@ class Conversation: await self.stop() except Exception as e: # 捕获发送超时消息的异常 - logger.error(f"[私聊][{self.private_name}] 发送超时消息失败: {str(e)}") \ No newline at end of file + logger.error(f"[私聊][{self.private_name}] 发送超时消息失败: {str(e)}") +