diff --git a/src/experimental/Legacy_HFC/heartFC_chat.py b/src/experimental/Legacy_HFC/heartFC_chat.py index 8d608c1e..8f1d4f00 100644 --- a/src/experimental/Legacy_HFC/heartFC_chat.py +++ b/src/experimental/Legacy_HFC/heartFC_chat.py @@ -5,7 +5,6 @@ import random # <--- 添加导入 import time import re import traceback -from asyncio import Queue from collections import deque from typing import List, Optional, Dict, Any, Deque, Callable, Coroutine @@ -248,12 +247,6 @@ class HeartFChatting: self._lian_xu_bu_hui_fu_ci_shu: int = 0 # <--- 新增:连续不回复计数器 self._shutting_down: bool = False # <--- 新增:关闭标志位 self._lian_xu_deng_dai_shi_jian: float = 0.0 # <--- 新增:累计等待时间 - self._planner_task: Optional[asyncio.Task] = None # 用于跟踪异步planner任务 - self._planner_input_queue: Queue = Queue(maxsize=1) # 主循环 -> Planner 的信号队列 - self._planner_output_queue: Queue = Queue(maxsize=1) # Planner -> 主循环 的结果队列 - self._planner_active: bool = False # 标记Planner是否应该运行 - self._last_planner_signal_time: float = 0.0 # 记录上次发送规划信号的时间 - self._planner_request_id_counter: int = 0 async def _initialize(self) -> bool: """ @@ -290,23 +283,11 @@ class HeartFChatting: async def start(self): """ - 启动 HeartFChatting 的主循环和异步Planner循环。 + 启动 HeartFChatting 的主循环。 注意:调用此方法前必须确保已经成功初始化。 """ logger.info(f"{self.log_prefix} 开始认真水群(HFC)...") - if not self._initialized: # 确保已初始化 - if not await self._initialize(): - logger.error(f"{self.log_prefix} HFC 初始化失败,无法启动。") - return - - await self._start_loop_if_needed() # 启动主循环 - - # --- 启动异步Planner循环 --- - if self._planner_task is None or self._planner_task.done(): - self._planner_active = True # 先设置active标志 - self._planner_task = asyncio.create_task(self._async_planner_loop()) - self._planner_task.add_done_callback(self._handle_planner_loop_completion) - # logger.info(f"{self.log_prefix} 异步Planner任务已启动。") # 这行日志已在 _async_planner_loop 中存在 + await self._start_loop_if_needed() async def _start_loop_if_needed(self): """检查是否需要启动主循环,如果未激活则启动。""" @@ -353,24 +334,6 @@ class HeartFChatting: logger.warning(f"{self.log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。") self._processing_lock.release() - # --- 新增 _handle_planner_loop_completion 方法 --- - def _handle_planner_loop_completion(self, task: asyncio.Task): - """当 _async_planner_loop 任务完成时执行的回调。""" - try: - exception = task.exception() - if exception: - logger.error(f"{self.log_prefix} 异步Planner任务异常结束: {exception}") - logger.error(traceback.format_exc()) - # else: # 任务正常结束的日志已在 _async_planner_loop 中处理 - # logger.info(f"{self.log_prefix} 异步Planner任务正常结束。") - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} 异步Planner任务被取消。") - finally: - self._planner_active = False - if self._planner_task is task: - self._planner_task = None - # --- 结束新增 _handle_planner_loop_completion 方法 --- - async def _hfc_loop(self): """主循环,持续进行计划并可能回复消息,直到被外部取消。""" try: @@ -481,142 +444,69 @@ class HeartFChatting: return False async def _think_plan_execute_loop(self, cycle_timers: dict, planner_start_db_time: float) -> tuple[bool, str]: - """ - 执行思考,异步触发Planner,尝试短时等待Planner结果, - 若无则执行默认动作,同时允许Planner后台完成。 - """ - _thinking_id_for_cycle = "" + """执行规划阶段""" try: + # think:思考 current_mind = await self._get_submind_thinking(cycle_timers) + # 记录子思维思考内容 if self._current_cycle: self._current_cycle.set_response_info(sub_mind_thinking=current_mind) - planner_result = None - signaled_planner_this_cycle = False # 标记本周期是否已成功触发planner + # plan:决策 + with Timer("决策", cycle_timers): + planner_result = await self._planner(current_mind, cycle_timers) - self._planner_request_id_counter += 1 - current_planner_request_id = f"planner_req_{self.stream_id}_{self._cycle_counter}_{self._planner_request_id_counter}" + # 效果不太好,还没处理replan导致观察时间点改变的问题 - min_planner_interval = 1.0 - can_signal_planner = (time.time() - self._last_planner_signal_time) >= min_planner_interval + # action = planner_result.get("action", "error") + # reasoning = planner_result.get("reasoning", "未提供理由") - if self._planner_active and self._planner_input_queue.empty() and can_signal_planner: - logger.debug(f"{self.log_prefix} 向异步Planner发送规划请求 (ID: {current_planner_request_id})...") - # --- 修改:在上下文中加入请求ID --- - planner_input_context = { - "current_mind": current_mind, - "request_id": current_planner_request_id # <<< 新增 - } - try: - self._planner_input_queue.put_nowait(planner_input_context) - self._last_planner_signal_time = time.time() - signaled_planner_this_cycle = True - except asyncio.QueueFull: - logger.warning(f"{self.log_prefix} Planner输入队列已满,本次规划跳过。") - # 不再直接设定 planner_result,让后续逻辑决定默认行为 - # 其他条件导致未发送信号的日志 (保持或根据需要调整) - elif not self._planner_active: - logger.warning(f"{self.log_prefix} Planner未激活,无法发送规划请求。") - elif not can_signal_planner: - logger.debug(f"{self.log_prefix} Planner请求过于频繁,本次跳过。") - else: # Planner激活但输入队列非空 - logger.warning(f"{self.log_prefix} Planner输入队列仍有任务,本次跳过。") + # self._current_cycle.set_action_info(action, reasoning, False) + # 在获取规划结果后检查新消息 - # --- 尝试在短时间内获取Planner的决策结果 --- - quick_wait_timeout = 3.0 + # if await self._check_new_messages(planner_start_db_time): + # if random.random() < 0.2: + # logger.info(f"{self.log_prefix} 看到了新消息,麦麦决定重新观察和规划...") + # # 重新规划 + # with Timer("重新决策", cycle_timers): + # self._current_cycle.replanned = True + # planner_result = await self._planner(current_mind, cycle_timers, is_re_planned=True) + # logger.info(f"{self.log_prefix} 重新规划完成.") - if signaled_planner_this_cycle: - logger.debug(f"{self.log_prefix} 短时等待 ({quick_wait_timeout}s) Planner结果 (ID: {current_planner_request_id})...") - try: - # --- 从队列获取的结果现在应该也包含 request_id --- - raw_planner_output = await asyncio.wait_for(self._planner_output_queue.get(), timeout=quick_wait_timeout) - self._planner_output_queue.task_done() # 提到前面,获取后就标记完成 - - # --- 校验 request_id --- - if isinstance(raw_planner_output, dict) and raw_planner_output.get("request_id") == current_planner_request_id: - planner_result = raw_planner_output - logger.info(f"{self.log_prefix} 短时等待内收到匹配的Planner结果 (ID: {current_planner_request_id}): {planner_result.get('action')}") - elif isinstance(raw_planner_output, dict): - logger.warning(f"{self.log_prefix} 短时等待内收到不匹配的Planner结果。预期ID: {current_planner_request_id}, 收到ID: {raw_planner_output.get('request_id')}。丢弃此结果。") - # planner_result 保持为 None,将执行默认动作 - else: - logger.error(f"{self.log_prefix} Planner输出格式不正确(非字典或无request_id),丢弃。内容: {raw_planner_output}") - # planner_result 保持为 None - - except asyncio.TimeoutError: - logger.info(f"{self.log_prefix} 短时等待Planner决策结果超时 (ID: {current_planner_request_id})。主循环将先执行默认动作。") - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} 主循环在短时等待Planner结果时被取消 (ID: {current_planner_request_id})。") - raise - except Exception as e: - logger.error(f"{self.log_prefix} 从Planner输出队列短时获取结果时出错 (ID: {current_planner_request_id}): {e}") - - - # 如果在短时等待后 planner_result 仍然是 None (无论是未发信号、超时还是获取出错) - # 则执行一个预定义的快速/默认动作 - if planner_result is None: - if signaled_planner_this_cycle: - logger.info(f"{self.log_prefix} Planner未在 {quick_wait_timeout}s 内响应,执行预定义快速动作 (no_reply)。") - else: - logger.info(f"{self.log_prefix} 未触发Planner或Planner繁忙/未激活,执行预定义快速动作 (no_reply)。") - - # 预定义快速/默认动作 - planner_result = {"action": "no_reply", - "reasoning": f"Planner未在{quick_wait_timeout}s内快速响应或未触发,默认不回复", - "emoji_query": "", - "llm_error": False # 这不是一个LLM错误,是流程控制 - } - # 注意:此时异步的Planner可能仍在后台运行。如果它稍后产生了结果, - # 在这个简化版本中,那个结果会被放入输出队列,并可能在下一个主循环周期 - # (如果那个周期也短时超时或未触发新规划)被错误地当作新结果取出。 - # 更完善的系统需要关联请求和响应,例如使用唯一的ID。 - - # --- 后续处理 planner_result --- - action = planner_result.get("action", "error") + # 解析规划结果 + action = planner_result.get("action", "error") reasoning = planner_result.get("reasoning", "未提供理由") - emoji_query = planner_result.get("emoji_query", "") - llm_error_from_planner = planner_result.get("llm_error", False) - current_action_thinking_id = "" + # 更新循环信息 + self._current_cycle.set_action_info(action, reasoning, True) - if self._current_cycle: - action_is_reply = action in ["text_reply", "emoji_reply"] - self._current_cycle.set_action_info(action, reasoning, action_is_reply and not llm_error_from_planner) - - if llm_error_from_planner and action != "error": - logger.warning(f"{self.log_prefix} Planner (ID: {planner_result.get('request_id', 'N/A')}) 返回LLM或解析错误: {reasoning}。强制执行 'no_reply'。") - action = "no_reply" - if self._current_cycle: - self._current_cycle.action_type = "no_reply" - elif action == "error": - logger.error(f"{self.log_prefix} Planner (ID: {planner_result.get('request_id', 'N/A')}) 决策返回错误状态: {reasoning}") - if self._current_cycle: - self._current_cycle.action_taken = False + # 处理LLM错误 + if planner_result.get("llm_error"): + logger.error(f"{self.log_prefix} LLM失败: {reasoning}") return False, "" - action_str_log = {"text_reply": "回复", "emoji_reply": "回复表情", "no_reply": "不回复"}.get(action, "未知或错误动作") - logger.info(f"{self.log_prefix} (Planner ID: {planner_result.get('request_id', 'N/A')}) 麦麦最终决定'{action_str_log}', 原因'{reasoning}'") + # execute:执行 - action_executed, current_action_thinking_id = await self._handle_action( - action, reasoning, emoji_query, cycle_timers, planner_start_db_time + # 在此处添加日志记录 + if action == "text_reply": + action_str = "回复" + elif action == "emoji_reply": + action_str = "回复表情" + else: + action_str = "不回复" + + logger.info(f"{self.log_prefix} 麦麦决定'{action_str}', 原因'{reasoning}'") + + return await self._handle_action( + action, reasoning, planner_result.get("emoji_query", ""), cycle_timers, planner_start_db_time ) - if self._current_cycle and current_action_thinking_id: - self._current_cycle.set_thinking_id(current_action_thinking_id) - - return action_executed, current_action_thinking_id - - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} _think_plan_execute_loop 被取消。") - raise - except Exception as e: - logger.error(f"{self.log_prefix} _think_plan_execute_loop 发生未知严重错误: {e}") - logger.error(traceback.format_exc()) - if self._current_cycle: - self._current_cycle.set_action_info("error", f"主循环严重错误: {e}", False) + except PlannerError as e: + logger.error(f"{self.log_prefix} 规划错误: {e}") + # 更新循环信息 + self._current_cycle.set_action_info("error", str(e), False) return False, "" - async def _handle_action( self, action: str, reasoning: str, emoji_query: str, cycle_timers: dict, planner_start_db_time: float ) -> tuple[bool, str]: @@ -908,83 +798,8 @@ class HeartFChatting: logger.error(f"{self.log_prefix}子心流 思考失败: {e}") logger.error(traceback.format_exc()) return "[思考时出错]" - - # --- _async_planner_loop 方法 --- - async def _async_planner_loop(self): - """ - Planner的异步循环。 - 等待输入队列的信号,执行规划,并将结果放入输出队列。 - """ - self._planner_active = True - logger.info(f"{self.log_prefix} 异步Planner循环已启动。") - while self._planner_active: - original_request_id = None # 用于在异常时也能尝试返回ID - try: - request_timeout = getattr(getattr(self.sub_mind, 'llm_model', object()), 'request_timeout', 60) - planner_input_context = await asyncio.wait_for(self._planner_input_queue.get(), timeout=request_timeout + 60) - - if planner_input_context is None: - logger.info(f"{self.log_prefix} 异步Planner收到停止信号。") - self._planner_active = False - break - - current_mind = planner_input_context.get("current_mind") - # --- 获取请求ID --- - original_request_id = planner_input_context.get("request_id") - if not original_request_id: - logger.error(f"{self.log_prefix} Planner收到的请求上下文中缺少request_id!") - # 即使缺少ID,也尝试完成一次规划,但结果可能无法被主循环正确使用 - original_request_id = f"planner_fallback_id_{time.time()}" # 生成一个备用ID - - logger.debug(f"{self.log_prefix} 异步Planner收到规划请求 (ID: {original_request_id})。") - planner_internal_timers = {} - planner_result = await self._planner(current_mind, planner_internal_timers) - - # --- 在结果中加入请求ID --- - if isinstance(planner_result, dict): # 确保 planner_result 是字典 - planner_result["request_id"] = original_request_id - else: # 如果不是字典,则包装一下 - logger.warning(f"{self.log_prefix} Planner返回了非字典类型的结果: {planner_result}。将尝试包装。") - planner_result = { - "action": "error", - "reasoning": f"Planner返回了非字典结果 (ID: {original_request_id})", - "llm_error": True, - "request_id": original_request_id - } - - await self._planner_output_queue.put(planner_result) - self._planner_input_queue.task_done() - - except asyncio.TimeoutError: - logger.debug(f"{self.log_prefix} 异步Planner等待输入信号超时 (最近请求ID可能为: {original_request_id})。") - continue # 不向输出队列放任何东西,主循环的短时等待会超时 - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} 异步Planner循环被取消 (最近请求ID可能为: {original_request_id})。") - self._planner_active = False - break - except Exception as e: - logger.error(f"{self.log_prefix} 异步Planner循环发生错误 (最近请求ID: {original_request_id}): {e}") - logger.error(traceback.format_exc()) - self._planner_active = False - # --- 修改:错误结果也附带请求ID --- - error_output = { - "action": "error", - "reasoning": f"Planner (req_id: {original_request_id}) 内部错误: {e}", - "llm_error": True, - "request_id": original_request_id # 附带原始请求ID - } - # --- 结束修改 --- - # 尝试放入错误结果,如果队列已满(不太可能)或有其他问题,也没关系,主循环最终会超时 - try: - await asyncio.wait_for(self._planner_output_queue.put(error_output), timeout=1.0) - except Exception as put_err: - logger.error(f"{self.log_prefix} Planner放入错误结果到输出队列时失败: {put_err}") - break - logger.info(f"{self.log_prefix} 异步Planner循环已停止。") - # --- 结束新增 _async_planner_loop 方法 --- - - async def _planner(self, current_mind: str, planner_cycle_timers: dict) -> Dict[str, Any]: + async def _planner(self, current_mind: str, cycle_timers: dict, is_re_planned: bool = False) -> Dict[str, Any]: """ 规划器 (Planner): 使用LLM根据上下文决定是否和如何回复。 重构为:让LLM返回结构化JSON文本,然后在代码中解析。 @@ -1299,49 +1114,9 @@ class HeartFChatting: # raise RuntimeError(f"发送回复失败: {e}") from e async def shutdown(self): - """优雅关闭HeartFChatting实例,取消活动循环任务和异步Planner任务""" + """优雅关闭HeartFChatting实例,取消活动循环任务""" logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...") - self._shutting_down = True - self._loop_active = False - self._planner_active = False - - # --- 新增:停止异步Planner任务 --- - if self._planner_task and not self._planner_task.done(): - logger.info(f"{self.log_prefix} 正在取消异步Planner任务...") - try: - # 尝试向队列发送None作为停止信号,并设置短暂超时 - await asyncio.wait_for(self._planner_input_queue.put(None), timeout=0.5) - except asyncio.TimeoutError: - logger.warning(f"{self.log_prefix} 发送停止信号到Planner输入队列超时。") - except asyncio.QueueFull: # 如果队列是满的(理论上不应该在shutdown时) - logger.warning(f"{self.log_prefix} Planner输入队列已满,无法发送停止信号。") - - - if not self._planner_task.done(): # 再次检查任务是否已自行结束 - self._planner_task.cancel() - try: - await asyncio.wait_for(self._planner_task, timeout=1.0) - logger.info(f"{self.log_prefix} 异步Planner任务已取消。") - except (asyncio.CancelledError, asyncio.TimeoutError): - logger.info(f"{self.log_prefix} 异步Planner任务取消/超时完成。") - except Exception as e: - logger.error(f"{self.log_prefix} 等待异步Planner任务取消时出错: {e}") - - self._planner_task = None # 清理引用 - # 清空队列,以防万一 - while not self._planner_input_queue.empty(): - try: - self._planner_input_queue.get_nowait() - self._planner_input_queue.task_done() - except asyncio.QueueEmpty: - break - while not self._planner_output_queue.empty(): - try: - self._planner_output_queue.get_nowait() - # self._planner_output_queue.task_done() # 输出队列不需要task_done - except asyncio.QueueEmpty: - break - # --- 结束新增 --- + self._shutting_down = True # <-- 在开始关闭时设置标志位 # 取消循环任务 if self._loop_task and not self._loop_task.done():