回退 LHFC

pull/937/head
Bakadax 2025-05-16 20:38:30 +08:00
parent cef4013d43
commit 4fee112505
1 changed files with 48 additions and 273 deletions

View File

@ -5,7 +5,6 @@ import random # <--- 添加导入
import time import time
import re import re
import traceback import traceback
from asyncio import Queue
from collections import deque from collections import deque
from typing import List, Optional, Dict, Any, Deque, Callable, Coroutine from typing import List, Optional, Dict, Any, Deque, Callable, Coroutine
@ -248,12 +247,6 @@ class HeartFChatting:
self._lian_xu_bu_hui_fu_ci_shu: int = 0 # <--- 新增:连续不回复计数器 self._lian_xu_bu_hui_fu_ci_shu: int = 0 # <--- 新增:连续不回复计数器
self._shutting_down: bool = False # <--- 新增:关闭标志位 self._shutting_down: bool = False # <--- 新增:关闭标志位
self._lian_xu_deng_dai_shi_jian: float = 0.0 # <--- 新增:累计等待时间 self._lian_xu_deng_dai_shi_jian: float = 0.0 # <--- 新增:累计等待时间
self._planner_task: Optional[asyncio.Task] = None # 用于跟踪异步planner任务
self._planner_input_queue: Queue = Queue(maxsize=1) # 主循环 -> Planner 的信号队列
self._planner_output_queue: Queue = Queue(maxsize=1) # Planner -> 主循环 的结果队列
self._planner_active: bool = False # 标记Planner是否应该运行
self._last_planner_signal_time: float = 0.0 # 记录上次发送规划信号的时间
self._planner_request_id_counter: int = 0
async def _initialize(self) -> bool: async def _initialize(self) -> bool:
""" """
@ -290,23 +283,11 @@ class HeartFChatting:
async def start(self): async def start(self):
""" """
启动 HeartFChatting 的主循环和异步Planner循环 启动 HeartFChatting 的主循环
注意调用此方法前必须确保已经成功初始化 注意调用此方法前必须确保已经成功初始化
""" """
logger.info(f"{self.log_prefix} 开始认真水群(HFC)...") logger.info(f"{self.log_prefix} 开始认真水群(HFC)...")
if not self._initialized: # 确保已初始化 await self._start_loop_if_needed()
if not await self._initialize():
logger.error(f"{self.log_prefix} HFC 初始化失败,无法启动。")
return
await self._start_loop_if_needed() # 启动主循环
# --- 启动异步Planner循环 ---
if self._planner_task is None or self._planner_task.done():
self._planner_active = True # 先设置active标志
self._planner_task = asyncio.create_task(self._async_planner_loop())
self._planner_task.add_done_callback(self._handle_planner_loop_completion)
# logger.info(f"{self.log_prefix} 异步Planner任务已启动。") # 这行日志已在 _async_planner_loop 中存在
async def _start_loop_if_needed(self): async def _start_loop_if_needed(self):
"""检查是否需要启动主循环,如果未激活则启动。""" """检查是否需要启动主循环,如果未激活则启动。"""
@ -353,24 +334,6 @@ class HeartFChatting:
logger.warning(f"{self.log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。") logger.warning(f"{self.log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。")
self._processing_lock.release() self._processing_lock.release()
# --- 新增 _handle_planner_loop_completion 方法 ---
def _handle_planner_loop_completion(self, task: asyncio.Task):
"""当 _async_planner_loop 任务完成时执行的回调。"""
try:
exception = task.exception()
if exception:
logger.error(f"{self.log_prefix} 异步Planner任务异常结束: {exception}")
logger.error(traceback.format_exc())
# else: # 任务正常结束的日志已在 _async_planner_loop 中处理
# logger.info(f"{self.log_prefix} 异步Planner任务正常结束。")
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 异步Planner任务被取消。")
finally:
self._planner_active = False
if self._planner_task is task:
self._planner_task = None
# --- 结束新增 _handle_planner_loop_completion 方法 ---
async def _hfc_loop(self): async def _hfc_loop(self):
"""主循环,持续进行计划并可能回复消息,直到被外部取消。""" """主循环,持续进行计划并可能回复消息,直到被外部取消。"""
try: try:
@ -481,142 +444,69 @@ class HeartFChatting:
return False return False
async def _think_plan_execute_loop(self, cycle_timers: dict, planner_start_db_time: float) -> tuple[bool, str]: async def _think_plan_execute_loop(self, cycle_timers: dict, planner_start_db_time: float) -> tuple[bool, str]:
""" """执行规划阶段"""
执行思考异步触发Planner尝试短时等待Planner结果
若无则执行默认动作同时允许Planner后台完成
"""
_thinking_id_for_cycle = ""
try: try:
# think:思考
current_mind = await self._get_submind_thinking(cycle_timers) current_mind = await self._get_submind_thinking(cycle_timers)
# 记录子思维思考内容
if self._current_cycle: if self._current_cycle:
self._current_cycle.set_response_info(sub_mind_thinking=current_mind) self._current_cycle.set_response_info(sub_mind_thinking=current_mind)
planner_result = None # plan:决策
signaled_planner_this_cycle = False # 标记本周期是否已成功触发planner with Timer("决策", cycle_timers):
planner_result = await self._planner(current_mind, cycle_timers)
self._planner_request_id_counter += 1 # 效果不太好还没处理replan导致观察时间点改变的问题
current_planner_request_id = f"planner_req_{self.stream_id}_{self._cycle_counter}_{self._planner_request_id_counter}"
min_planner_interval = 1.0 # action = planner_result.get("action", "error")
can_signal_planner = (time.time() - self._last_planner_signal_time) >= min_planner_interval # reasoning = planner_result.get("reasoning", "未提供理由")
if self._planner_active and self._planner_input_queue.empty() and can_signal_planner: # self._current_cycle.set_action_info(action, reasoning, False)
logger.debug(f"{self.log_prefix} 向异步Planner发送规划请求 (ID: {current_planner_request_id})...")
# --- 修改在上下文中加入请求ID ---
planner_input_context = {
"current_mind": current_mind,
"request_id": current_planner_request_id # <<< 新增
}
try:
self._planner_input_queue.put_nowait(planner_input_context)
self._last_planner_signal_time = time.time()
signaled_planner_this_cycle = True
except asyncio.QueueFull:
logger.warning(f"{self.log_prefix} Planner输入队列已满本次规划跳过。")
# 不再直接设定 planner_result让后续逻辑决定默认行为
# 其他条件导致未发送信号的日志 (保持或根据需要调整)
elif not self._planner_active:
logger.warning(f"{self.log_prefix} Planner未激活无法发送规划请求。")
elif not can_signal_planner:
logger.debug(f"{self.log_prefix} Planner请求过于频繁本次跳过。")
else: # Planner激活但输入队列非空
logger.warning(f"{self.log_prefix} Planner输入队列仍有任务本次跳过。")
# 在获取规划结果后检查新消息
# --- 尝试在短时间内获取Planner的决策结果 --- # if await self._check_new_messages(planner_start_db_time):
quick_wait_timeout = 3.0 # if random.random() < 0.2:
# logger.info(f"{self.log_prefix} 看到了新消息,麦麦决定重新观察和规划...")
# # 重新规划
# with Timer("重新决策", cycle_timers):
# self._current_cycle.replanned = True
# planner_result = await self._planner(current_mind, cycle_timers, is_re_planned=True)
# logger.info(f"{self.log_prefix} 重新规划完成.")
if signaled_planner_this_cycle: # 解析规划结果
logger.debug(f"{self.log_prefix} 短时等待 ({quick_wait_timeout}s) Planner结果 (ID: {current_planner_request_id})...")
try:
# --- 从队列获取的结果现在应该也包含 request_id ---
raw_planner_output = await asyncio.wait_for(self._planner_output_queue.get(), timeout=quick_wait_timeout)
self._planner_output_queue.task_done() # 提到前面,获取后就标记完成
# --- 校验 request_id ---
if isinstance(raw_planner_output, dict) and raw_planner_output.get("request_id") == current_planner_request_id:
planner_result = raw_planner_output
logger.info(f"{self.log_prefix} 短时等待内收到匹配的Planner结果 (ID: {current_planner_request_id}): {planner_result.get('action')}")
elif isinstance(raw_planner_output, dict):
logger.warning(f"{self.log_prefix} 短时等待内收到不匹配的Planner结果。预期ID: {current_planner_request_id}, 收到ID: {raw_planner_output.get('request_id')}。丢弃此结果。")
# planner_result 保持为 None将执行默认动作
else:
logger.error(f"{self.log_prefix} Planner输出格式不正确非字典或无request_id丢弃。内容: {raw_planner_output}")
# planner_result 保持为 None
except asyncio.TimeoutError:
logger.info(f"{self.log_prefix} 短时等待Planner决策结果超时 (ID: {current_planner_request_id})。主循环将先执行默认动作。")
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 主循环在短时等待Planner结果时被取消 (ID: {current_planner_request_id})。")
raise
except Exception as e:
logger.error(f"{self.log_prefix} 从Planner输出队列短时获取结果时出错 (ID: {current_planner_request_id}): {e}")
# 如果在短时等待后 planner_result 仍然是 None (无论是未发信号、超时还是获取出错)
# 则执行一个预定义的快速/默认动作
if planner_result is None:
if signaled_planner_this_cycle:
logger.info(f"{self.log_prefix} Planner未在 {quick_wait_timeout}s 内响应,执行预定义快速动作 (no_reply)。")
else:
logger.info(f"{self.log_prefix} 未触发Planner或Planner繁忙/未激活,执行预定义快速动作 (no_reply)。")
# 预定义快速/默认动作
planner_result = {"action": "no_reply",
"reasoning": f"Planner未在{quick_wait_timeout}s内快速响应或未触发默认不回复",
"emoji_query": "",
"llm_error": False # 这不是一个LLM错误是流程控制
}
# 注意此时异步的Planner可能仍在后台运行。如果它稍后产生了结果
# 在这个简化版本中,那个结果会被放入输出队列,并可能在下一个主循环周期
# (如果那个周期也短时超时或未触发新规划)被错误地当作新结果取出。
# 更完善的系统需要关联请求和响应例如使用唯一的ID。
# --- 后续处理 planner_result ---
action = planner_result.get("action", "error") action = planner_result.get("action", "error")
reasoning = planner_result.get("reasoning", "未提供理由") reasoning = planner_result.get("reasoning", "未提供理由")
emoji_query = planner_result.get("emoji_query", "") # 更新循环信息
llm_error_from_planner = planner_result.get("llm_error", False) self._current_cycle.set_action_info(action, reasoning, True)
current_action_thinking_id = ""
if self._current_cycle: # 处理LLM错误
action_is_reply = action in ["text_reply", "emoji_reply"] if planner_result.get("llm_error"):
self._current_cycle.set_action_info(action, reasoning, action_is_reply and not llm_error_from_planner) logger.error(f"{self.log_prefix} LLM失败: {reasoning}")
if llm_error_from_planner and action != "error":
logger.warning(f"{self.log_prefix} Planner (ID: {planner_result.get('request_id', 'N/A')}) 返回LLM或解析错误: {reasoning}。强制执行 'no_reply'")
action = "no_reply"
if self._current_cycle:
self._current_cycle.action_type = "no_reply"
elif action == "error":
logger.error(f"{self.log_prefix} Planner (ID: {planner_result.get('request_id', 'N/A')}) 决策返回错误状态: {reasoning}")
if self._current_cycle:
self._current_cycle.action_taken = False
return False, "" return False, ""
action_str_log = {"text_reply": "回复", "emoji_reply": "回复表情", "no_reply": "不回复"}.get(action, "未知或错误动作") # execute:执行
logger.info(f"{self.log_prefix} (Planner ID: {planner_result.get('request_id', 'N/A')}) 麦麦最终决定'{action_str_log}', 原因'{reasoning}'")
action_executed, current_action_thinking_id = await self._handle_action( # 在此处添加日志记录
action, reasoning, emoji_query, cycle_timers, planner_start_db_time if action == "text_reply":
action_str = "回复"
elif action == "emoji_reply":
action_str = "回复表情"
else:
action_str = "不回复"
logger.info(f"{self.log_prefix} 麦麦决定'{action_str}', 原因'{reasoning}'")
return await self._handle_action(
action, reasoning, planner_result.get("emoji_query", ""), cycle_timers, planner_start_db_time
) )
if self._current_cycle and current_action_thinking_id: except PlannerError as e:
self._current_cycle.set_thinking_id(current_action_thinking_id) logger.error(f"{self.log_prefix} 规划错误: {e}")
# 更新循环信息
return action_executed, current_action_thinking_id self._current_cycle.set_action_info("error", str(e), False)
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} _think_plan_execute_loop 被取消。")
raise
except Exception as e:
logger.error(f"{self.log_prefix} _think_plan_execute_loop 发生未知严重错误: {e}")
logger.error(traceback.format_exc())
if self._current_cycle:
self._current_cycle.set_action_info("error", f"主循环严重错误: {e}", False)
return False, "" return False, ""
async def _handle_action( async def _handle_action(
self, action: str, reasoning: str, emoji_query: str, cycle_timers: dict, planner_start_db_time: float self, action: str, reasoning: str, emoji_query: str, cycle_timers: dict, planner_start_db_time: float
) -> tuple[bool, str]: ) -> tuple[bool, str]:
@ -909,82 +799,7 @@ class HeartFChatting:
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
return "[思考时出错]" return "[思考时出错]"
# --- _async_planner_loop 方法 --- async def _planner(self, current_mind: str, cycle_timers: dict, is_re_planned: bool = False) -> Dict[str, Any]:
async def _async_planner_loop(self):
"""
Planner的异步循环
等待输入队列的信号执行规划并将结果放入输出队列
"""
self._planner_active = True
logger.info(f"{self.log_prefix} 异步Planner循环已启动。")
while self._planner_active:
original_request_id = None # 用于在异常时也能尝试返回ID
try:
request_timeout = getattr(getattr(self.sub_mind, 'llm_model', object()), 'request_timeout', 60)
planner_input_context = await asyncio.wait_for(self._planner_input_queue.get(), timeout=request_timeout + 60)
if planner_input_context is None:
logger.info(f"{self.log_prefix} 异步Planner收到停止信号。")
self._planner_active = False
break
current_mind = planner_input_context.get("current_mind")
# --- 获取请求ID ---
original_request_id = planner_input_context.get("request_id")
if not original_request_id:
logger.error(f"{self.log_prefix} Planner收到的请求上下文中缺少request_id")
# 即使缺少ID也尝试完成一次规划但结果可能无法被主循环正确使用
original_request_id = f"planner_fallback_id_{time.time()}" # 生成一个备用ID
logger.debug(f"{self.log_prefix} 异步Planner收到规划请求 (ID: {original_request_id})。")
planner_internal_timers = {}
planner_result = await self._planner(current_mind, planner_internal_timers)
# --- 在结果中加入请求ID ---
if isinstance(planner_result, dict): # 确保 planner_result 是字典
planner_result["request_id"] = original_request_id
else: # 如果不是字典,则包装一下
logger.warning(f"{self.log_prefix} Planner返回了非字典类型的结果: {planner_result}。将尝试包装。")
planner_result = {
"action": "error",
"reasoning": f"Planner返回了非字典结果 (ID: {original_request_id})",
"llm_error": True,
"request_id": original_request_id
}
await self._planner_output_queue.put(planner_result)
self._planner_input_queue.task_done()
except asyncio.TimeoutError:
logger.debug(f"{self.log_prefix} 异步Planner等待输入信号超时 (最近请求ID可能为: {original_request_id})。")
continue # 不向输出队列放任何东西,主循环的短时等待会超时
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 异步Planner循环被取消 (最近请求ID可能为: {original_request_id})。")
self._planner_active = False
break
except Exception as e:
logger.error(f"{self.log_prefix} 异步Planner循环发生错误 (最近请求ID: {original_request_id}): {e}")
logger.error(traceback.format_exc())
self._planner_active = False
# --- 修改错误结果也附带请求ID ---
error_output = {
"action": "error",
"reasoning": f"Planner (req_id: {original_request_id}) 内部错误: {e}",
"llm_error": True,
"request_id": original_request_id # 附带原始请求ID
}
# --- 结束修改 ---
# 尝试放入错误结果,如果队列已满(不太可能)或有其他问题,也没关系,主循环最终会超时
try:
await asyncio.wait_for(self._planner_output_queue.put(error_output), timeout=1.0)
except Exception as put_err:
logger.error(f"{self.log_prefix} Planner放入错误结果到输出队列时失败: {put_err}")
break
logger.info(f"{self.log_prefix} 异步Planner循环已停止。")
# --- 结束新增 _async_planner_loop 方法 ---
async def _planner(self, current_mind: str, planner_cycle_timers: dict) -> Dict[str, Any]:
""" """
规划器 (Planner): 使用LLM根据上下文决定是否和如何回复 规划器 (Planner): 使用LLM根据上下文决定是否和如何回复
重构为让LLM返回结构化JSON文本然后在代码中解析 重构为让LLM返回结构化JSON文本然后在代码中解析
@ -1299,49 +1114,9 @@ class HeartFChatting:
# raise RuntimeError(f"发送回复失败: {e}") from e # raise RuntimeError(f"发送回复失败: {e}") from e
async def shutdown(self): async def shutdown(self):
"""优雅关闭HeartFChatting实例取消活动循环任务和异步Planner任务""" """优雅关闭HeartFChatting实例取消活动循环任务"""
logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...") logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...")
self._shutting_down = True self._shutting_down = True # <-- 在开始关闭时设置标志位
self._loop_active = False
self._planner_active = False
# --- 新增停止异步Planner任务 ---
if self._planner_task and not self._planner_task.done():
logger.info(f"{self.log_prefix} 正在取消异步Planner任务...")
try:
# 尝试向队列发送None作为停止信号并设置短暂超时
await asyncio.wait_for(self._planner_input_queue.put(None), timeout=0.5)
except asyncio.TimeoutError:
logger.warning(f"{self.log_prefix} 发送停止信号到Planner输入队列超时。")
except asyncio.QueueFull: # 如果队列是满的理论上不应该在shutdown时
logger.warning(f"{self.log_prefix} Planner输入队列已满无法发送停止信号。")
if not self._planner_task.done(): # 再次检查任务是否已自行结束
self._planner_task.cancel()
try:
await asyncio.wait_for(self._planner_task, timeout=1.0)
logger.info(f"{self.log_prefix} 异步Planner任务已取消。")
except (asyncio.CancelledError, asyncio.TimeoutError):
logger.info(f"{self.log_prefix} 异步Planner任务取消/超时完成。")
except Exception as e:
logger.error(f"{self.log_prefix} 等待异步Planner任务取消时出错: {e}")
self._planner_task = None # 清理引用
# 清空队列,以防万一
while not self._planner_input_queue.empty():
try:
self._planner_input_queue.get_nowait()
self._planner_input_queue.task_done()
except asyncio.QueueEmpty:
break
while not self._planner_output_queue.empty():
try:
self._planner_output_queue.get_nowait()
# self._planner_output_queue.task_done() # 输出队列不需要task_done
except asyncio.QueueEmpty:
break
# --- 结束新增 ---
# 取消循环任务 # 取消循环任务
if self._loop_task and not self._loop_task.done(): if self._loop_task and not self._loop_task.done():