From f80eb1eacae1f6803bcae8f78bccd560479e2931 Mon Sep 17 00:00:00 2001 From: 2829798842 <2829798842@qq.com> Date: Sun, 27 Apr 2025 13:56:20 +0800 Subject: [PATCH] Update subheartflow_manager.py --- src/heart_flow/subheartflow_manager.py | 72 ++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/src/heart_flow/subheartflow_manager.py b/src/heart_flow/subheartflow_manager.py index 6d994d76..285fea7e 100644 --- a/src/heart_flow/subheartflow_manager.py +++ b/src/heart_flow/subheartflow_manager.py @@ -22,6 +22,11 @@ from src.config.config import global_config from src.individuality.individuality import Individuality import traceback +# --- 新增导入 --- +from src.do_tool.tool_can_use.schedule_task import TaskScheduler +from datetime import datetime, timezone +# --- 结束新增导入 --- + # 初始化日志记录器 @@ -260,6 +265,73 @@ class SubHeartflowManager: f"{log_prefix} 完成,共处理 {processed_count} 个子心流,成功将 {changed_count} 个非 ABSENT 子心流的状态更改为 ABSENT。" ) + async def check_upcoming_tasks_and_focus(self, upcoming_threshold_seconds: int = 60): + """ + 检查计划任务,并将有任务即将到期的子心流状态设置为 FOCUSED。 + + Args: + upcoming_threshold_seconds: 定义"即将到来"的时间阈值(秒)。默认为 60 秒(1 分钟)。 + """ + log_prefix = "[任务检查聚焦]" + try: + scheduler = TaskScheduler.get_instance() + # 获取当前计划任务的快照(包含时间戳、描述和回调信息) + # 注意:直接访问 scheduled_tasks 列表,需要确保 TaskScheduler 内部对列表的操作是线程/协程安全的 + # 如果不安全,TaskScheduler 应提供一个获取安全副本的方法 + tasks_snapshot = list(scheduler.scheduled_tasks) # 创建副本以安全迭代 + + if not tasks_snapshot: + # logger.debug(f"{log_prefix} 当前无计划任务。") + return + + now_ts = datetime.now(timezone.utc).timestamp() + subflows_to_focus = set() # 使用集合避免重复处理同一个 subflow_id + + # 1. 识别哪些子心流需要被聚焦 + for timestamp, task_description, callback_info in tasks_snapshot: + # 检查任务是否在未来阈值时间内 + if now_ts < timestamp <= now_ts + upcoming_threshold_seconds: + # 尝试从 callback_info 中获取 subheartflow_id + # !!! 关键假设:添加任务时 callback_info 中存储了 'subheartflow_id' !!! + subheartflow_id = callback_info.get('subheartflow_id') + + if subheartflow_id: + logger.debug(f"{log_prefix} 发现子心流 {subheartflow_id} 的任务 '{task_description}' 即将执行 (剩余 {timestamp - now_ts:.1f} 秒)") + subflows_to_focus.add(subheartflow_id) + else: + logger.warning(f"{log_prefix} 任务 '{task_description}' 即将执行,但在 callback_info 中未找到 'subheartflow_id'。") + + # 2. 对需要聚焦的子心流执行状态切换 + if subflows_to_focus: + focused_count = 0 + async with self._lock: # 获取锁以安全地修改 subheartflows 字典及其状态 + for sf_id in subflows_to_focus: + subflow = self.subheartflows.get(sf_id) + if subflow: + # 检查当前状态是否需要切换 + if subflow.chat_state.chat_status != ChatState.FOCUSED: + try: + stream_name = chat_manager.get_stream_name(sf_id) or sf_id + logger.info(f"{log_prefix} 检测到子心流 [{stream_name}] 有任务即将执行,将其状态从 {subflow.chat_state.chat_status.value} 切换到 FOCUSED。") + await subflow.change_chat_state(ChatState.FOCUSED) + # 可以在此验证状态是否真的切换成功 + if subflow.chat_state.chat_status == ChatState.FOCUSED: + focused_count += 1 + else: + logger.warning(f"{log_prefix} 尝试聚焦 [{stream_name}] 后,状态仍为 {subflow.chat_state.chat_status.value}。") + except Exception as e: + logger.error(f"{log_prefix} 切换子心流 {sf_id} 状态到 FOCUSED 时出错: {e}", exc_info=True) + else: + # 如果已经是 FOCUSED,可以选择记录日志或跳过 + logger.debug(f"{log_prefix} 子心流 {sf_id} 已处于 FOCUSED 状态。") + else: + logger.warning(f"{log_prefix} 发现即将执行的任务关联的子心流 {sf_id} 不在当前管理列表中。") + if focused_count > 0: + logger.info(f"{log_prefix} 共将 {focused_count} 个子心流的状态切换到了 FOCUSED。") + + except Exception as e: + logger.error(f"{log_prefix} 检查即将到来的任务时发生错误: {e}", exc_info=True) + async def evaluate_interest_and_promote(self): """评估子心流兴趣度,满足条件且未达上限则提升到FOCUSED状态(基于start_hfc_probability)""" try: