From dfa947f7aaad983dde918517ee0fa7827295472b Mon Sep 17 00:00:00 2001 From: Bakadax Date: Mon, 5 May 2025 17:52:11 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=A4=84=E7=90=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/PFC/conversation.py | 198 ++++++++++++-------------------- 1 file changed, 75 insertions(+), 123 deletions(-) diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 18fcfe0c..6cd2be8b 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -86,6 +86,10 @@ class Conversation: self.observation_info.bind_to_chat_observer(self.chat_observer) self.conversation_info = ConversationInfo() + # --- 初始化上次拒绝回复的信息 --- + self.conversation_info.last_reply_rejection_reason = None + self.conversation_info.last_rejected_reply_content = None + except Exception as e: logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册信息组件失败: {e}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") @@ -164,40 +168,31 @@ class Conversation: continue try: # --- 记录规划开始时的时间戳和未处理消息数 --- - # 使用 time.time() 获取当前时间戳作为标记点,更准确反映规划开始的时刻 planning_marker_time = time.time() initial_unprocessed_count = self.observation_info.new_messages_count logger.debug(f"[私聊][{self.private_name}]规划开始标记时间: {planning_marker_time}, 初始未处理: {initial_unprocessed_count}") - # --- 调用 Action Planner --- - # 传递 self.conversation_info.last_successful_reply_action action, reason = await self.action_planner.plan( self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action ) # --- 规划后检查是否有 *过多* 新消息到达 --- - # 检查规划期间(调用 plan 函数的时间段内)新收到的消息数 current_unprocessed_count = self.observation_info.new_messages_count - # planning_buffer = 2 # 用户指定的缓冲值 - planning_buffer = 2 # 使用用户指定的缓冲值 + planning_buffer = 2 # 用户指定的缓冲值 new_messages_during_planning = current_unprocessed_count - initial_unprocessed_count if new_messages_during_planning > planning_buffer: logger.info( f"[私聊][{self.private_name}]规划期间收到 {new_messages_during_planning} 条新消息 (超过缓冲 {planning_buffer}),放弃当前计划 '{action}',立即重新规划" ) - # 重置上次成功回复状态,因为要响应新消息 self.conversation_info.last_successful_reply_action = None - # 记录被放弃的规划动作 (可选) - # current_action_record = { ... status: "recalled_before_execution" ... } - # conversation_info.done_action.append(current_action_record) - await asyncio.sleep(0.1) # 短暂等待 + await asyncio.sleep(0.1) continue # 重新进入循环进行规划 # --- 如果规划期间新消息未超限,则继续执行规划的动作 --- - # 将 planning_marker_time 传递给 _handle_action - await self._handle_action(action, reason, self.observation_info, self.conversation_info, planning_marker_time) + # 将 planning_marker_time 和 new_messages_during_planning 传递给 _handle_action + await self._handle_action(action, reason, self.observation_info, self.conversation_info, planning_marker_time, new_messages_during_planning) # 检查是否需要结束对话 (逻辑不变) goal_ended = False @@ -220,30 +215,23 @@ class Conversation: except Exception as loop_err: logger.error(f"[私聊][{self.private_name}]PFC主循环出错: {loop_err}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - await asyncio.sleep(1) # 发生错误时等待一段时间 + await asyncio.sleep(1) if self.should_continue: - await asyncio.sleep(0.1) # 保持循环间的短暂间隔 + await asyncio.sleep(0.1) logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}") - # --- 移除 _check_new_messages_during_action 方法 --- - # 这个方法不再需要,因为检查逻辑已合并到 _plan_and_action_loop 中, - # 并且 _handle_action 内部不再需要打断执行。 - # def _check_new_messages_during_action(self, planning_marker_time: float, buffer: int = 0) -> bool: - # # ... (旧代码) ... - # pass def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message: """将消息字典转换为Message对象""" try: - # 尝试从 msg_dict 直接获取 chat_stream,如果失败则从全局 chat_manager 获取 chat_info = msg_dict.get("chat_info") if chat_info and isinstance(chat_info, dict): chat_stream = ChatStream.from_dict(chat_info) - elif self.chat_stream: # 使用实例变量中的 chat_stream + elif self.chat_stream: chat_stream = self.chat_stream - else: # Fallback: 尝试从 manager 获取 (可能需要 stream_id) + else: chat_stream = chat_manager.get_stream(self.stream_id) if not chat_stream: raise ValueError(f"无法确定 ChatStream for stream_id {self.stream_id}") @@ -253,32 +241,29 @@ class Conversation: user_info = UserInfo.from_dict(user_info_dict) else: logger.warning(f"Message user_info is not a dict: {user_info_dict}") - # 根据需要返回默认 UserInfo 或抛出错误 user_info = UserInfo(user_id="unknown", user_nickname="Unknown", platform="unknown") - return Message( - message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 提供默认 ID - chat_stream=chat_stream, # 使用确定的 chat_stream - time=msg_dict.get("time", time.time()), # 提供默认时间 + message_id=msg_dict.get("message_id", f"gen_{time.time()}"), + chat_stream=chat_stream, + time=msg_dict.get("time", time.time()), user_info=user_info, processed_plain_text=msg_dict.get("processed_plain_text", ""), detailed_plain_text=msg_dict.get("detailed_plain_text", ""), ) except Exception as e: logger.warning(f"[私聊][{self.private_name}]转换消息时出错: {e}") - # 可以选择返回 None 或重新抛出异常,这里选择重新抛出以指示问题 raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e - # --- 修改:_handle_action 接收 planning_marker_time --- + # --- 修改:_handle_action 接收 planning_marker_time 和 new_messages_during_planning --- async def _handle_action( - self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo, planning_marker_time: float + self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo, planning_marker_time: float, new_messages_during_planning: int ): """处理规划的行动""" logger.debug(f"[私聊][{self.private_name}]执行行动: {action}, 原因: {reason}") - # 记录action历史 (逻辑不变) + # 记录action历史 current_action_record = { "action": action, "plan_reason": reason, @@ -286,13 +271,12 @@ class Conversation: "time": datetime.datetime.now().strftime("%H:%M:%S"), "final_reason": None, } - # 确保 done_action 列表存在 if not hasattr(conversation_info, "done_action"): conversation_info.done_action = [] conversation_info.done_action.append(current_action_record) action_index = len(conversation_info.done_action) - 1 - action_successful = False # 用于标记动作是否成功完成 + action_successful = False # --- 根据不同的 action 执行 --- @@ -311,10 +295,7 @@ class Conversation: ) self.state = ConversationState.GENERATING - # --- 移除生成前的检查 --- - # if self._check_new_messages_during_action(planning_marker_time): ... - - # 1. 生成回复 (调用 generate 时传入 action_type) + # 1. 生成回复 self.generated_reply = await self.reply_generator.generate( observation_info, conversation_info, action_type="send_new_message" ) @@ -322,13 +303,10 @@ class Conversation: f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的追问回复: {self.generated_reply}" ) - # --- 移除检查前的检查 --- - # if self._check_new_messages_during_action(planning_marker_time): ... - - # 2. 检查回复 (逻辑不变) + # 2. 检查回复 self.state = ConversationState.CHECKING try: - current_goal_str = "" # 初始化 + current_goal_str = "" if conversation_info.goal_list: first_goal = conversation_info.goal_list[0] if isinstance(first_goal, dict): @@ -347,13 +325,10 @@ class Conversation: f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" ) - # 记录失败原因和内容到 conversation_info if not is_suitable: - # 确保属性存在 setattr(conversation_info, 'last_reply_rejection_reason', check_reason) setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply) else: - # 清除上次失败记录 setattr(conversation_info, 'last_reply_rejection_reason', None) setattr(conversation_info, 'last_rejected_reply_content', None) @@ -364,45 +339,46 @@ class Conversation: logger.warning( f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查建议重新规划,停止尝试。原因: {check_reason}" ) - break # 跳出循环,后续会处理 need_replan + break except Exception as check_err: logger.error( f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}" ) check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - # 记录失败 setattr(conversation_info, 'last_reply_rejection_reason', check_reason) setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply) - break # 出错也跳出循环 + break # 循环结束,处理最终结果 if is_suitable: - # --- 移除发送前的检查 --- - # if self._check_new_messages_during_action(planning_marker_time): ... - # 发送合适的回复 self.generated_reply = final_reply_to_send - send_success = await self._send_reply() # 调用发送函数,获取发送结果 + send_success = await self._send_reply() if send_success: - # --- 发送成功后,标记处理过的消息 --- - # 使用 planning_marker_time 标记规划开始前的消息为已处理 + # 发送成功后,标记处理过的消息 await observation_info.mark_messages_processed_up_to(planning_marker_time) - # 更新状态: 标记上次成功是 send_new_message - self.conversation_info.last_successful_reply_action = "send_new_message" - action_successful = True # 标记动作成功 + + # --- 核心逻辑修改:根据规划期间是否有新消息决定下一步状态 --- + if new_messages_during_planning > 0: + logger.info(f"[私聊][{self.private_name}] 发送追问成功后,检测到规划期间有 {new_messages_during_planning} 条新消息,强制重置回复状态以进行新规划。") + self.conversation_info.last_successful_reply_action = None # 强制重新规划 + else: + # 只有在规划期间没有新消息时,才设置追问状态 + logger.info(f"[私聊][{self.private_name}] 发送追问成功,规划期间无新消息,允许下次进入追问状态。") + self.conversation_info.last_successful_reply_action = "send_new_message" + # --- 核心逻辑修改结束 --- + + action_successful = True else: - # 发送失败处理 logger.error(f"[私聊][{self.private_name}]发送追问回复失败") - # 确保 action_index 有效 if action_index < len(conversation_info.done_action): conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"发送追问回复失败: {final_reply_to_send}"} ) - self.conversation_info.last_successful_reply_action = None # 发送失败,重置状态 + self.conversation_info.last_successful_reply_action = None elif need_replan: - # 打回动作决策 logger.warning( f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,追问回复决定打回动作决策。打回原因: {check_reason}" ) @@ -410,10 +386,9 @@ class Conversation: conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"} ) - self.conversation_info.last_successful_reply_action = None # 重置成功状态 + self.conversation_info.last_successful_reply_action = None else: - # 追问失败 logger.warning( f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的追问回复。最终原因: {check_reason}" ) @@ -426,7 +401,6 @@ class Conversation: # 执行 Wait 操作 logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...") self.state = ConversationState.WAITING - # --- Wait 操作也需要标记处理过的消息 --- await observation_info.mark_messages_processed_up_to(planning_marker_time) await self.waiter.wait(self.conversation_info) wait_action_record = { @@ -437,7 +411,7 @@ class Conversation: "final_reason": None, } conversation_info.done_action.append(wait_action_record) - action_successful = True # Wait 本身算成功完成 + action_successful = True elif action == "direct_reply": max_reply_attempts = 3 @@ -454,9 +428,6 @@ class Conversation: ) self.state = ConversationState.GENERATING - # --- 移除生成前的检查 --- - # if self._check_new_messages_during_action(planning_marker_time): ... - # 1. 生成回复 self.generated_reply = await self.reply_generator.generate( observation_info, conversation_info, action_type="direct_reply" @@ -465,13 +436,10 @@ class Conversation: f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的首次回复: {self.generated_reply}" ) - # --- 移除检查前的检查 --- - # if self._check_new_messages_during_action(planning_marker_time): ... - # 2. 检查回复 self.state = ConversationState.CHECKING try: - current_goal_str = "" # 初始化 + current_goal_str = "" if conversation_info.goal_list: first_goal = conversation_info.goal_list[0] if isinstance(first_goal, dict): @@ -490,7 +458,6 @@ class Conversation: f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" ) - # 记录失败原因和内容 if not is_suitable: setattr(conversation_info, 'last_reply_rejection_reason', check_reason) setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply) @@ -505,43 +472,46 @@ class Conversation: logger.warning( f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查建议重新规划,停止尝试。原因: {check_reason}" ) - break # 跳出循环 + break except Exception as check_err: logger.error( f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (首次回复) 时出错: {check_err}" ) check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - # 记录失败 setattr(conversation_info, 'last_reply_rejection_reason', check_reason) setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply) - break # 出错也跳出循环 + break # 循环结束,处理最终结果 if is_suitable: - # --- 移除发送前的检查 --- - # if self._check_new_messages_during_action(planning_marker_time): ... - # 发送合适的回复 self.generated_reply = final_reply_to_send - send_success = await self._send_reply() # 调用发送函数 + send_success = await self._send_reply() if send_success: - # --- 发送成功后,标记处理过的消息 --- + # 发送成功后,标记处理过的消息 await observation_info.mark_messages_processed_up_to(planning_marker_time) - # 更新状态: 标记上次成功是 direct_reply - self.conversation_info.last_successful_reply_action = "direct_reply" - action_successful = True # 标记动作成功 + + # --- 核心逻辑修改:根据规划期间是否有新消息决定下一步状态 --- + if new_messages_during_planning > 0: + logger.info(f"[私聊][{self.private_name}] 发送首次回复成功后,检测到规划期间有 {new_messages_during_planning} 条新消息,强制重置回复状态以进行新规划。") + self.conversation_info.last_successful_reply_action = None # 强制重新规划 + else: + # 只有在规划期间没有新消息时,才设置追问状态 + logger.info(f"[私聊][{self.private_name}] 发送首次回复成功,规划期间无新消息,允许下次进入追问状态。") + self.conversation_info.last_successful_reply_action = "direct_reply" + # --- 核心逻辑修改结束 --- + + action_successful = True else: - # 发送失败处理 logger.error(f"[私聊][{self.private_name}]发送首次回复失败") if action_index < len(conversation_info.done_action): conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"发送首次回复失败: {final_reply_to_send}"} ) - self.conversation_info.last_successful_reply_action = None # 发送失败,重置状态 + self.conversation_info.last_successful_reply_action = None elif need_replan: - # 打回动作决策 logger.warning( f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,首次回复决定打回动作决策。打回原因: {check_reason}" ) @@ -549,10 +519,9 @@ class Conversation: conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"} ) - self.conversation_info.last_successful_reply_action = None # 重置成功状态 + self.conversation_info.last_successful_reply_action = None else: - # 首次回复失败 logger.warning( f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的首次回复。最终原因: {check_reason}" ) @@ -562,10 +531,9 @@ class Conversation: ) self.conversation_info.last_successful_reply_action = None - # 执行 Wait 操作 (保持原有逻辑) + # 执行 Wait 操作 logger.info(f"[私聊][{self.private_name}]由于无法生成合适首次回复,执行 'wait' 操作...") self.state = ConversationState.WAITING - # --- Wait 操作也需要标记处理过的消息 --- await observation_info.mark_messages_processed_up_to(planning_marker_time) await self.waiter.wait(self.conversation_info) wait_action_record = { @@ -576,7 +544,7 @@ class Conversation: "final_reason": None, } conversation_info.done_action.append(wait_action_record) - action_successful = True # Wait 本身算成功 + action_successful = True elif action == "rethink_goal": self.state = ConversationState.RETHINKING @@ -585,11 +553,7 @@ class Conversation: logger.error(f"[私聊][{self.private_name}]GoalAnalyzer 未初始化,无法重新思考目标。") raise AttributeError("GoalAnalyzer not initialized") - # --- 移除 rethink_goal 前的检查 --- - # if self._check_new_messages_during_action(planning_marker_time): ... - await self.goal_analyzer.analyze_goal(conversation_info, observation_info) - # --- rethink_goal 后标记处理过的消息 --- await observation_info.mark_messages_processed_up_to(planning_marker_time) action_successful = True except Exception as rethink_err: @@ -608,11 +572,9 @@ class Conversation: logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法倾听。") raise AttributeError("Waiter not initialized") - # --- listening 动作开始前标记处理过的消息 --- - # 因为 listening 本身是等待行为,需要在开始等待前处理掉规划时看到的消息 await observation_info.mark_messages_processed_up_to(planning_marker_time) await self.waiter.wait_listening(conversation_info) - action_successful = True # Listening 完成就算成功 + action_successful = True except Exception as listen_err: logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}") if action_index < len(conversation_info.done_action): @@ -625,20 +587,14 @@ class Conversation: self.state = ConversationState.GENERATING logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...") try: - # --- 移除告别前的检查 --- - # if self._check_new_messages_during_action(planning_marker_time): ... - - # 1. 生成告别语 self.generated_reply = await self.reply_generator.generate( observation_info, conversation_info, action_type="say_goodbye" ) logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}") - # 2. 发送告别语 if self.generated_reply: send_success = await self._send_reply() if send_success: - # --- 发送成功后标记处理过的消息 --- await observation_info.mark_messages_processed_up_to(planning_marker_time) action_successful = True logger.info(f"[私聊][{self.private_name}]告别语已发送。") @@ -657,7 +613,6 @@ class Conversation: {"status": "recall", "final_reason": "未能生成告别语内容"} ) - # 3. 结束对话 self.should_continue = False logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。") @@ -674,7 +629,6 @@ class Conversation: elif action == "end_conversation": self.should_continue = False logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...") - # --- 结束对话前标记处理过的消息 --- await observation_info.mark_messages_processed_up_to(planning_marker_time) action_successful = True @@ -686,7 +640,6 @@ class Conversation: f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}" ) self.state = ConversationState.IGNORED - # --- 屏蔽前标记处理过的消息 --- await observation_info.mark_messages_processed_up_to(planning_marker_time) action_successful = True @@ -698,10 +651,9 @@ class Conversation: logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法等待。") raise AttributeError("Waiter not initialized") - # --- Wait 开始前标记处理过的消息 --- await observation_info.mark_messages_processed_up_to(planning_marker_time) _timeout_occurred = await self.waiter.wait(self.conversation_info) - action_successful = True # Wait 完成就算成功 + action_successful = True except Exception as wait_err: logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}") if action_index < len(conversation_info.done_action): @@ -713,14 +665,18 @@ class Conversation: # --- 更新 Action History 状态 --- if action_successful: if action_index < len(conversation_info.done_action): + # 只有在明确不需要强制重新规划时,才在非回复动作后重置状态 + if not (action in ["direct_reply", "send_new_message"] and new_messages_during_planning > 0): + if action not in ["direct_reply", "send_new_message"]: + self.conversation_info.last_successful_reply_action = None + conversation_info.done_action[action_index].update( { "status": "done", "time": datetime.datetime.now().strftime("%H:%M:%S"), } ) - if action not in ["direct_reply", "send_new_message"]: - self.conversation_info.last_successful_reply_action = None + else: logger.error(f"[私聊][{self.private_name}]尝试更新无效的 action_index: {action_index},当前 done_action 长度: {len(conversation_info.done_action)}") @@ -728,32 +684,28 @@ class Conversation: """发送回复,并返回发送是否成功""" if not self.generated_reply: logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。") - return False # 发送失败 + return False try: reply_content = self.generated_reply if not hasattr(self, "direct_sender") or not self.direct_sender: logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。") - return False # 发送失败 + return False if not self.chat_stream: logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。") - return False # 发送失败 + return False await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content) - # 触发 observer 更新的逻辑保持注释,依赖自动轮询 - # self.chat_observer.trigger_update() - # await self.chat_observer.wait_for_update() - self.state = ConversationState.ANALYZING - return True # 发送成功 + return True except Exception as e: logger.error(f"[私聊][{self.private_name}]发送消息或更新状态时失败: {str(e)}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") self.state = ConversationState.ANALYZING - return False # 发送失败 + return False async def _send_timeout_message(self): """发送超时结束消息"""