From 2e8e7e620fd75548e9e9a0c33bc85a8f96ab9f2d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 6 May 2025 07:34:26 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=96=20=E8=87=AA=E5=8A=A8=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81=20[skip=20ci]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/PFC/conversation.py | 395 ++++++++++++++++++-------------- 1 file changed, 224 insertions(+), 171 deletions(-) diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 3d3aec86..8be7f109 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -6,20 +6,25 @@ from typing import Dict, Any, Optional, Set, List # 导入日志记录器 from src.common.logger_manager import get_logger + # 导入聊天消息构建和获取工具 from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat + # 导入消息相关的数据结构 from maim_message import UserInfo + # 导入聊天流管理器和聊天流类 from src.plugins.chat.chat_stream import chat_manager, ChatStream + # 导入聊天消息类 -from ..chat.message import Message # 假设 Message 类在这里 +from ..chat.message import Message # 假设 Message 类在这里 + # 导入全局配置 from ...config.config import global_config # 导入 PFC 内部组件和类型 -from .pfc_types import ConversationState # 导入更新后的 pfc_types -from .pfc import GoalAnalyzer # 假设 GoalAnalyzer 在 pfc.py +from .pfc_types import ConversationState # 导入更新后的 pfc_types +from .pfc import GoalAnalyzer # 假设 GoalAnalyzer 在 pfc.py from .chat_observer import ChatObserver from .message_sender import DirectMessageSender from .action_planner import ActionPlanner @@ -27,17 +32,19 @@ from .observation_info import ObservationInfo from .conversation_info import ConversationInfo from .reply_generator import ReplyGenerator from .idle_conversation_starter import IdleConversationStarter -from .pfc_KnowledgeFetcher import KnowledgeFetcher # 假设 KnowledgeFetcher 在这里 +from .pfc_KnowledgeFetcher import KnowledgeFetcher # 假设 KnowledgeFetcher 在这里 from .waiter import Waiter -from .reply_checker import ReplyChecker # 确保 ReplyChecker 被导入 +from .reply_checker import ReplyChecker # 确保 ReplyChecker 被导入 # 导入富文本回溯,用于更好的错误展示 from rich.traceback import install + install(extra_lines=3) # 获取当前模块的日志记录器 logger = get_logger("pfc_conversation") + class Conversation: """ 对话类,负责管理单个私聊对话的状态和核心逻辑流程。 @@ -54,11 +61,11 @@ class Conversation: """ self.stream_id: str = stream_id self.private_name: str = private_name - self.state: ConversationState = ConversationState.INIT # 对话的初始状态 - self.should_continue: bool = False # 标记对话循环是否应该继续运行 - self.ignore_until_timestamp: Optional[float] = None # 如果设置了,忽略此时间戳之前的活动 - self.generated_reply: str = "" # 存储最近生成的回复内容 - self.chat_stream: Optional[ChatStream] = None # 关联的聊天流对象 + self.state: ConversationState = ConversationState.INIT # 对话的初始状态 + self.should_continue: bool = False # 标记对话循环是否应该继续运行 + self.ignore_until_timestamp: Optional[float] = None # 如果设置了,忽略此时间戳之前的活动 + self.generated_reply: str = "" # 存储最近生成的回复内容 + self.chat_stream: Optional[ChatStream] = None # 关联的聊天流对象 # 初始化所有核心组件为 None,将在 _initialize 中创建 self.action_planner: Optional[ActionPlanner] = None @@ -71,11 +78,11 @@ class Conversation: self.chat_observer: Optional[ChatObserver] = None self.observation_info: Optional[ObservationInfo] = None self.conversation_info: Optional[ConversationInfo] = None - self.reply_checker: Optional[ReplyChecker] = None # 回复检查器 + self.reply_checker: Optional[ReplyChecker] = None # 回复检查器 # 内部状态标志 - self._initializing: bool = False # 标记是否正在初始化,防止并发问题 - self._initialized: bool = False # 标记是否已成功初始化 + self._initializing: bool = False # 标记是否正在初始化,防止并发问题 + self._initialized: bool = False # 标记是否已成功初始化 # 缓存机器人自己的 QQ 号字符串,避免重复转换 self.bot_qq_str: Optional[str] = str(global_config.BOT_QQ) if global_config.BOT_QQ else None @@ -93,7 +100,7 @@ class Conversation: logger.warning(f"[私聊][{self.private_name}] 尝试重复初始化或正在初始化中。") return - self._initializing = True # 标记开始初始化 + self._initializing = True # 标记开始初始化 logger.info(f"[私聊][{self.private_name}] 开始初始化对话实例: {self.stream_id}") try: @@ -124,7 +131,9 @@ class Conversation: self.chat_stream = chat_manager.get_stream(self.stream_id) if not self.chat_stream: # 获取不到 ChatStream 是一个严重问题,因为无法发送消息 - logger.error(f"[私聊][{self.private_name}] 初始化错误:无法从 chat_manager 获取 stream_id {self.stream_id} 的 ChatStream。") + logger.error( + f"[私聊][{self.private_name}] 初始化错误:无法从 chat_manager 获取 stream_id {self.stream_id} 的 ChatStream。" + ) raise ValueError(f"无法获取 stream_id {self.stream_id} 的 ChatStream") # 初始化空闲对话启动器 @@ -162,8 +171,8 @@ class Conversation: # 6. 标记初始化成功并设置运行状态 self._initialized = True - self.should_continue = True # 初始化成功,标记可以继续运行循环 - self.state = ConversationState.ANALYZING # 设置初始状态为分析 + self.should_continue = True # 初始化成功,标记可以继续运行循环 + self.state = ConversationState.ANALYZING # 设置初始状态为分析 logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 初始化完成。") @@ -171,11 +180,11 @@ class Conversation: # 捕获初始化过程中的任何异常 logger.error(f"[私聊][{self.private_name}] 初始化对话实例失败: {e}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") - self.should_continue = False # 初始化失败,标记不能继续 - self._initialized = False # 确保标记为未初始化 + self.should_continue = False # 初始化失败,标记不能继续 + self._initialized = False # 确保标记为未初始化 # 尝试停止可能部分启动的组件 await self.stop() - raise # 将异常重新抛出,通知调用者初始化失败 + raise # 将异常重新抛出,通知调用者初始化失败 finally: # 无论成功与否,都要清除正在初始化的标记 self._initializing = False @@ -192,7 +201,7 @@ class Conversation: initial_messages = get_raw_msg_before_timestamp_with_chat( chat_id=self.stream_id, timestamp=time.time(), - limit=30, # limit 可以根据需要调整或配置 + limit=30, # limit 可以根据需要调整或配置 ) if initial_messages: @@ -211,7 +220,9 @@ class Conversation: try: last_user_info = UserInfo.from_dict(last_user_info_dict) # 存储发送者的 user_id 字符串 - self.observation_info.last_message_sender = str(last_user_info.user_id) if last_user_info else None + self.observation_info.last_message_sender = ( + str(last_user_info.user_id) if last_user_info else None + ) except Exception as e: logger.warning(f"[私聊][{self.private_name}] 解析最后一条消息的用户信息时出错: {e}") self.observation_info.last_message_sender = None @@ -223,13 +234,13 @@ class Conversation: self.observation_info.last_message_content = last_msg.get("processed_plain_text", "") # 构建用于 Prompt 的历史记录字符串 (只使用最近的一部分) - history_slice_for_str = initial_messages[-20:] # 可配置 + history_slice_for_str = initial_messages[-20:] # 可配置 self.observation_info.chat_history_str = await build_readable_messages( history_slice_for_str, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", - read_mark=0.0 # read_mark 可能需要根据实际情况调整 + read_mark=0.0, # read_mark 可能需要根据实际情况调整 ) # 更新 ChatObserver 和 IdleStarter 的时间戳 @@ -238,13 +249,17 @@ class Conversation: self.chat_observer.last_message_time = self.observation_info.last_message_time if self.idle_conversation_starter and self.observation_info.last_message_time: # 更新空闲计时器的起始时间 - await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time) + await self.idle_conversation_starter.update_last_message_time( + self.observation_info.last_message_time + ) - logger.info(f"[私聊][{self.private_name}] 成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}") + logger.info( + f"[私聊][{self.private_name}] 成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}" + ) else: # 如果没有历史记录 logger.info(f"[私聊][{self.private_name}] 没有找到初始聊天记录。") - self.observation_info.chat_history_str = "还没有聊天记录。" # 设置默认提示 + self.observation_info.chat_history_str = "还没有聊天记录。" # 设置默认提示 except Exception as load_err: # 捕获加载过程中的异常 @@ -267,14 +282,16 @@ class Conversation: # 在尝试初始化后,再次检查状态 if not self._initialized: logger.error(f"[私聊][{self.private_name}] 初始化失败,无法启动规划循环。") - return # 初始化失败,明确停止 + return # 初始化失败,明确停止 except Exception as init_err: logger.error(f"[私聊][{self.private_name}] 初始化过程中发生未捕获错误: {init_err},无法启动。") - return # 初始化异常,明确停止 + return # 初始化异常,明确停止 # 再次检查 should_continue 标志,确保初始化成功且未被外部停止 if not self.should_continue: - logger.warning(f"[私聊][{self.private_name}] 对话实例已被标记为不应继续 (可能由于初始化失败或已被停止),无法启动规划循环。") + logger.warning( + f"[私聊][{self.private_name}] 对话实例已被标记为不应继续 (可能由于初始化失败或已被停止),无法启动规划循环。" + ) return logger.info(f"[私聊][{self.private_name}] 对话系统启动,准备创建规划循环任务...") @@ -297,7 +314,7 @@ class Conversation: 会停止后台任务、解绑观察者等。 """ logger.info(f"[私聊][{self.private_name}] 正在停止对话实例: {self.stream_id}") - self.should_continue = False # 设置标志,让主循环退出 + self.should_continue = False # 设置标志,让主循环退出 # 停止空闲对话检测器 if self.idle_conversation_starter: @@ -324,11 +341,11 @@ class Conversation: # 循环前再次确认初始化状态 if not self._initialized: logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下运行规划循环,退出。") - return # 明确退出 + return # 明确退出 # 主循环,只要 should_continue 为 True 就一直运行 while self.should_continue: - loop_iter_start_time = time.time() # 记录本次循环开始时间 + loop_iter_start_time = time.time() # 记录本次循环开始时间 logger.debug(f"[私聊][{self.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f})") # --- 处理忽略状态 --- @@ -341,13 +358,13 @@ class Conversation: # 计算需要睡眠的时间,最多30秒或直到忽略结束 sleep_duration = min(30, self.ignore_until_timestamp - loop_iter_start_time) await asyncio.sleep(sleep_duration) - continue # 跳过本次循环的后续步骤,直接进入下一次迭代检查 + continue # 跳过本次循环的后续步骤,直接进入下一次迭代检查 elif self.ignore_until_timestamp and loop_iter_start_time >= self.ignore_until_timestamp: # 如果忽略时间已到 logger.info(f"[私聊][{self.private_name}] 忽略时间已到 {self.stream_id},准备结束对话。") - self.ignore_until_timestamp = None # 清除忽略时间戳 - await self.stop() # 调用 stop 方法来结束整个对话实例 - continue # 跳过本次循环的后续步骤 + self.ignore_until_timestamp = None # 清除忽略时间戳 + await self.stop() # 调用 stop 方法来结束整个对话实例 + continue # 跳过本次循环的后续步骤 else: # 如果不在忽略状态,确保空闲检测器在运行 if self.idle_conversation_starter and not self.idle_conversation_starter._running: @@ -360,7 +377,7 @@ class Conversation: if not all([self.action_planner, self.observation_info, self.conversation_info]): logger.error(f"[私聊][{self.private_name}] 核心组件未初始化,无法继续规划循环。将等待5秒后重试...") await asyncio.sleep(5) - continue # 跳过本次迭代 + continue # 跳过本次迭代 # 2. 记录规划开始时间并重置临时状态 planning_start_time = time.time() @@ -372,21 +389,21 @@ class Conversation: logger.debug(f"[私聊][{self.private_name}] 调用 ActionPlanner.plan...") # 传入当前观察信息、对话信息和上次成功回复的动作类型 action, reason = await self.action_planner.plan( - self.observation_info, - self.conversation_info, - self.conversation_info.last_successful_reply_action + self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action ) planning_duration = time.time() - planning_start_time - logger.debug(f"[私聊][{self.private_name}] ActionPlanner.plan 完成 (耗时: {planning_duration:.3f} 秒),初步规划动作: {action}") + logger.debug( + f"[私聊][{self.private_name}] ActionPlanner.plan 完成 (耗时: {planning_duration:.3f} 秒),初步规划动作: {action}" + ) # 4. 检查规划期间是否有新消息到达 - current_unprocessed_messages = getattr(self.observation_info, 'unprocessed_messages', []) + current_unprocessed_messages = getattr(self.observation_info, "unprocessed_messages", []) new_messages_during_planning: List[Dict[str, Any]] = [] other_new_messages_during_planning: List[Dict[str, Any]] = [] # 遍历当前所有未处理的消息 for msg in current_unprocessed_messages: - msg_time = msg.get('time') + msg_time = msg.get("time") sender_id = msg.get("user_info", {}).get("user_id") # 检查消息时间是否在本次规划开始之后 if msg_time and msg_time >= planning_start_time: @@ -395,9 +412,11 @@ class Conversation: if sender_id != self.bot_qq_str: other_new_messages_during_planning.append(msg) - new_msg_count = len(new_messages_during_planning) # 规划期间所有新消息数 - other_new_msg_count = len(other_new_messages_during_planning) # 规划期间他人新消息数 - logger.debug(f"[私聊][{self.private_name}] 规划期间收到新消息总数: {new_msg_count}, 来自他人: {other_new_msg_count}") + new_msg_count = len(new_messages_during_planning) # 规划期间所有新消息数 + other_new_msg_count = len(other_new_messages_during_planning) # 规划期间他人新消息数 + logger.debug( + f"[私聊][{self.private_name}] 规划期间收到新消息总数: {new_msg_count}, 来自他人: {other_new_msg_count}" + ) # 5. 根据动作类型和新消息数量,判断是否需要中断当前规划 should_interrupt: bool = False @@ -424,9 +443,9 @@ class Conversation: cancel_record = { "action": action, "plan_reason": reason, - "status": "cancelled_due_to_new_messages", # 标记取消原因 + "status": "cancelled_due_to_new_messages", # 标记取消原因 "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "final_reason": interrupt_reason + "final_reason": interrupt_reason, } # 安全地添加到 done_action 列表 if not hasattr(self.conversation_info, "done_action"): @@ -437,8 +456,8 @@ class Conversation: self.conversation_info.last_successful_reply_action = None # 将状态设置回分析,准备处理新消息并重新规划 self.state = ConversationState.ANALYZING - await asyncio.sleep(0.1) # 短暂等待,避免CPU空转 - continue # 直接进入下一次循环迭代 + await asyncio.sleep(0.1) # 短暂等待,避免CPU空转 + continue # 直接进入下一次循环迭代 # 7. 如果未中断,存储规划期间的他人新消息数,并执行动作 logger.debug(f"[私聊][{self.private_name}] 未中断,调用 _handle_action 执行动作 '{action}'...") @@ -462,32 +481,38 @@ class Conversation: goal_ended = True # 检查最后执行的动作是否是结束类型且成功完成 - last_action_record = self.conversation_info.done_action[-1] if self.conversation_info.done_action else {} - action_ended: bool = (last_action_record.get("action") in ["end_conversation", "say_goodbye"] and - last_action_record.get("status") == "done") + last_action_record = ( + self.conversation_info.done_action[-1] if self.conversation_info.done_action else {} + ) + action_ended: bool = ( + last_action_record.get("action") in ["end_conversation", "say_goodbye"] + and last_action_record.get("status") == "done" + ) # 如果满足任一结束条件,则停止循环 if goal_ended or action_ended: - logger.info(f"[私聊][{self.private_name}] 检测到结束条件 (目标结束: {goal_ended}, 动作结束: {action_ended}),停止循环。") - await self.stop() # 调用 stop 来停止实例 - continue # 跳过后续,虽然 stop 会设置 should_continue=False + logger.info( + f"[私聊][{self.private_name}] 检测到结束条件 (目标结束: {goal_ended}, 动作结束: {action_ended}),停止循环。" + ) + await self.stop() # 调用 stop 来停止实例 + continue # 跳过后续,虽然 stop 会设置 should_continue=False except asyncio.CancelledError: # 处理任务被取消的情况 logger.info(f"[私聊][{self.private_name}] PFC 主循环任务被取消。") - await self.stop() # 确保资源被清理 - break # 明确退出循环 + await self.stop() # 确保资源被清理 + break # 明确退出循环 except Exception as loop_err: # 捕获循环中的其他未预期错误 logger.error(f"[私聊][{self.private_name}] PFC 主循环出错: {loop_err}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") - self.state = ConversationState.ERROR # 设置错误状态 + self.state = ConversationState.ERROR # 设置错误状态 # 可以在这里添加更复杂的错误恢复逻辑,或者简单等待后重试 - await asyncio.sleep(5) # 等待一段时间,避免错误状态下快速空转 + await asyncio.sleep(5) # 等待一段时间,避免错误状态下快速空转 # --- 控制循环频率 --- - loop_duration = time.time() - loop_iter_start_time # 计算本次循环耗时 - min_loop_interval = 0.1 # 设置最小循环间隔(秒),防止CPU占用过高 + loop_duration = time.time() - loop_iter_start_time # 计算本次循环耗时 + min_loop_interval = 0.1 # 设置最小循环间隔(秒),防止CPU占用过高 logger.debug(f"[私聊][{self.private_name}] 循环迭代耗时: {loop_duration:.3f} 秒。") if loop_duration < min_loop_interval: # 如果循环太快,则睡眠一段时间 @@ -496,15 +521,16 @@ class Conversation: # 循环结束后的日志 logger.info(f"[私聊][{self.private_name}] PFC 循环已退出 for stream_id: {self.stream_id}") - def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Optional[Message]: """将从数据库或其他来源获取的消息字典转换为内部使用的 Message 对象""" try: # 优先使用实例自身的 chat_stream,如果不存在则尝试从管理器获取 chat_stream_to_use = self.chat_stream or chat_manager.get_stream(self.stream_id) if not chat_stream_to_use: - logger.error(f"[私聊][{self.private_name}] 无法确定 ChatStream for stream_id {self.stream_id},无法转换消息。") - return None # 无法确定聊天流,返回 None + logger.error( + f"[私聊][{self.private_name}] 无法确定 ChatStream for stream_id {self.stream_id},无法转换消息。" + ) + return None # 无法确定聊天流,返回 None # 解析用户信息字典 user_info_dict = msg_dict.get("user_info", {}) @@ -515,34 +541,33 @@ class Conversation: user_info = UserInfo.from_dict(user_info_dict) except Exception as e: # 解析失败记录警告 - logger.warning(f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}") + logger.warning( + f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}" + ) if not user_info: # 如果没有有效的 UserInfo,记录警告并返回 None - logger.warning(f"[私聊][{self.private_name}] 消息缺少有效的 UserInfo,无法转换。 msg_id: {msg_dict.get('message_id')}") + logger.warning( + f"[私聊][{self.private_name}] 消息缺少有效的 UserInfo,无法转换。 msg_id: {msg_dict.get('message_id')}" + ) return None # 创建并返回 Message 对象 return Message( - message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 如果没有ID,生成一个临时的 + message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 如果没有ID,生成一个临时的 chat_stream=chat_stream_to_use, - time=msg_dict.get("time", time.time()), # 如果没有时间戳,使用当前时间 - user_info=user_info, # 使用解析出的 UserInfo 对象 - processed_plain_text=msg_dict.get("processed_plain_text", ""), # 获取处理后的纯文本 - detailed_plain_text=msg_dict.get("detailed_plain_text", ""), # 获取详细纯文本 + time=msg_dict.get("time", time.time()), # 如果没有时间戳,使用当前时间 + user_info=user_info, # 使用解析出的 UserInfo 对象 + processed_plain_text=msg_dict.get("processed_plain_text", ""), # 获取处理后的纯文本 + detailed_plain_text=msg_dict.get("detailed_plain_text", ""), # 获取详细纯文本 ) except Exception as e: # 捕获转换过程中的任何异常 logger.error(f"[私聊][{self.private_name}] 转换消息时出错: {e}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") - return None # 转换失败返回 None - + return None # 转换失败返回 None async def _handle_action( - self, - action: str, - reason: str, - observation_info: ObservationInfo, - conversation_info: ConversationInfo + self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo ): """ 处理由 ActionPlanner 规划出的具体行动。 @@ -555,15 +580,15 @@ class Conversation: return logger.info(f"[私聊][{self.private_name}] 开始处理动作: {action}, 原因: {reason}") - action_start_time = time.time() # 记录动作开始时间 + action_start_time = time.time() # 记录动作开始时间 # --- 准备动作历史记录条目 --- current_action_record = { "action": action, - "plan_reason": reason, # 记录规划时的原因 - "status": "start", # 初始状态为“开始” - "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 记录开始时间 - "final_reason": None, # 最终结果的原因,将在 finally 中设置 + "plan_reason": reason, # 记录规划时的原因 + "status": "start", # 初始状态为“开始” + "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 记录开始时间 + "final_reason": None, # 最终结果的原因,将在 finally 中设置 } # 安全地添加到历史记录列表 if not hasattr(conversation_info, "done_action"): @@ -573,21 +598,21 @@ class Conversation: action_index = len(conversation_info.done_action) - 1 # --- 初始化动作执行状态变量 --- - action_successful: bool = False # 标记动作是否成功执行 - final_status: str = "recall" # 动作最终状态,默认为 recall (表示未成功或需重试) - final_reason: str = "动作未成功执行" # 动作最终原因 - need_replan_from_checker: bool = False # 标记是否由 ReplyChecker 要求重新规划 + action_successful: bool = False # 标记动作是否成功执行 + final_status: str = "recall" # 动作最终状态,默认为 recall (表示未成功或需重试) + final_reason: str = "动作未成功执行" # 动作最终原因 + need_replan_from_checker: bool = False # 标记是否由 ReplyChecker 要求重新规划 try: # --- 根据不同的 action 类型执行相应的逻辑 --- # 1. 处理需要生成、检查、发送的动作 if action in ["direct_reply", "send_new_message"]: - max_reply_attempts: int = 3 # 最多尝试次数 (可配置) + max_reply_attempts: int = 3 # 最多尝试次数 (可配置) reply_attempt_count: int = 0 - is_suitable: bool = False # 标记回复是否合适 - generated_content: str = "" # 存储生成的回复 - check_reason: str = "未进行检查" # 存储检查结果原因 + is_suitable: bool = False # 标记回复是否合适 + generated_content: str = "" # 存储生成的回复 + check_reason: str = "未进行检查" # 存储检查结果原因 # --- [核心修复] 引入重试循环 --- while reply_attempt_count < max_reply_attempts and not is_suitable and not need_replan_from_checker: @@ -596,46 +621,44 @@ class Conversation: logger.info(log_prefix) # --- a. 生成回复 --- - self.state = ConversationState.GENERATING # 更新对话状态 + self.state = ConversationState.GENERATING # 更新对话状态 if not self.reply_generator: # 检查依赖组件是否存在 raise RuntimeError("ReplyGenerator 未初始化") # 调用 ReplyGenerator 生成回复内容 generated_content = await self.reply_generator.generate( - observation_info, - conversation_info, - action_type=action + observation_info, conversation_info, action_type=action ) - logger.info(f"{log_prefix} 生成内容: '{generated_content}...'") # 日志中截断长内容 + logger.info(f"{log_prefix} 生成内容: '{generated_content}...'") # 日志中截断长内容 # 检查生成内容是否有效 if not generated_content or generated_content.startswith("抱歉"): # 如果生成失败或返回错误提示 logger.warning(f"{log_prefix} 生成内容为空或为错误提示,将进行下一次尝试。") - check_reason = "生成内容无效" # 记录原因 + check_reason = "生成内容无效" # 记录原因 # 记录拒绝信息供下次生成参考 conversation_info.last_reply_rejection_reason = check_reason conversation_info.last_rejected_reply_content = generated_content - await asyncio.sleep(0.5) # 短暂等待后重试 - continue # 进入下一次循环尝试 + await asyncio.sleep(0.5) # 短暂等待后重试 + continue # 进入下一次循环尝试 # --- b. 检查回复 --- - self.state = ConversationState.CHECKING # 更新状态为检查中 + self.state = ConversationState.CHECKING # 更新状态为检查中 if not self.reply_checker: raise RuntimeError("ReplyChecker 未初始化") # 准备检查所需的上下文信息 - current_goal_str: str = "" # 当前对话目标字符串 + current_goal_str: str = "" # 当前对话目标字符串 if conversation_info.goal_list: # 通常检查最新的目标 goal_item = conversation_info.goal_list[-1] if isinstance(goal_item, dict): - current_goal_str = goal_item.get('goal', '') + current_goal_str = goal_item.get("goal", "") elif isinstance(goal_item, str): current_goal_str = goal_item # 获取用于检查的聊天记录 (列表和字符串形式) - chat_history_for_check: List[Dict[str, Any]] = getattr(observation_info, 'chat_history', []) - chat_history_text_for_check: str = getattr(observation_info, 'chat_history_str', '') + chat_history_for_check: List[Dict[str, Any]] = getattr(observation_info, "chat_history", []) + chat_history_text_for_check: str = getattr(observation_info, "chat_history_str", "") # 当前重试次数 (传递给 checker,可能有用) # retry_count for checker starts from 0 current_retry_for_checker = reply_attempt_count - 1 @@ -645,11 +668,13 @@ class Conversation: is_suitable, check_reason, need_replan_from_checker = await self.reply_checker.check( reply=generated_content, goal=current_goal_str, - chat_history=chat_history_for_check, # 传递列表形式的历史记录 - chat_history_text=chat_history_text_for_check, # 传递文本形式的历史记录 - retry_count=current_retry_for_checker + chat_history=chat_history_for_check, # 传递列表形式的历史记录 + chat_history_text=chat_history_text_for_check, # 传递文本形式的历史记录 + retry_count=current_retry_for_checker, + ) + logger.info( + f"{log_prefix} ReplyChecker 结果: 合适={is_suitable}, 原因='{check_reason}', 需重规划={need_replan_from_checker}" ) - logger.info(f"{log_prefix} ReplyChecker 结果: 合适={is_suitable}, 原因='{check_reason}', 需重规划={need_replan_from_checker}") # 如果不合适,记录原因并准备下一次尝试(如果还有次数) if not is_suitable: @@ -659,7 +684,7 @@ class Conversation: # 如果不需要重规划且还有尝试次数 if not need_replan_from_checker and reply_attempt_count < max_reply_attempts: logger.warning(f"{log_prefix} 回复不合适,原因: {check_reason}。将进行下一次尝试。") - await asyncio.sleep(0.5) # 等待后重试 + await asyncio.sleep(0.5) # 等待后重试 # 如果需要重规划或达到最大次数,循环会在下次判断时自动结束 # --- 循环结束后,处理最终结果 --- @@ -671,17 +696,19 @@ class Conversation: conversation_info.last_rejected_reply_content = None # --- c. 发送回复 --- - self.generated_reply = generated_content # 使用最后一次检查通过的内容 - timestamp_before_sending = time.time() # 记录发送前时间戳 - logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}") - self.state = ConversationState.SENDING # 更新状态为发送中 + self.generated_reply = generated_content # 使用最后一次检查通过的内容 + timestamp_before_sending = time.time() # 记录发送前时间戳 + logger.debug( + f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}" + ) + self.state = ConversationState.SENDING # 更新状态为发送中 # 调用内部发送方法 send_success = await self._send_reply() - send_end_time = time.time() # 记录发送结束时间 + send_end_time = time.time() # 记录发送结束时间 if send_success: # 如果发送成功 - action_successful = True # 标记动作成功 + action_successful = True # 标记动作成功 # final_status 和 final_reason 会在 finally 中设置 logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 成功发送回复.") # 更新空闲计时器 @@ -689,56 +716,73 @@ class Conversation: await self.idle_conversation_starter.update_last_message_time(send_end_time) # --- d. 清理已处理消息 --- - current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) + current_unprocessed_messages = getattr(observation_info, "unprocessed_messages", []) message_ids_to_clear: Set[str] = set() # 遍历所有未处理消息 for msg in current_unprocessed_messages: - msg_time = msg.get('time') - msg_id = msg.get('message_id') + msg_time = msg.get("time") + msg_id = msg.get("message_id") sender_id = msg.get("user_info", {}).get("user_id") # 规则:只清理【发送前】收到的、【来自他人】的消息 - if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: + if ( + msg_id + and msg_time + and sender_id != self.bot_qq_str + and msg_time < timestamp_before_sending + ): message_ids_to_clear.add(msg_id) # 如果有需要清理的消息,调用清理方法 if message_ids_to_clear: - logger.debug(f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}") + logger.debug( + f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}" + ) await observation_info.clear_processed_messages(message_ids_to_clear) else: logger.debug(f"[私聊][{self.private_name}] 没有需要清理的发送前(他人)消息。") # --- e. 决定下一轮规划类型 --- # 从 conversation_info 获取【规划期间】收到的【他人】新消息数量 - other_new_msg_count_during_planning = getattr(conversation_info, 'other_new_messages_during_planning_count', 0) + other_new_msg_count_during_planning = getattr( + conversation_info, "other_new_messages_during_planning_count", 0 + ) # 规则:如果规划期间收到他人新消息 (0 < count <= 2),则下一轮强制初始回复 if other_new_msg_count_during_planning > 0: - logger.info(f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。") - conversation_info.last_successful_reply_action = None # 强制初始回复 + logger.info( + f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。" + ) + conversation_info.last_successful_reply_action = None # 强制初始回复 else: # 规则:如果规划期间【没有】收到他人新消息,则允许追问 - logger.info(f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。") - conversation_info.last_successful_reply_action = action # 允许追问 + logger.info( + f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。" + ) + conversation_info.last_successful_reply_action = action # 允许追问 else: # 如果发送失败 logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送回复失败。") - final_status = "recall" # 发送失败,标记为 recall + final_status = "recall" # 发送失败,标记为 recall final_reason = "发送回复时失败" # 重置追问状态 conversation_info.last_successful_reply_action = None elif need_replan_from_checker: # 如果 Checker 要求重新规划 - logger.warning(f"[私聊][{self.private_name}] 动作 '{action}' 因 ReplyChecker 要求而被取消,将重新规划。原因: {check_reason}") - final_status = "recall" # 标记为 recall + logger.warning( + f"[私聊][{self.private_name}] 动作 '{action}' 因 ReplyChecker 要求而被取消,将重新规划。原因: {check_reason}" + ) + final_status = "recall" # 标记为 recall final_reason = f"回复检查要求重新规划: {check_reason}" # # 重置追问状态 # conversation_info.last_successful_reply_action = None else: # 达到最大尝试次数仍未找到合适回复 - logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 达到最大尝试次数 ({max_reply_attempts}),未能生成/检查通过合适的回复。最终原因: {check_reason}") - final_status = "recall" # 标记为 recall + logger.warning( + f"[私聊][{self.private_name}] 动作 '{action}': 达到最大尝试次数 ({max_reply_attempts}),未能生成/检查通过合适的回复。最终原因: {check_reason}" + ) + final_status = "recall" # 标记为 recall final_reason = f"尝试{max_reply_attempts}次后失败: {check_reason}" # # 重置追问状态 # conversation_info.last_successful_reply_action = None @@ -750,9 +794,7 @@ class Conversation: raise RuntimeError("ReplyGenerator 未初始化") # 生成告别语 generated_content = await self.reply_generator.generate( - observation_info, - conversation_info, - action_type=action + observation_info, conversation_info, action_type=action ) logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'") @@ -761,33 +803,40 @@ class Conversation: logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,取消发送。") final_reason = "生成内容无效" # 即使生成失败,也按计划结束对话 - final_status = "done" # 标记为 done,因为目的是结束 + final_status = "done" # 标记为 done,因为目的是结束 self.should_continue = False logger.info(f"[私聊][{self.private_name}] 告别语生成失败,仍按计划结束对话。") else: # 发送告别语 self.generated_reply = generated_content timestamp_before_sending = time.time() - logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}") + logger.debug( + f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}" + ) self.state = ConversationState.SENDING send_success = await self._send_reply() send_end_time = time.time() if send_success: - action_successful = True # 标记成功 + action_successful = True # 标记成功 # final_status 和 final_reason 会在 finally 中设置 logger.info(f"[私聊][{self.private_name}] 成功发送告别语,即将停止对话实例。") # 更新空闲计时器 if self.idle_conversation_starter: await self.idle_conversation_starter.update_last_message_time(send_end_time) # 清理发送前的消息 (虽然通常是最后一条,但保持逻辑一致) - current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) + current_unprocessed_messages = getattr(observation_info, "unprocessed_messages", []) message_ids_to_clear: Set[str] = set() for msg in current_unprocessed_messages: - msg_time = msg.get('time') - msg_id = msg.get('message_id') + msg_time = msg.get("time") + msg_id = msg.get("message_id") sender_id = msg.get("user_info", {}).get("user_id") - if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: + if ( + msg_id + and msg_time + and sender_id != self.bot_qq_str + and msg_time < timestamp_before_sending + ): message_ids_to_clear.add(msg_id) if message_ids_to_clear: await observation_info.clear_processed_messages(message_ids_to_clear) @@ -808,7 +857,7 @@ class Conversation: raise RuntimeError("GoalAnalyzer 未初始化") # 调用 GoalAnalyzer 分析并更新目标 await self.goal_analyzer.analyze_goal(conversation_info, observation_info) - action_successful = True # 标记成功 + action_successful = True # 标记成功 # 4. 处理倾听动作 elif action == "listening": @@ -818,22 +867,24 @@ class Conversation: logger.info(f"[私聊][{self.private_name}] 动作 'listening': 进入倾听状态...") # 调用 Waiter 的倾听等待方法,内部会处理超时 await self.waiter.wait_listening(conversation_info) - action_successful = True # listening 动作本身执行即视为成功,后续由新消息或超时驱动 + action_successful = True # listening 动作本身执行即视为成功,后续由新消息或超时驱动 # 5. 处理结束对话动作 elif action == "end_conversation": logger.info(f"[私聊][{self.private_name}] 动作 'end_conversation': 收到最终结束指令,停止对话...") - action_successful = True # 标记成功 - self.should_continue = False # 设置标志以退出循环 + action_successful = True # 标记成功 + self.should_continue = False # 设置标志以退出循环 # 6. 处理屏蔽忽略动作 elif action == "block_and_ignore": logger.info(f"[私聊][{self.private_name}] 动作 'block_and_ignore': 不想再理你了...") - ignore_duration_seconds = 10 * 60 # 忽略 10 分钟,可配置 + ignore_duration_seconds = 10 * 60 # 忽略 10 分钟,可配置 self.ignore_until_timestamp = time.time() + ignore_duration_seconds - logger.info(f"[私聊][{self.private_name}] 将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}") - self.state = ConversationState.IGNORED # 设置忽略状态 - action_successful = True # 标记成功 + logger.info( + f"[私聊][{self.private_name}] 将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}" + ) + self.state = ConversationState.IGNORED # 设置忽略状态 + action_successful = True # 标记成功 # 7. 处理等待动作 elif action == "wait": @@ -844,14 +895,14 @@ class Conversation: # 调用 Waiter 的常规等待方法,内部处理超时 # wait 方法返回是否超时 (True=超时, False=未超时/被新消息中断) timeout_occurred = await self.waiter.wait(self.conversation_info) - action_successful = True # wait 动作本身执行即视为成功 + action_successful = True # wait 动作本身执行即视为成功 # wait 动作完成后不需要清理消息,等待新消息或超时触发重新规划 logger.debug(f"[私聊][{self.private_name}] Wait 动作完成,无需在此清理消息。") # 8. 处理未知的动作类型 else: logger.warning(f"[私聊][{self.private_name}] 未知的动作类型: {action}") - final_status = "recall" # 未知动作标记为 recall + final_status = "recall" # 未知动作标记为 recall final_reason = f"未知的动作类型: {action}" # --- 重置非回复动作的追问状态 --- @@ -869,14 +920,14 @@ class Conversation: final_reason = "动作处理被取消" # 取消时也重置追问状态 conversation_info.last_successful_reply_action = None - raise # 重新抛出 CancelledError,让上层知道任务被取消 + raise # 重新抛出 CancelledError,让上层知道任务被取消 except Exception as handle_err: # 捕获处理动作过程中的其他所有异常 logger.error(f"[私聊][{self.private_name}] 处理动作 '{action}' 时出错: {handle_err}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") - final_status = "error" # 标记为错误状态 + final_status = "error" # 标记为错误状态 final_reason = f"处理动作时出错: {handle_err}" - self.state = ConversationState.ERROR # 设置对话状态为错误 + self.state = ConversationState.ERROR # 设置对话状态为错误 # 出错时重置追问状态 conversation_info.last_successful_reply_action = None @@ -895,7 +946,11 @@ class Conversation: if action == "wait": # 检查是否是因为超时结束的(需要 waiter 返回值,或者检查 goal_list) # 这里简化处理,直接使用通用成功原因 - timeout_occurred = any("分钟," in g.get("goal","") for g in conversation_info.goal_list if isinstance(g, dict)) if conversation_info.goal_list else False + timeout_occurred = ( + any("分钟," in g.get("goal", "") for g in conversation_info.goal_list if isinstance(g, dict)) + if conversation_info.goal_list + else False + ) final_reason = "等待完成" + (" (超时)" if timeout_occurred else " (收到新消息或中断)") elif action == "listening": final_reason = "进入倾听状态" @@ -917,25 +972,26 @@ class Conversation: if checker_reason: final_reason = f"回复检查不通过: {checker_reason}" else: - final_reason = "动作执行失败或被取消" # 通用失败原因 + final_reason = "动作执行失败或被取消" # 通用失败原因 # 更新历史记录字典 if conversation_info.done_action and action_index < len(conversation_info.done_action): # 使用 update 方法更新字典,更安全 conversation_info.done_action[action_index].update( { - "status": final_status, # 最终状态 - "time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 完成时间 - "final_reason": final_reason, # 最终原因 - "duration_ms": int((time.time() - action_start_time) * 1000) # 记录耗时(毫秒) + "status": final_status, # 最终状态 + "time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 完成时间 + "final_reason": final_reason, # 最终原因 + "duration_ms": int((time.time() - action_start_time) * 1000), # 记录耗时(毫秒) } ) - logger.debug(f"[私聊][{self.private_name}] 动作 '{action}' 最终状态: {final_status}, 原因: {final_reason}") + logger.debug( + f"[私聊][{self.private_name}] 动作 '{action}' 最终状态: {final_status}, 原因: {final_reason}" + ) else: # 如果索引无效或列表为空,记录错误 logger.error(f"[私聊][{self.private_name}] 无法更新动作历史记录,索引 {action_index} 无效或列表为空。") - async def _send_reply(self) -> bool: """发送 `self.generated_reply` 中的内容到聊天流""" # 检查是否有内容可发送 @@ -956,17 +1012,17 @@ class Conversation: await self.direct_sender.send_message( chat_stream=self.chat_stream, content=reply_content, - reply_to_message=None # 私聊通常不需要引用回复 + reply_to_message=None, # 私聊通常不需要引用回复 ) # 发送成功后,将状态设置回分析,准备下一轮规划 self.state = ConversationState.ANALYZING - return True # 返回成功 + return True # 返回成功 except Exception as e: # 捕获发送过程中的异常 logger.error(f"[私聊][{self.private_name}] 发送消息时失败: {str(e)}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") - self.state = ConversationState.ERROR # 发送失败标记错误状态 - return False # 返回失败 + self.state = ConversationState.ERROR # 发送失败标记错误状态 + return False # 返回失败 async def _send_timeout_message(self): """在等待超时后发送一条结束消息""" @@ -979,9 +1035,7 @@ class Conversation: timeout_content = "我们好像很久没说话了,先这样吧~" # 发送超时消息 await self.direct_sender.send_message( - chat_stream=self.chat_stream, - content=timeout_content, - reply_to_message=None + chat_stream=self.chat_stream, content=timeout_content, reply_to_message=None ) logger.info(f"[私聊][{self.private_name}] 已发送超时结束消息。") # 发送超时消息后,通常意味着对话结束,调用 stop @@ -989,4 +1043,3 @@ class Conversation: except Exception as e: # 捕获发送超时消息的异常 logger.error(f"[私聊][{self.private_name}] 发送超时消息失败: {str(e)}") -