From 7a226ccb183267a76500262031efdf5dbe20e2a6 Mon Sep 17 00:00:00 2001 From: Bakadax Date: Fri, 9 May 2025 10:59:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=B0=83=E8=AF=95=E4=BF=A1?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/PFC/conversation_loop.py | 185 +++++++++++---------------- 1 file changed, 76 insertions(+), 109 deletions(-) diff --git a/src/plugins/PFC/conversation_loop.py b/src/plugins/PFC/conversation_loop.py index 0bc43b3f..d40b02a3 100644 --- a/src/plugins/PFC/conversation_loop.py +++ b/src/plugins/PFC/conversation_loop.py @@ -15,7 +15,7 @@ if TYPE_CHECKING: logger = get_logger("pfc_loop") -# 时区配置 (从 conversation.py 移过来,或者考虑放到更全局的配置模块) +# 时区配置 configured_tz = getattr(global_config, "TIME_ZONE", "Asia/Shanghai") TIME_ZONE = tz.gettz(configured_tz) if TIME_ZONE is None: @@ -26,7 +26,6 @@ if TIME_ZONE is None: async def run_conversation_loop(conversation_instance: "Conversation"): """ 核心的规划与行动循环 (PFC Loop)。 - 之前是 Conversation 类中的 _plan_and_action_loop 方法。 """ logger.debug(f"[私聊][{conversation_instance.private_name}] 进入 run_conversation_loop 循环。") @@ -34,16 +33,22 @@ async def run_conversation_loop(conversation_instance: "Conversation"): logger.error(f"[私聊][{conversation_instance.private_name}] 尝试在未初始化状态下运行规划循环,退出。") return - force_reflect_and_act = False # 用于强制使用反思 prompt 的标志 + # 注意:force_reflect_and_act 是在主循环的每次迭代开始前确定的, + # 它在 action_planner.plan 使用后会被重置为 False。 + # 如果在一次迭代中被设为 True (例如因为LLM任务中断),它会影响下一次迭代的 plan 调用。 + _force_reflect_and_act_next_iter = False while conversation_instance.should_continue: loop_iter_start_time = time.time() - logger.debug(f"[私聊][{conversation_instance.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f})") + current_force_reflect_and_act = _force_reflect_and_act_next_iter + _force_reflect_and_act_next_iter = False # 默认下一轮迭代不强制反思 + + logger.debug(f"[私聊][{conversation_instance.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f}), force_reflect_next_iter: {current_force_reflect_and_act}") # 更新当前时间 try: - global TIME_ZONE # 引用全局 TIME_ZONE - if TIME_ZONE is None: # 如果还未加载成功 + global TIME_ZONE + if TIME_ZONE is None: configured_tz_loop = getattr(global_config, "TIME_ZONE", "Asia/Shanghai") TIME_ZONE = tz.gettz(configured_tz_loop) if TIME_ZONE is None: @@ -54,7 +59,6 @@ async def run_conversation_loop(conversation_instance: "Conversation"): if conversation_instance.observation_info: time_str = current_time_dt.strftime("%Y-%m-%d %H:%M:%S %Z%z") conversation_instance.observation_info.current_time_str = time_str - logger.debug(f"[私聊][{conversation_instance.private_name}] 更新 ObservationInfo 当前时间: {time_str}") else: logger.warning( f"[私聊][{conversation_instance.private_name}] ObservationInfo 未初始化,无法更新当前时间。" @@ -89,7 +93,6 @@ async def run_conversation_loop(conversation_instance: "Conversation"): # 核心规划与行动逻辑 try: - # 更新关系和情绪文本 (在每次循环开始时进行) if conversation_instance.conversation_info and conversation_instance._initialized: if ( conversation_instance.conversation_info.person_id @@ -119,7 +122,6 @@ async def run_conversation_loop(conversation_instance: "Conversation"): conversation_instance.mood_mng.get_prompt() ) - # 检查核心组件 if not all( [ conversation_instance.action_planner, @@ -133,8 +135,7 @@ async def run_conversation_loop(conversation_instance: "Conversation"): await asyncio.sleep(5) continue - # 规划 - planning_start_time = time.time() # 这是 ActionPlanner.plan 开始的时间 + planning_start_time = time.time() logger.debug( f"[私聊][{conversation_instance.private_name}] --- (Loop) 开始规划 ({planning_start_time:.2f}) ---" ) @@ -147,15 +148,14 @@ async def run_conversation_loop(conversation_instance: "Conversation"): conversation_instance.conversation_info.last_successful_reply_action if conversation_instance.conversation_info else None, - use_reflect_prompt=force_reflect_and_act, + use_reflect_prompt=current_force_reflect_and_act, # 使用当前迭代的强制反思标志 ) - force_reflect_and_act = False # 重置反思标志 + # current_force_reflect_and_act 已经被用于本次plan, _force_reflect_and_act_next_iter 默认为 False + logger.debug( f"[私聊][{conversation_instance.private_name}] (Loop) ActionPlanner.plan 完成,初步规划动作: {action}" ) - # 检查在 ActionPlanner.plan 期间是否有中断 (这部分逻辑保持不变) - # 注意:这里的 planning_start_time 是 action_planner.plan 开始的时间 current_unprocessed_messages_after_plan = getattr(conversation_instance.observation_info, "unprocessed_messages", []) new_messages_during_action_planning: List[Dict[str, Any]] = [] other_new_messages_during_action_planning: List[Dict[str, Any]] = [] @@ -164,7 +164,7 @@ async def run_conversation_loop(conversation_instance: "Conversation"): msg_time_ap = msg_ap.get("time") sender_id_info_ap = msg_ap.get("user_info", {}) sender_id_ap = str(sender_id_info_ap.get("user_id")) if sender_id_info_ap else None - if msg_time_ap and msg_time_ap >= planning_start_time: # 使用 action_planner.plan 的开始时间 + if msg_time_ap and msg_time_ap >= planning_start_time: new_messages_during_action_planning.append(msg_ap) if sender_id_ap != conversation_instance.bot_qq_str: other_new_messages_during_action_planning.append(msg_ap) @@ -172,12 +172,8 @@ async def run_conversation_loop(conversation_instance: "Conversation"): new_msg_count_action_planning = len(new_messages_during_action_planning) other_new_msg_count_action_planning = len(other_new_messages_during_action_planning) - # 更新因 ActionPlanner.plan 期间新消息而产生的计数和状态 (这部分逻辑也基本保持) if conversation_instance.conversation_info and other_new_msg_count_action_planning > 0: - # (如果需要,这里可以更新实例消息计数、关系、情绪等,但通常这些在消息实际处理后更新更合适) - # conversation_instance.conversation_info.current_instance_message_count += other_new_msg_count_action_planning - pass - + pass # 计数更新等通常在消息实际处理后 should_interrupt_action_planning: bool = False interrupt_reason_action_planning: str = "" @@ -192,7 +188,6 @@ async def run_conversation_loop(conversation_instance: "Conversation"): logger.info( f"[私聊][{conversation_instance.private_name}] (Loop) 中断 '{action}' (在ActionPlanner.plan后),原因: {interrupt_reason_action_planning}。重新规划..." ) - # 记录中断的动作 cancel_record_ap = { "action": action, "plan_reason": reason, @@ -206,21 +201,16 @@ async def run_conversation_loop(conversation_instance: "Conversation"): conversation_instance.conversation_info.done_action.append(cancel_record_ap) conversation_instance.conversation_info.last_successful_reply_action = None conversation_instance.state = ConversationState.ANALYZING - await asyncio.sleep(0.1) # 短暂休眠再开始下一轮 - continue # 跳过本轮的 actions.handle_action,直接进入下一轮循环重新规划 + await asyncio.sleep(0.1) + continue - # 如果 ActionPlanner.plan 后没有中断,则准备执行动作 - # 【核心修改点】对于需要LLM生成回复的动作,进行特殊处理 if action in ["direct_reply", "send_new_message"]: logger.debug( f"[私聊][{conversation_instance.private_name}] (Loop) 动作 '{action}' 需要LLM生成,进入监控执行模式..." ) - llm_call_start_time = time.time() # LLM实际调用开始的时间 + llm_call_start_time = time.time() - # 将 conversation_info 中用于 action_planner 中断的计数值传递或更新,以供 handle_action 使用 - # actions.handle_action 内部可能也需要知道这些信息 if conversation_instance.conversation_info: - # 注意:这个字段可能在 actions.handle_action 中被使用和重置 conversation_instance.conversation_info.other_new_messages_during_planning_count = other_new_msg_count_action_planning llm_action_task = asyncio.create_task( @@ -234,95 +224,72 @@ async def run_conversation_loop(conversation_instance: "Conversation"): ) interrupted_during_llm = False + llm_task_cancelled_by_us = False + while not llm_action_task.done(): try: - await asyncio.wait_for(llm_action_task, timeout=5) # 每1.5秒检查一次 + await asyncio.wait_for(llm_action_task, timeout=1.5) except asyncio.TimeoutError: - # LLM任务仍在运行,检查新消息 + current_time_for_check = time.time() + logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) LLM Monitor Timeout. llm_call_start_time: {llm_call_start_time:.2f}, current_check_time: {current_time_for_check:.2f}") + current_unprocessed_messages_during_llm = getattr(conversation_instance.observation_info, "unprocessed_messages", []) other_new_messages_this_check: List[Dict[str, Any]] = [] - # 打印调试信息,与用户提供的一致 - # print(111111111111111111111111) # 和用户调试信息一致 + logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) Checking unprocessed_messages (count: {len(current_unprocessed_messages_during_llm)}):") for msg_llm in current_unprocessed_messages_during_llm: - # print(msg_llm) # 和用户调试信息一致 msg_time_llm = msg_llm.get("time") sender_id_info_llm = msg_llm.get("user_info", {}) sender_id_llm = str(sender_id_info_llm.get("user_id")) if sender_id_info_llm else None + is_new_enough = msg_time_llm and msg_time_llm >= llm_call_start_time + is_other_sender = sender_id_llm != conversation_instance.bot_qq_str - if msg_time_llm and msg_time_llm >= llm_call_start_time: # 注意这里用 llm_call_start_time - if sender_id_llm != conversation_instance.bot_qq_str: - other_new_messages_this_check.append(msg_llm) - # print("添加成功!\n") # 和用户调试信息一致 + logger.debug(f" - Msg ID: {msg_llm.get('message_id')}, Time: {msg_time_llm:.2f if msg_time_llm else 'N/A'}, Sender: {sender_id_llm}. New enough? {is_new_enough}. Other sender? {is_other_sender}.") + + if is_new_enough and is_other_sender: + other_new_messages_this_check.append(msg_llm) - # print(other_new_messages_this_check) # 和用户调试信息一致 - # print(len(other_new_messages_this_check)) # 和用户调试信息一致 + logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) Found {len(other_new_messages_this_check)} 'other_new_messages_this_check'.") - if len(other_new_messages_this_check) > 2: # 用户的重新规划条件 + if len(other_new_messages_this_check) > 2: logger.info( - f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 执行期间收到 {len(other_new_messages_this_check)} 条来自他人的新消息,中断并重新规划。" + f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 执行期间收到 {len(other_new_messages_this_check)} 条来自他人的新消息,发起中断。" ) - llm_action_task.cancel() - interrupted_during_llm = True - - # 记录中断的动作到 history - cancel_record_llm = { - "action": action, - "plan_reason": reason, # 使用规划时得到的 reason - "status": "cancelled_due_to_new_messages_during_llm", - "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "final_reason": f"LLM处理期间收到{len(other_new_messages_this_check)}条新用户消息", - } - if conversation_instance.conversation_info: - if not hasattr(conversation_instance.conversation_info, "done_action") or conversation_instance.conversation_info.done_action is None: - conversation_instance.conversation_info.done_action = [] - conversation_instance.conversation_info.done_action.append(cancel_record_llm) - conversation_instance.conversation_info.last_successful_reply_action = None # 因为没有成功回复 - # 可以在这里也更新 current_instance_message_count 和相关情绪/关系,如果这些新消息确实是用户的 - conversation_instance.conversation_info.current_instance_message_count += len(other_new_messages_this_check) - # (此处可以添加调用关系/情绪更新的逻辑,如果需要的话) + if not llm_action_task.done(): + llm_action_task.cancel() + llm_task_cancelled_by_us = True + break # 从监控循环中断 + # except asyncio.CancelledError: # 这个由外部的 await llm_action_task 捕获 + # pass + + # 监控循环结束后,等待任务的最终结果 + try: + await llm_action_task + logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务最终完成 (未被取消或未发生错误)。") + except asyncio.CancelledError: + logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务最终确认被取消。") + interrupted_during_llm = True + # actions.handle_action 内部的finally块会更新done_action状态为cancelled + except Exception as e_llm_final: + logger.error(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务执行时发生最终错误: {e_llm_final}") + logger.error(traceback.format_exc()) + interrupted_during_llm = True + conversation_instance.state = ConversationState.ERROR + # actions.handle_action 内部的finally块会更新done_action状态为error - conversation_instance.state = ConversationState.ANALYZING # 准备重新规划 - force_reflect_and_act = True # 下一轮强制使用初始/反思型规划 - break # 跳出监控循环 - except asyncio.CancelledError: - logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务被取消。") - interrupted_during_llm = True # 标记为中断 - # conversation_instance.state 和 force_reflect_and_act 已在上面处理 cancellation 的地方设置 - break # 跳出监控循环 - except Exception as e_llm_task: - logger.error(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务执行时出错: {e_llm_task}") - logger.error(traceback.format_exc()) - interrupted_during_llm = True # 标记为中断,按错误处理 - conversation_instance.state = ConversationState.ERROR - # 记录错误的动作到 history - error_record_llm = { - "action": action, - "plan_reason": reason, - "status": "error_during_llm_action", - "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "final_reason": f"LLM动作执行时发生错误: {str(e_llm_task)}", - } - if conversation_instance.conversation_info: - if not hasattr(conversation_instance.conversation_info, "done_action") or conversation_instance.conversation_info.done_action is None: - conversation_instance.conversation_info.done_action = [] - conversation_instance.conversation_info.done_action.append(error_record_llm) - conversation_instance.conversation_info.last_successful_reply_action = None - break # 跳出监控循环 - if interrupted_during_llm: - await asyncio.sleep(0.1) # 短暂休眠 - continue # 如果LLM任务被中断,则直接进入下一轮PFC主循环以重新规划 - - # 如果LLM任务正常完成 (没有被中断或出错跳出) - # actions.handle_action 内部会处理其结果和状态更新 - logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务正常完成。") - - else: # 对于非LLM生成类的动作,直接执行 + conversation_instance.state = ConversationState.ANALYZING + # 如果是我们因为新消息主动取消的,下一轮不强制反思,正常规划 + # 如果是其他原因(例如LLM内部错误导致任务提前结束并被视为中断),也可以考虑不强制反思 + # _force_reflect_and_act_next_iter 保持其默认的 False + logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作中断,准备重新规划。llm_task_cancelled_by_us: {llm_task_cancelled_by_us}") + await asyncio.sleep(0.1) + continue + else: logger.debug( f"[私聊][{conversation_instance.private_name}] (Loop) 执行非LLM类动作 '{action}'..." ) - if conversation_instance.conversation_info: # 确保传递最新的计数值 + if conversation_instance.conversation_info: conversation_instance.conversation_info.other_new_messages_during_planning_count = other_new_msg_count_action_planning await actions.handle_action( @@ -334,18 +301,19 @@ async def run_conversation_loop(conversation_instance: "Conversation"): ) logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) 非LLM类动作 '{action}' 完成。") - # 检查是否需要反思 (这部分逻辑保持不变) last_action_record = {} if conversation_instance.conversation_info and conversation_instance.conversation_info.done_action: last_action_record = conversation_instance.conversation_info.done_action[-1] + + # 只有当 ReplyGenerator 明确决定不发送时,才强制下一轮反思 if ( last_action_record.get("action") == "send_new_message" - and last_action_record.get("status") == "done_no_reply" + and last_action_record.get("status") == "done_no_reply" ): - logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) 检测到需反思,设置标志。") - force_reflect_and_act = True + logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) 检测到 ReplyGenerator 决定不发送消息,下一轮将强制反思。") + _force_reflect_and_act_next_iter = True + - # 检查结束条件 (这部分逻辑保持不变) goal_ended: bool = False if ( conversation_instance.conversation_info @@ -361,7 +329,7 @@ async def run_conversation_loop(conversation_instance: "Conversation"): if current_goal == "结束对话": goal_ended = True - last_action_record_for_end_check = {} + last_action_record_for_end_check = {} # 重新获取最新的记录 if conversation_instance.conversation_info and conversation_instance.conversation_info.done_action: last_action_record_for_end_check = conversation_instance.conversation_info.done_action[-1] action_ended: bool = ( @@ -382,15 +350,14 @@ async def run_conversation_loop(conversation_instance: "Conversation"): logger.error(f"[私聊][{conversation_instance.private_name}] (Loop) PFC 主循环出错: {loop_err}") logger.error(f"[私聊][{conversation_instance.private_name}] (Loop) {traceback.format_exc()}") conversation_instance.state = ConversationState.ERROR - await asyncio.sleep(5) # 出错后等待一段时间 + await asyncio.sleep(5) - # 控制循环频率 loop_duration = time.time() - loop_iter_start_time - min_loop_interval = 0.1 # 最小循环间隔,避免过于频繁的空转 + min_loop_interval = 0.1 logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) 循环迭代耗时: {loop_duration:.3f} 秒。") if loop_duration < min_loop_interval: await asyncio.sleep(min_loop_interval - loop_duration) logger.info( f"[私聊][{conversation_instance.private_name}] (Loop) PFC 循环已退出 for stream_id: {conversation_instance.stream_id}" - ) \ No newline at end of file + )