From 7f365e03078ee73a19cbdcee51cb5b34f6644d79 Mon Sep 17 00:00:00 2001 From: 114514 <2514624910@qq.com> Date: Fri, 16 May 2025 04:07:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86HFC=20planner=E5=AE=8C=E5=85=A8?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/experimental/Legacy_HFC/heartFC_chat.py | 319 +++++++++++++++++--- 1 file changed, 271 insertions(+), 48 deletions(-) diff --git a/src/experimental/Legacy_HFC/heartFC_chat.py b/src/experimental/Legacy_HFC/heartFC_chat.py index 8f1d4f00..165453cd 100644 --- a/src/experimental/Legacy_HFC/heartFC_chat.py +++ b/src/experimental/Legacy_HFC/heartFC_chat.py @@ -5,6 +5,7 @@ import random # <--- 添加导入 import time import re import traceback +from asyncio import Queue from collections import deque from typing import List, Optional, Dict, Any, Deque, Callable, Coroutine @@ -247,6 +248,12 @@ class HeartFChatting: self._lian_xu_bu_hui_fu_ci_shu: int = 0 # <--- 新增:连续不回复计数器 self._shutting_down: bool = False # <--- 新增:关闭标志位 self._lian_xu_deng_dai_shi_jian: float = 0.0 # <--- 新增:累计等待时间 + self._planner_task: Optional[asyncio.Task] = None # 用于跟踪异步planner任务 + self._planner_input_queue: Queue = Queue(maxsize=1) # 主循环 -> Planner 的信号队列 + self._planner_output_queue: Queue = Queue(maxsize=1) # Planner -> 主循环 的结果队列 + self._planner_active: bool = False # 标记Planner是否应该运行 + self._last_planner_signal_time: float = 0.0 # 记录上次发送规划信号的时间 + self._planner_request_id_counter: int = 0 async def _initialize(self) -> bool: """ @@ -283,11 +290,23 @@ class HeartFChatting: async def start(self): """ - 启动 HeartFChatting 的主循环。 + 启动 HeartFChatting 的主循环和异步Planner循环。 注意:调用此方法前必须确保已经成功初始化。 """ logger.info(f"{self.log_prefix} 开始认真水群(HFC)...") - await self._start_loop_if_needed() + if not self._initialized: # 确保已初始化 + if not await self._initialize(): + logger.error(f"{self.log_prefix} HFC 初始化失败,无法启动。") + return + + await self._start_loop_if_needed() # 启动主循环 + + # --- 启动异步Planner循环 --- + if self._planner_task is None or self._planner_task.done(): + self._planner_active = True # 先设置active标志 + self._planner_task = asyncio.create_task(self._async_planner_loop()) + self._planner_task.add_done_callback(self._handle_planner_loop_completion) + # logger.info(f"{self.log_prefix} 异步Planner任务已启动。") # 这行日志已在 _async_planner_loop 中存在 async def _start_loop_if_needed(self): """检查是否需要启动主循环,如果未激活则启动。""" @@ -334,6 +353,24 @@ class HeartFChatting: logger.warning(f"{self.log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。") self._processing_lock.release() + # --- 新增 _handle_planner_loop_completion 方法 --- + def _handle_planner_loop_completion(self, task: asyncio.Task): + """当 _async_planner_loop 任务完成时执行的回调。""" + try: + exception = task.exception() + if exception: + logger.error(f"{self.log_prefix} 异步Planner任务异常结束: {exception}") + logger.error(traceback.format_exc()) + # else: # 任务正常结束的日志已在 _async_planner_loop 中处理 + # logger.info(f"{self.log_prefix} 异步Planner任务正常结束。") + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} 异步Planner任务被取消。") + finally: + self._planner_active = False + if self._planner_task is task: + self._planner_task = None + # --- 结束新增 _handle_planner_loop_completion 方法 --- + async def _hfc_loop(self): """主循环,持续进行计划并可能回复消息,直到被外部取消。""" try: @@ -444,69 +481,140 @@ class HeartFChatting: return False async def _think_plan_execute_loop(self, cycle_timers: dict, planner_start_db_time: float) -> tuple[bool, str]: - """执行规划阶段""" + """ + 执行思考,异步触发Planner,尝试短时等待Planner结果, + 若无则执行默认动作,同时允许Planner后台完成。 + """ + thinking_id_for_cycle = "" try: - # think:思考 current_mind = await self._get_submind_thinking(cycle_timers) - # 记录子思维思考内容 if self._current_cycle: self._current_cycle.set_response_info(sub_mind_thinking=current_mind) - # plan:决策 - with Timer("决策", cycle_timers): - planner_result = await self._planner(current_mind, cycle_timers) + planner_result = None + signaled_planner_this_cycle = False # 标记本周期是否已成功触发planner - # 效果不太好,还没处理replan导致观察时间点改变的问题 + self._planner_request_id_counter += 1 + current_planner_request_id = f"planner_req_{self.stream_id}_{self._cycle_counter}_{self._planner_request_id_counter}" - # action = planner_result.get("action", "error") - # reasoning = planner_result.get("reasoning", "未提供理由") + min_planner_interval = 1.0 + can_signal_planner = (time.time() - self._last_planner_signal_time) >= min_planner_interval - # self._current_cycle.set_action_info(action, reasoning, False) + if self._planner_active and self._planner_input_queue.empty() and can_signal_planner: + logger.debug(f"{self.log_prefix} 向异步Planner发送规划请求 (ID: {current_planner_request_id})...") + # --- 修改:在上下文中加入请求ID --- + planner_input_context = { + "current_mind": current_mind, + "request_id": current_planner_request_id # <<< 新增 + } + try: + self._planner_input_queue.put_nowait(planner_input_context) + self._last_planner_signal_time = time.time() + signaled_planner_this_cycle = True + except asyncio.QueueFull: + logger.warning(f"{self.log_prefix} Planner输入队列已满,本次规划跳过。") + # 不再直接设定 planner_result,让后续逻辑决定默认行为 + # 其他条件导致未发送信号的日志 (保持或根据需要调整) + elif not self._planner_active: + logger.warning(f"{self.log_prefix} Planner未激活,无法发送规划请求。") + elif not can_signal_planner: + logger.debug(f"{self.log_prefix} Planner请求过于频繁,本次跳过。") + else: # Planner激活但输入队列非空 + logger.warning(f"{self.log_prefix} Planner输入队列仍有任务,本次跳过。") - # 在获取规划结果后检查新消息 - # if await self._check_new_messages(planner_start_db_time): - # if random.random() < 0.2: - # logger.info(f"{self.log_prefix} 看到了新消息,麦麦决定重新观察和规划...") - # # 重新规划 - # with Timer("重新决策", cycle_timers): - # self._current_cycle.replanned = True - # planner_result = await self._planner(current_mind, cycle_timers, is_re_planned=True) - # logger.info(f"{self.log_prefix} 重新规划完成.") + # --- 尝试在短时间内获取Planner的决策结果 --- + quick_wait_timeout = 3.0 - # 解析规划结果 - action = planner_result.get("action", "error") + if signaled_planner_this_cycle: + logger.debug(f"{self.log_prefix} 短时等待 ({quick_wait_timeout}s) Planner结果 (ID: {current_planner_request_id})...") + try: + # --- 从队列获取的结果现在应该也包含 request_id --- + raw_planner_output = await asyncio.wait_for(self._planner_output_queue.get(), timeout=quick_wait_timeout) + self._planner_output_queue.task_done() # 提到前面,获取后就标记完成 + + # --- 校验 request_id --- + if isinstance(raw_planner_output, dict) and raw_planner_output.get("request_id") == current_planner_request_id: + planner_result = raw_planner_output + logger.info(f"{self.log_prefix} 短时等待内收到匹配的Planner结果 (ID: {current_planner_request_id}): {planner_result.get('action')}") + elif isinstance(raw_planner_output, dict): + logger.warning(f"{self.log_prefix} 短时等待内收到不匹配的Planner结果。预期ID: {current_planner_request_id}, 收到ID: {raw_planner_output.get('request_id')}。丢弃此结果。") + # planner_result 保持为 None,将执行默认动作 + else: + logger.error(f"{self.log_prefix} Planner输出格式不正确(非字典或无request_id),丢弃。内容: {raw_planner_output}") + # planner_result 保持为 None + + except asyncio.TimeoutError: + logger.info(f"{self.log_prefix} 短时等待Planner决策结果超时 (ID: {current_planner_request_id})。主循环将先执行默认动作。") + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} 主循环在短时等待Planner结果时被取消 (ID: {current_planner_request_id})。") + raise + except Exception as e: + logger.error(f"{self.log_prefix} 从Planner输出队列短时获取结果时出错 (ID: {current_planner_request_id}): {e}") + + + # 如果在短时等待后 planner_result 仍然是 None (无论是未发信号、超时还是获取出错) + # 则执行一个预定义的快速/默认动作 + if planner_result is None: + if signaled_planner_this_cycle: + logger.info(f"{self.log_prefix} Planner未在 {quick_wait_timeout}s 内响应,执行预定义快速动作 (no_reply)。") + else: + logger.info(f"{self.log_prefix} 未触发Planner或Planner繁忙/未激活,执行预定义快速动作 (no_reply)。") + + # 预定义快速/默认动作 + planner_result = {"action": "no_reply", + "reasoning": f"Planner未在{quick_wait_timeout}s内快速响应或未触发,默认不回复", + "emoji_query": "", + "llm_error": False # 这不是一个LLM错误,是流程控制 + } + # 注意:此时异步的Planner可能仍在后台运行。如果它稍后产生了结果, + # 在这个简化版本中,那个结果会被放入输出队列,并可能在下一个主循环周期 + # (如果那个周期也短时超时或未触发新规划)被错误地当作新结果取出。 + # 更完善的系统需要关联请求和响应,例如使用唯一的ID。 + + # --- 后续处理 planner_result --- + action = planner_result.get("action", "error") reasoning = planner_result.get("reasoning", "未提供理由") - # 更新循环信息 - self._current_cycle.set_action_info(action, reasoning, True) + emoji_query = planner_result.get("emoji_query", "") + llm_error_from_planner = planner_result.get("llm_error", False) + current_action_thinking_id = "" - # 处理LLM错误 - if planner_result.get("llm_error"): - logger.error(f"{self.log_prefix} LLM失败: {reasoning}") + if self._current_cycle: + action_is_reply = action in ["text_reply", "emoji_reply"] + self._current_cycle.set_action_info(action, reasoning, action_is_reply and not llm_error_from_planner) + + if llm_error_from_planner and action != "error": + logger.warning(f"{self.log_prefix} Planner (ID: {planner_result.get('request_id', 'N/A')}) 返回LLM或解析错误: {reasoning}。强制执行 'no_reply'。") + action = "no_reply" + if self._current_cycle: self._current_cycle.action_type = "no_reply" + elif action == "error": + logger.error(f"{self.log_prefix} Planner (ID: {planner_result.get('request_id', 'N/A')}) 决策返回错误状态: {reasoning}") + if self._current_cycle: self._current_cycle.action_taken = False return False, "" - # execute:执行 + action_str_log = {"text_reply": "回复", "emoji_reply": "回复表情", "no_reply": "不回复"}.get(action, "未知或错误动作") + logger.info(f"{self.log_prefix} (Planner ID: {planner_result.get('request_id', 'N/A')}) 麦麦最终决定'{action_str_log}', 原因'{reasoning}'") - # 在此处添加日志记录 - if action == "text_reply": - action_str = "回复" - elif action == "emoji_reply": - action_str = "回复表情" - else: - action_str = "不回复" - - logger.info(f"{self.log_prefix} 麦麦决定'{action_str}', 原因'{reasoning}'") - - return await self._handle_action( - action, reasoning, planner_result.get("emoji_query", ""), cycle_timers, planner_start_db_time + action_executed, current_action_thinking_id = await self._handle_action( + action, reasoning, emoji_query, cycle_timers, planner_start_db_time ) - except PlannerError as e: - logger.error(f"{self.log_prefix} 规划错误: {e}") - # 更新循环信息 - self._current_cycle.set_action_info("error", str(e), False) + if self._current_cycle and current_action_thinking_id: + self._current_cycle.set_thinking_id(current_action_thinking_id) + + return action_executed, current_action_thinking_id + + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} _think_plan_execute_loop 被取消。") + raise + except Exception as e: + logger.error(f"{self.log_prefix} _think_plan_execute_loop 发生未知严重错误: {e}") + logger.error(traceback.format_exc()) + if self._current_cycle: + self._current_cycle.set_action_info("error", f"主循环严重错误: {e}", False) return False, "" + async def _handle_action( self, action: str, reasoning: str, emoji_query: str, cycle_timers: dict, planner_start_db_time: float ) -> tuple[bool, str]: @@ -798,8 +906,83 @@ class HeartFChatting: logger.error(f"{self.log_prefix}子心流 思考失败: {e}") logger.error(traceback.format_exc()) return "[思考时出错]" + + # --- _async_planner_loop 方法 --- + async def _async_planner_loop(self): + """ + Planner的异步循环。 + 等待输入队列的信号,执行规划,并将结果放入输出队列。 + """ + self._planner_active = True + logger.info(f"{self.log_prefix} 异步Planner循环已启动。") - async def _planner(self, current_mind: str, cycle_timers: dict, is_re_planned: bool = False) -> Dict[str, Any]: + while self._planner_active: + original_request_id = None # 用于在异常时也能尝试返回ID + try: + request_timeout = getattr(getattr(self.sub_mind, 'llm_model', object()), 'request_timeout', 60) + planner_input_context = await asyncio.wait_for(self._planner_input_queue.get(), timeout=request_timeout + 60) + + if planner_input_context is None: + logger.info(f"{self.log_prefix} 异步Planner收到停止信号。") + self._planner_active = False + break + + current_mind = planner_input_context.get("current_mind") + # --- 获取请求ID --- + original_request_id = planner_input_context.get("request_id") + if not original_request_id: + logger.error(f"{self.log_prefix} Planner收到的请求上下文中缺少request_id!") + # 即使缺少ID,也尝试完成一次规划,但结果可能无法被主循环正确使用 + original_request_id = f"planner_fallback_id_{time.time()}" # 生成一个备用ID + + logger.debug(f"{self.log_prefix} 异步Planner收到规划请求 (ID: {original_request_id})。") + planner_internal_timers = {} + planner_result = await self._planner(current_mind, planner_internal_timers) + + # --- 在结果中加入请求ID --- + if isinstance(planner_result, dict): # 确保 planner_result 是字典 + planner_result["request_id"] = original_request_id + else: # 如果不是字典,则包装一下 + logger.warning(f"{self.log_prefix} Planner返回了非字典类型的结果: {planner_result}。将尝试包装。") + planner_result = { + "action": "error", + "reasoning": f"Planner返回了非字典结果 (ID: {original_request_id})", + "llm_error": True, + "request_id": original_request_id + } + + await self._planner_output_queue.put(planner_result) + self._planner_input_queue.task_done() + + except asyncio.TimeoutError: + logger.debug(f"{self.log_prefix} 异步Planner等待输入信号超时 (最近请求ID可能为: {original_request_id})。") + continue # 不向输出队列放任何东西,主循环的短时等待会超时 + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} 异步Planner循环被取消 (最近请求ID可能为: {original_request_id})。") + self._planner_active = False + break + except Exception as e: + logger.error(f"{self.log_prefix} 异步Planner循环发生错误 (最近请求ID: {original_request_id}): {e}") + logger.error(traceback.format_exc()) + self._planner_active = False + # --- 修改:错误结果也附带请求ID --- + error_output = { + "action": "error", + "reasoning": f"Planner (req_id: {original_request_id}) 内部错误: {e}", + "llm_error": True, + "request_id": original_request_id # 附带原始请求ID + } + # --- 结束修改 --- + # 尝试放入错误结果,如果队列已满(不太可能)或有其他问题,也没关系,主循环最终会超时 + try: + await asyncio.wait_for(self._planner_output_queue.put(error_output), timeout=1.0) + except Exception as put_err: + logger.error(f"{self.log_prefix} Planner放入错误结果到输出队列时失败: {put_err}") + break + logger.info(f"{self.log_prefix} 异步Planner循环已停止。") + # --- 结束新增 _async_planner_loop 方法 --- + + async def _planner(self, current_mind: str, planner_cycle_timers: dict) -> Dict[str, Any]: """ 规划器 (Planner): 使用LLM根据上下文决定是否和如何回复。 重构为:让LLM返回结构化JSON文本,然后在代码中解析。 @@ -1114,9 +1297,49 @@ class HeartFChatting: # raise RuntimeError(f"发送回复失败: {e}") from e async def shutdown(self): - """优雅关闭HeartFChatting实例,取消活动循环任务""" + """优雅关闭HeartFChatting实例,取消活动循环任务和异步Planner任务""" logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...") - self._shutting_down = True # <-- 在开始关闭时设置标志位 + self._shutting_down = True + self._loop_active = False + self._planner_active = False + + # --- 新增:停止异步Planner任务 --- + if self._planner_task and not self._planner_task.done(): + logger.info(f"{self.log_prefix} 正在取消异步Planner任务...") + try: + # 尝试向队列发送None作为停止信号,并设置短暂超时 + await asyncio.wait_for(self._planner_input_queue.put(None), timeout=0.5) + except asyncio.TimeoutError: + logger.warning(f"{self.log_prefix} 发送停止信号到Planner输入队列超时。") + except asyncio.QueueFull: # 如果队列是满的(理论上不应该在shutdown时) + logger.warning(f"{self.log_prefix} Planner输入队列已满,无法发送停止信号。") + + + if not self._planner_task.done(): # 再次检查任务是否已自行结束 + self._planner_task.cancel() + try: + await asyncio.wait_for(self._planner_task, timeout=1.0) + logger.info(f"{self.log_prefix} 异步Planner任务已取消。") + except (asyncio.CancelledError, asyncio.TimeoutError): + logger.info(f"{self.log_prefix} 异步Planner任务取消/超时完成。") + except Exception as e: + logger.error(f"{self.log_prefix} 等待异步Planner任务取消时出错: {e}") + + self._planner_task = None # 清理引用 + # 清空队列,以防万一 + while not self._planner_input_queue.empty(): + try: + self._planner_input_queue.get_nowait() + self._planner_input_queue.task_done() + except asyncio.QueueEmpty: + break + while not self._planner_output_queue.empty(): + try: + self._planner_output_queue.get_nowait() + # self._planner_output_queue.task_done() # 输出队列不需要task_done + except asyncio.QueueEmpty: + break + # --- 结束新增 --- # 取消循环任务 if self._loop_task and not self._loop_task.done():