pull/1001/head
SnowindMe 2025-04-26 19:54:54 +08:00
commit b1bcd9c25c
14 changed files with 338 additions and 139 deletions

View File

@ -247,7 +247,7 @@ class InterestMonitorApp:
self.stream_chat_states[stream_id] = subflow_entry.get("sub_chat_state", "N/A")
self.stream_threshold_status[stream_id] = subflow_entry.get("is_above_threshold", False)
self.stream_last_active[stream_id] = subflow_entry.get(
"last_changed_state_time"
"chat_state_changed_time"
) # 存储原始时间戳
self.stream_last_interaction[stream_id] = subflow_entry.get(
"last_interaction_time"

View File

@ -105,7 +105,8 @@ c HeartFChatting工作方式
- 负责所有 `SubHeartflow` 实例的生命周期管理,包括:
- 创建和获取 (`get_or_create_subheartflow`)。
- 停止和清理 (`sleep_subheartflow`, `cleanup_inactive_subheartflows`)。
- 根据 `Heartflow` 的状态 (`self.mai_state_info`) 和限制条件,激活、停用或调整子心流的状态(例如 `enforce_subheartflow_limits`, `activate_random_subflows_to_chat`, `evaluate_interest_and_promote`)。
- 根据 `Heartflow` 的状态 (`self.mai_state_info`) 和限制条件,激活、停用或调整子心流的状态(例如 `enforce_subheartflow_limits`, `randomly_deactivate_subflows`, `evaluate_interest_and_promote`)。
- **新增**: 通过调用 `evaluate_and_transition_subflows_by_llm` 方法,使用 LLM (配置与 `Heartflow` 主 LLM 相同) 评估处于 `ABSENT``CHAT` 状态的子心流,根据观察到的活动摘要和 `Heartflow` 的当前状态,判断是否应在 `ABSENT``CHAT` 之间进行转换 (同样受限于 `CHAT` 状态的数量上限)。
- **清理机制**: 通过后台任务 (`BackgroundTaskManager`) 定期调用 `cleanup_inactive_subheartflows` 方法,此方法会识别并**删除**那些处于 `ABSENT` 状态超过一小时 (`INACTIVE_THRESHOLD_SECONDS`) 的子心流实例。
### 1.5. 消息处理与回复流程 (Message Processing vs. Replying Flow)
@ -149,9 +150,9 @@ c HeartFChatting工作方式
* `ChatState.FOCUSED` (专注/认真水群): 专注聊天模式。激活 `HeartFlowChatInstance`
- **选择**: 子心流可以根据外部指令(来自 `SubHeartflowManager`)或内部逻辑(未来的扩展)选择进入 `ABSENT` 状态(不回复不观察),或进入 `CHAT` / `FOCUSED` 中的一种回复模式。
- **状态转换机制** (由 `SubHeartflowManager` 驱动):
- **激活 `CHAT`**: 当 `Heartflow` 状态从 `OFFLINE` 变为允许聊天的状态时,`SubHeartflowManager` 会根据限制(通过 `self.mai_state_info` 获取),选择部分 `ABSENT` 状态的子心流,**检查当前 CHAT 状态数量是否达到上限**,如果未达上限,则调用其 `change_chat_state` 方法将其转换为 `CHAT`
- **激活 `CHAT`**: 当 `Heartflow` 状态从 `OFFLINE` 变为允许聊天的状态时,`SubHeartflowManager` 会根据限制(通过 `self.mai_state_info` 获取),选择部分 `ABSENT` 状态的子心流,**检查当前 CHAT 状态数量是否达到上限**,如果未达上限,则调用其 `change_chat_state` 方法将其转换为 `CHAT`此外,`evaluate_and_transition_subflows_by_llm` 方法也会根据 LLM 的判断,在未达上限时将 `ABSENT` 状态的子心流激活为 `CHAT`
- **激活 `FOCUSED`**: `SubHeartflowManager` 会定期评估处于 `CHAT` 状态的子心流的兴趣度 (`InterestChatting.start_hfc_probability`),若满足条件且**检查当前 FOCUSED 状态数量未达上限**(通过 `self.mai_state_info` 获取限制),则调用 `change_chat_state` 将其提升为 `FOCUSED`
- **停用/回退**: `SubHeartflowManager` 可能因 `Heartflow` 状态变化、达到数量限制、长时间不活跃或随机概率等原因,调用 `change_chat_state` 将子心流状态设置为 `ABSENT` 或从 `FOCUSED` 回退到 `CHAT`。当子心流进入 `ABSENT` 状态后,如果持续一小时不活跃,才会被后台清理任务删除。
- **停用/回退**: `SubHeartflowManager` 可能因 `Heartflow` 状态变化、达到数量限制、长时间不活跃、随机概率 (`randomly_deactivate_subflows`) 或 LLM 评估 (`evaluate_and_transition_subflows_by_llm` 判断 `CHAT` 状态子心流应休眠) 等原因,调用 `change_chat_state` 将子心流状态设置为 `ABSENT` 或从 `FOCUSED` 回退到 `CHAT`。当子心流进入 `ABSENT` 状态后,如果持续一小时不活跃,才会被后台清理任务删除。
- **注意**: `change_chat_state` 方法本身只负责执行状态转换和管理内部聊天实例(`NormalChatInstance`/`HeartFlowChatInstance`),不再进行限额检查。限额检查的责任完全由调用方(即 `SubHeartflowManager` 中的相关方法,这些方法会使用内部存储的 `mai_state_info` 来获取限制)承担。
## 3. 聊天实例详解 (Chat Instances Explained)

View File

@ -34,7 +34,6 @@ class BackgroundTaskManager:
update_interval: int,
cleanup_interval: int,
log_interval: int,
inactive_threshold: int,
# 新增兴趣评估间隔参数
interest_eval_interval: int = INTEREST_EVAL_INTERVAL_SECONDS,
# 新增随机停用间隔参数
@ -58,6 +57,7 @@ class BackgroundTaskManager:
self._logging_task: Optional[asyncio.Task] = None
self._interest_eval_task: Optional[asyncio.Task] = None # 新增兴趣评估任务引用
self._random_deactivation_task: Optional[asyncio.Task] = None # 新增随机停用任务引用
self._hf_judge_state_update_task: Optional[asyncio.Task] = None # 新增状态评估任务引用
self._tasks: List[Optional[asyncio.Task]] = [] # Keep track of all tasks
async def start_tasks(self):
@ -79,12 +79,20 @@ class BackgroundTaskManager:
f"聊天状态更新任务已启动 间隔:{self.update_interval}s",
"_state_update_task",
),
(
self._hf_judge_state_update_task,
lambda: self._run_hf_judge_state_update_cycle(60),
"hf_judge_state_update",
"debug",
f"状态评估任务已启动 间隔:{60}s",
"_hf_judge_state_update_task",
),
(
self._cleanup_task,
self._run_cleanup_cycle,
"hf_cleanup",
"info",
f"清理任务已启动 间隔:{self.cleanup_interval}s 阈值:{self.inactive_threshold}s",
f"清理任务已启动 间隔:{self.cleanup_interval}s",
"_cleanup_task",
),
(
@ -203,22 +211,22 @@ class BackgroundTaskManager:
if state_changed:
current_state = self.mai_state_info.get_current_state()
await self.subheartflow_manager.enforce_subheartflow_limits(current_state)
await self.subheartflow_manager.enforce_subheartflow_limits()
# 状态转换处理
if (
previous_status == self.mai_state_info.mai_status.OFFLINE
and current_state != self.mai_state_info.mai_status.OFFLINE
):
logger.info("[后台任务] 主状态激活,触发子流激活")
await self.subheartflow_manager.activate_random_subflows_to_chat(current_state)
elif (
current_state == self.mai_state_info.mai_status.OFFLINE
and previous_status != self.mai_state_info.mai_status.OFFLINE
):
logger.info("检测到离线,停用所有子心流")
await self.subheartflow_manager.deactivate_all_subflows()
async def _perform_hf_judge_state_update_work(self):
"""调用llm检测是否转换ABSENT-CHAT状态"""
logger.info("[状态评估任务] 开始基于LLM评估子心流状态...")
await self.subheartflow_manager.evaluate_and_transition_subflows_by_llm()
async def _perform_cleanup_work(self):
"""执行子心流清理任务
1. 获取需要清理的不活跃子心流列表
@ -252,7 +260,7 @@ class BackgroundTaskManager:
async def _perform_interest_eval_work(self):
"""执行一轮子心流兴趣评估与提升检查。"""
# 直接调用 subheartflow_manager 的方法,并传递当前状态信息
await self.subheartflow_manager.evaluate_interest_and_promote(self.mai_state_info)
await self.subheartflow_manager.evaluate_interest_and_promote()
# --- 结束新增 ---
@ -269,6 +277,11 @@ class BackgroundTaskManager:
task_name="State Update", interval=interval, task_func=self._perform_state_update_work
)
async def _run_hf_judge_state_update_cycle(self, interval: int):
await self._run_periodic_loop(
task_name="State Update", interval=interval, task_func=self._perform_hf_judge_state_update_work
)
async def _run_cleanup_cycle(self):
await self._run_periodic_loop(
task_name="Subflow Cleanup", interval=self.cleanup_interval, task_func=self._perform_cleanup_work

View File

@ -72,7 +72,6 @@ class Heartflow:
update_interval=STATE_UPDATE_INTERVAL_SECONDS,
cleanup_interval=CLEANUP_INTERVAL_SECONDS,
log_interval=3, # Example: Using value directly, ideally get from config
inactive_threshold=INACTIVE_THRESHOLD_SECONDS,
)
async def get_or_create_subheartflow(self, subheartflow_id: Any) -> Optional["SubHeartflow"]:

View File

@ -58,7 +58,7 @@ class InterestLogger:
return results
for subheartflow in all_flows:
if self.subheartflow_manager.get_or_create_subheartflow(subheartflow.subheartflow_id):
if await self.subheartflow_manager.get_or_create_subheartflow(subheartflow.subheartflow_id):
tasks.append(
asyncio.create_task(subheartflow.get_full_state(), name=f"get_state_{subheartflow.subheartflow_id}")
)

View File

@ -86,9 +86,27 @@ class InterestChatting:
logger.debug("后台兴趣更新任务已创建并启动。")
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
"""添加消息到兴趣字典
参数:
message: 接收到的消息
interest_value: 兴趣值
is_mentioned: 是否被提及
功能:
1. 将消息添加到兴趣字典
2. 更新最后交互时间
3. 如果字典长度超过10删除最旧的消息
"""
# 添加新消息
self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned)
self.last_interaction_time = time.time()
# 如果字典长度超过10删除最旧的消息
if len(self.interest_dict) > 10:
oldest_key = next(iter(self.interest_dict))
self.interest_dict.pop(oldest_key)
async def _calculate_decay(self):
"""计算兴趣值的衰减
@ -481,7 +499,7 @@ class SubHeartflow:
"interest_state": interest_state,
"current_mind": self.sub_mind.current_mind,
"chat_state": self.chat_state.chat_status.value,
"last_changed_state_time": self.last_changed_state_time,
"chat_state_changed_time": self.chat_state_changed_time,
}
async def shutdown(self):

View File

@ -2,6 +2,7 @@ import asyncio
import time
import random
from typing import Dict, Any, Optional, List
import json # 导入 json 模块
# 导入日志模块
from src.common.logger import get_module_logger, LogConfig, SUBHEARTFLOW_MANAGER_STYLE_CONFIG
@ -14,6 +15,12 @@ from src.heart_flow.sub_heartflow import SubHeartflow, ChatState
from src.heart_flow.mai_state_manager import MaiStateInfo
from .observation import ChattingObservation
# 导入LLM请求工具
from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config
import traceback
# 初始化日志记录器
subheartflow_manager_log_config = LogConfig(
@ -34,6 +41,15 @@ class SubHeartflowManager:
self._lock = asyncio.Lock() # 用于保护 self.subheartflows 的访问
self.mai_state_info: MaiStateInfo = mai_state_info # 存储传入的 MaiStateInfo 实例
# 为 LLM 状态评估创建一个 LLMRequest 实例
# 使用与 Heartflow 相同的模型和参数
self.llm_state_evaluator = LLMRequest(
model=global_config.llm_heartflow, # 与 Heartflow 一致
temperature=0.6, # 与 Heartflow 一致
max_tokens=1000, # 与 Heartflow 一致 (虽然可能不需要这么多)
request_type="subheartflow_state_eval", # 保留特定的请求类型
)
def get_all_subheartflows(self) -> List["SubHeartflow"]:
"""获取所有当前管理的 SubHeartflow 实例列表 (快照)。"""
return list(self.subheartflows.values())
@ -74,7 +90,7 @@ class SubHeartflowManager:
# 注册子心流
self.subheartflows[subheartflow_id] = new_subflow
heartflow_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id
logger.info(f"[{heartflow_name}] 开始消息")
logger.info(f"[{heartflow_name}] 开始接收消息")
# 启动后台任务
asyncio.create_task(new_subflow.subheartflow_start_working())
@ -103,24 +119,6 @@ class SubHeartflowManager:
except Exception as e:
logger.error(f"[子心流管理] 设置ABSENT状态失败: {e}")
# 停止子心流内部循环
subheartflow.should_stop = True
# 取消后台任务
task = subheartflow.task
if task and not task.done():
task.cancel()
logger.debug(f"[子心流管理] 已取消 {stream_name} 的后台任务")
# 从管理字典中移除
if subheartflow_id in self.subheartflows:
del self.subheartflows[subheartflow_id]
logger.debug(f"[子心流管理] 已移除 {stream_name}")
return True
else:
logger.warning(f"[子心流管理] {stream_name} 已被提前移除")
return False
def get_inactive_subheartflows(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS):
"""识别并返回需要清理的不活跃(处于ABSENT状态超过一小时)子心流(id, 原因)"""
current_time = time.time()
@ -185,53 +183,6 @@ class SubHeartflowManager:
else:
logger.debug(f"[限制] 无需停止, 当前总数:{len(self.subheartflows)}")
async def activate_random_subflows_to_chat(self):
"""主状态激活时随机选择ABSENT子心流进入CHAT状态"""
# 使用 self.mai_state_info 获取当前状态和限制
current_mai_state = self.mai_state_info.get_current_state()
limit = current_mai_state.get_normal_chat_max_num()
if limit <= 0:
logger.info("[激活] 当前状态不允许CHAT子心流")
return
# 获取所有ABSENT状态的子心流
absent_flows = [flow for flow in self.subheartflows.values() if flow.chat_state.chat_status == ChatState.ABSENT]
num_to_activate = min(limit, len(absent_flows))
if num_to_activate <= 0:
logger.info(f"[激活] 无可用ABSENT子心流(限额:{limit}, 可用:{len(absent_flows)})")
return
logger.info(f"[激活] 随机选择{num_to_activate}个ABSENT子心流进入CHAT状态")
activated_count = 0
for flow in random.sample(absent_flows, num_to_activate):
flow_id = flow.subheartflow_id
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
if flow_id not in self.subheartflows:
logger.warning(f"[激活] 跳过{stream_name}, 子心流已不存在")
continue
logger.debug(f"[激活] 正在激活子心流{stream_name}")
# --- 限额检查 --- #
current_chat_count = self.count_subflows_by_state(ChatState.CHAT)
if current_chat_count >= limit:
logger.warning(f"[激活] 跳过{stream_name}, 普通聊天已达上限 ({current_chat_count}/{limit})")
continue # 跳过此子心流,继续尝试激活下一个
# --- 结束限额检查 --- #
# 移除 states_num 参数
await flow.change_chat_state(ChatState.CHAT)
if flow.chat_state.chat_status == ChatState.CHAT:
activated_count += 1
else:
logger.warning(f"[激活] {stream_name}状态设置失败")
logger.info(f"[激活] 完成, 成功激活{activated_count}个子心流")
async def deactivate_all_subflows(self):
"""将所有子心流的状态更改为 ABSENT (例如主状态变为OFFLINE时调用)"""
# logger.info("[停用] 开始将所有子心流状态设置为 ABSENT")
@ -394,15 +345,187 @@ class SubHeartflowManager:
else:
logger.debug(f"{log_prefix_manager} 随机停用周期结束, 未停用任何子心流。")
def count_subflows_by_state(self, state: ChatState) -> int:
"""统计指定状态的子心流数量
async def evaluate_and_transition_subflows_by_llm(self):
"""
使用LLM评估每个子心流的状态并根据LLM的判断执行状态转换ABSENT <-> CHAT
注意此函数包含对假设的LLM函数的调用
"""
log_prefix = "[LLM状态评估]"
logger.info(f"{log_prefix} 开始基于LLM评估子心流状态...")
# 获取当前状态和限制用于CHAT激活检查
current_mai_state = self.mai_state_info.get_current_state()
chat_limit = current_mai_state.get_normal_chat_max_num()
transitioned_to_chat = 0
transitioned_to_absent = 0
async with self._lock: # 在锁内获取快照并迭代
subflows_snapshot = list(self.subheartflows.values())
# 使用不上锁的版本,因为我们已经在锁内
current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT)
if not subflows_snapshot:
logger.info(f"{log_prefix} 当前没有子心流需要评估。")
return
for sub_hf in subflows_snapshot:
flow_id = sub_hf.subheartflow_id
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
current_subflow_state = sub_hf.chat_state.chat_status
# --- 获取观察内容 ---
# 从 sub_hf.observations 获取 ChattingObservation 并提取信息
_observation_summary = "没有可用的观察信息。" # 默认值
try:
# 检查 observations 列表是否存在且不为空
# 假设第一个观察者是 ChattingObservation
first_observation = sub_hf.observations[0]
if isinstance(first_observation, ChattingObservation):
# 组合中期记忆和当前聊天内容
current_chat = first_observation.talking_message_str or "当前无聊天内容。"
combined_summary = f"当前聊天内容:\n{current_chat}"
else:
logger.warning(f"{log_prefix} [{stream_name}] 第一个观察者不是 ChattingObservation 类型。")
except Exception as e:
logger.warning(f"{log_prefix} [{stream_name}] 获取观察信息失败: {e}", exc_info=True)
# 保留默认值或错误信息
combined_summary = f"获取观察信息时出错: {e}"
# --- 获取麦麦状态 ---
mai_state_description = f"麦麦当前状态: {current_mai_state.value}"
# --- 针对 ABSENT 状态 ---
if current_subflow_state == ChatState.ABSENT:
# 构建Prompt
prompt = (
f"子心流 [{stream_name}] 当前处于非活跃(ABSENT)状态.\n"
f"{mai_state_description}\n"
f"最近观察到的内容摘要:\n---\n{combined_summary}\n---\n"
f"基于以上信息,该子心流是否表现出足够的活跃迹象或重要性,"
f"值得将其唤醒并进入常规聊天(CHAT)状态?\n"
f"请以 JSON 格式回答,包含一个键 'decision',其值为 true 或 false.\n"
f'例如:{{"decision": true}}\n'
f"请只输出有效的 JSON 对象。"
)
# 调用LLM评估
should_activate = await self._llm_evaluate_state_transition(prompt)
if should_activate is None: # 处理解析失败或意外情况
logger.warning(f"{log_prefix} [{stream_name}] LLM评估返回无效结果跳过。")
continue
if should_activate:
# 检查CHAT限额
# 使用不上锁的版本,因为我们已经在锁内
current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT)
if current_chat_count < chat_limit:
logger.info(
f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态且未达上限({current_chat_count}/{chat_limit})。正在尝试转换..."
)
await sub_hf.change_chat_state(ChatState.CHAT)
if sub_hf.chat_state.chat_status == ChatState.CHAT:
transitioned_to_chat += 1
else:
logger.warning(f"{log_prefix} [{stream_name}] 尝试激活到CHAT失败。")
else:
logger.info(
f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态但已达到上限({current_chat_count}/{chat_limit})。跳过转换。"
)
# --- 针对 CHAT 状态 ---
elif current_subflow_state == ChatState.CHAT:
# 构建Prompt
prompt = (
f"子心流 [{stream_name}] 当前处于常规聊天(CHAT)状态.\n"
f"{mai_state_description}\n"
f"最近观察到的内容摘要:\n---\n{combined_summary}\n---\n"
f"基于以上信息,该子心流是否表现出不活跃、对话结束或不再需要关注的迹象,"
f"应该让其进入休眠(ABSENT)状态?\n"
f"请以 JSON 格式回答,包含一个键 'decision',其值为 true (表示应休眠) 或 false (表示不应休眠).\n"
f'例如:{{"decision": true}}\n'
f"请只输出有效的 JSON 对象。"
)
# 调用LLM评估
should_deactivate = await self._llm_evaluate_state_transition(prompt)
if should_deactivate is None: # 处理解析失败或意外情况
logger.warning(f"{log_prefix} [{stream_name}] LLM评估返回无效结果跳过。")
continue
if should_deactivate:
logger.info(f"{log_prefix} [{stream_name}] LLM建议进入ABSENT状态。正在尝试转换...")
await sub_hf.change_chat_state(ChatState.ABSENT)
if sub_hf.chat_state.chat_status == ChatState.ABSENT:
transitioned_to_absent += 1
logger.info(
f"{log_prefix} LLM评估周期结束。"
f" 成功转换到CHAT: {transitioned_to_chat}."
f" 成功转换到ABSENT: {transitioned_to_absent}."
)
async def _llm_evaluate_state_transition(self, prompt: str) -> Optional[bool]:
"""
使用 LLM 评估是否应进行状态转换期望 LLM 返回 JSON 格式
Args:
state: 要统计的聊天状态枚举值
prompt: 提供给 LLM 的提示信息要求返回 {"decision": true/false}
Returns:
int: 处于该状态的子心流数量
Optional[bool]: 如果成功解析 LLM JSON 响应并提取了 'decision' 键的值则返回该布尔值
如果 LLM 调用失败返回无效 JSON JSON 中缺少 'decision' 键或其值不是布尔型则返回 None
"""
log_prefix = "[LLM状态评估]"
try:
# --- 真实的 LLM 调用 ---
response_text, _ = await self.llm_state_evaluator.generate_response_async(prompt)
logger.debug(
f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估,原始响应: ```{response_text}```"
)
# --- 解析 JSON 响应 ---
try:
# 尝试去除可能的Markdown代码块标记
cleaned_response = response_text.strip().strip("`").strip()
if cleaned_response.startswith("json"):
cleaned_response = cleaned_response[4:].strip()
data = json.loads(cleaned_response)
decision = data.get("decision") # 使用 .get() 避免 KeyError
if isinstance(decision, bool):
logger.debug(f"{log_prefix} LLM评估结果 (来自JSON): {'建议转换' if decision else '建议不转换'}")
return decision
else:
logger.warning(
f"{log_prefix} LLM 返回的 JSON 中 'decision' 键的值不是布尔型: {decision}。响应: {response_text}"
)
return None # 值类型不正确
except json.JSONDecodeError as json_err:
logger.warning(f"{log_prefix} LLM 返回的响应不是有效的 JSON: {json_err}。响应: {response_text}")
# 尝试在非JSON响应中查找关键词作为后备方案 (可选)
if "true" in response_text.lower():
logger.debug(f"{log_prefix} 在非JSON响应中找到 'true',解释为建议转换")
return True
if "false" in response_text.lower():
logger.debug(f"{log_prefix} 在非JSON响应中找到 'false',解释为建议不转换")
return False
return None # JSON 解析失败,也未找到关键词
except Exception as parse_err: # 捕获其他可能的解析错误
logger.warning(f"{log_prefix} 解析 LLM JSON 响应时发生意外错误: {parse_err}。响应: {response_text}")
return None
except Exception as e:
logger.error(f"{log_prefix} 调用 LLM 或处理其响应时出错: {e}", exc_info=True)
traceback.print_exc()
return None # LLM 调用或处理失败
def count_subflows_by_state(self, state: ChatState) -> int:
"""统计指定状态的子心流数量"""
count = 0
# 遍历所有子心流实例
for subheartflow in self.subheartflows.values():
@ -411,12 +534,19 @@ class SubHeartflowManager:
count += 1
return count
def get_active_subflow_minds(self) -> List[str]:
"""获取所有活跃(非ABSENT)子心流的当前想法
返回:
List[str]: 包含所有活跃子心流当前想法的列表
def count_subflows_by_state_nolock(self, state: ChatState) -> int:
"""
统计指定状态的子心流数量 (不上锁版本)
警告仅应在已持有 self._lock 的上下文中使用此方法
"""
count = 0
for subheartflow in self.subheartflows.values():
if subheartflow.chat_state.chat_status == state:
count += 1
return count
def get_active_subflow_minds(self) -> List[str]:
"""获取所有活跃(非ABSENT)子心流的当前想法"""
minds = []
for subheartflow in self.subheartflows.values():
# 检查子心流是否活跃(非ABSENT状态)

View File

@ -4,7 +4,8 @@ import datetime
# from .message_storage import MongoDBMessageStorage
from src.plugins.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat
from ...config.config import global_config
# from ...config.config import global_config
from typing import Dict, Any
from ..chat.message import Message
from .pfc_types import ConversationState
@ -77,7 +78,7 @@ class Conversation:
raise
try:
logger.info(f"{self.stream_id} 加载初始聊天记录...")
initial_messages = await get_raw_msg_before_timestamp_with_chat( #
initial_messages = get_raw_msg_before_timestamp_with_chat( #
chat_id=self.stream_id,
timestamp=time.time(),
limit=30, # 加载最近30条作为初始上下文可以调整
@ -436,43 +437,13 @@ class Conversation:
try:
# 外层 try: 捕获发送消息和后续处理中的主要错误
current_time = time.time() # 获取当前时间戳
_current_time = time.time() # 获取当前时间戳
reply_content = self.generated_reply # 获取要发送的内容
# 发送消息
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content)
logger.info(f"消息已发送: {reply_content}") # 可以在发送后加个日志确认
# --- 添加的立即更新状态逻辑开始 ---
try:
# 内层 try: 专门捕获手动更新状态时可能出现的错误
# 创建一个代表刚刚发送的消息的字典
bot_message_info = {
"message_id": f"bot_sent_{current_time}", # 创建一个简单的唯一ID
"time": current_time,
"user_info": UserInfo( # 使用 UserInfo 类构建用户信息
user_id=str(global_config.BOT_QQ),
user_nickname=global_config.BOT_NICKNAME,
platform=self.chat_stream.platform, # 从 chat_stream 获取平台信息
).to_dict(), # 转换为字典格式存储
"processed_plain_text": reply_content, # 使用发送的内容
"detailed_plain_text": f"{int(current_time)},{global_config.BOT_NICKNAME}:{reply_content}", # 构造一个简单的详细文本, 时间戳取整
# 可以根据需要添加其他字段,保持与 observation_info.chat_history 中其他消息结构一致
}
# 直接更新 ObservationInfo 实例
if self.observation_info:
self.observation_info.chat_history.append(bot_message_info) # 将消息添加到历史记录末尾
self.observation_info.last_bot_speak_time = current_time # 更新 Bot 最后发言时间
self.observation_info.last_message_time = current_time # 更新最后消息时间
logger.debug("已手动将Bot发送的消息添加到 ObservationInfo")
else:
logger.warning("无法手动更新 ObservationInfo实例不存在")
except Exception as update_err:
logger.error(f"手动更新 ObservationInfo 时出错: {update_err}")
# --- 添加的立即更新状态逻辑结束 ---
# 原有的触发更新和等待代码
self.chat_observer.trigger_update()
if not await self.chat_observer.wait_for_update():

View File

@ -376,6 +376,7 @@ class DirectMessageSender:
# 发送消息
try:
await self.send_via_ws(message)
await self.storage.store_message(message, chat_stream)
logger.success(f"PFC消息已发送: {content}")
except Exception as e:
logger.error(f"PFC消息发送失败: {str(e)}")

View File

@ -404,10 +404,10 @@ class HeartFChatting:
return False, ""
# execute:执行
with Timer("执行动作", cycle_timers):
return await self._handle_action(
action, reasoning, planner_result.get("emoji_query", ""), cycle_timers, planner_start_db_time
)
return await self._handle_action(
action, reasoning, planner_result.get("emoji_query", ""), cycle_timers, planner_start_db_time
)
except PlannerError as e:
logger.error(f"{self.log_prefix} 规划错误: {e}")
@ -560,7 +560,7 @@ class HeartFChatting:
observation = self.observations[0] if self.observations else None
try:
with Timer("Wait New Msg", cycle_timers):
with Timer("等待新消息", cycle_timers):
return await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix)
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 等待被中断")
@ -584,8 +584,8 @@ class HeartFChatting:
logger.info(f"{log_prefix} 检测到新消息")
return True
if time.monotonic() - wait_start_time > 300:
logger.warning(f"{log_prefix} 等待超时(300秒)")
if time.monotonic() - wait_start_time > 120:
logger.warning(f"{log_prefix} 等待超时(120秒)")
return False
await asyncio.sleep(1.5)
@ -604,8 +604,6 @@ class HeartFChatting:
async def _handle_cycle_delay(self, action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str):
"""处理循环延迟"""
cycle_duration = time.monotonic() - cycle_start_time
# if cycle_duration > 0.1:
# logger.debug(f"{log_prefix} HeartFChatting: 周期耗时 {cycle_duration:.2f}s.")
try:
sleep_duration = 0.0

View File

@ -167,7 +167,7 @@ class HeartFCProcessor:
# 6. 兴趣度计算与更新
interested_rate, is_mentioned = await self._calculate_interest(message)
await subheartflow.interest_chatting.increase_interest(value=interested_rate)
await subheartflow.interest_chatting.add_interest_dict(message, interested_rate, is_mentioned)
subheartflow.interest_chatting.add_interest_dict(message, interested_rate, is_mentioned)
# 7. 日志记录
mes_name = chat.group_info.group_name if chat.group_info else "私聊"

View File

@ -67,6 +67,7 @@ def init_prompt():
2. 文字回复(text_reply)适用
- 有实质性内容需要表达
- 有人提到你但你还没有回应他
- 可以追加emoji_query表达情绪(格式情绪描述,"俏皮的调侃")
- 不要追加太多表情

View File

@ -1,6 +1,7 @@
import time
import asyncio
import traceback
import statistics # 导入 statistics 模块
from random import random
from typing import List, Optional # 导入 Optional
@ -46,6 +47,8 @@ class NormalChat:
self.gpt = NormalChatGenerator()
self.mood_manager = MoodManager.get_instance() # MoodManager 保持单例
# 存储此实例的兴趣监控任务
self.start_time = time.time()
self._chat_task: Optional[asyncio.Task] = None
logger.info(f"[{self.stream_name}] NormalChat 实例初始化完成。")
@ -317,6 +320,61 @@ class NormalChat:
# 意愿管理器注销当前message信息 (无论是否回复,只要处理过就删除)
willing_manager.delete(message.message_info.message_id)
# --- 新增:处理初始高兴趣消息的私有方法 ---
async def _process_initial_interest_messages(self):
"""处理启动时存在于 interest_dict 中的高兴趣消息。"""
items_to_process = list(self.interest_dict.items())
if not items_to_process:
return # 没有初始消息,直接返回
logger.info(f"[{self.stream_name}] 发现 {len(items_to_process)} 条初始兴趣消息,开始处理高兴趣部分...")
interest_values = [item[1][1] for item in items_to_process] # 提取兴趣值列表
messages_to_reply = [] # 需要立即回复的消息
if len(interest_values) == 1:
# 如果只有一个消息,直接处理
messages_to_reply.append(items_to_process[0])
logger.info(f"[{self.stream_name}] 只有一条初始消息,直接处理。")
elif len(interest_values) > 1:
# 计算均值和标准差
try:
mean_interest = statistics.mean(interest_values)
stdev_interest = statistics.stdev(interest_values)
threshold = mean_interest + stdev_interest
logger.info(
f"[{self.stream_name}] 初始兴趣值 均值: {mean_interest:.2f}, 标准差: {stdev_interest:.2f}, 阈值: {threshold:.2f}"
)
# 找出高于阈值的消息
for item in items_to_process:
msg_id, (message, interest_value, is_mentioned) = item
if interest_value > threshold:
messages_to_reply.append(item)
logger.info(f"[{self.stream_name}] 找到 {len(messages_to_reply)} 条高于阈值的初始消息进行处理。")
except statistics.StatisticsError as e:
logger.error(f"[{self.stream_name}] 计算初始兴趣统计值时出错: {e},跳过初始处理。")
# 处理需要回复的消息
processed_count = 0
for item in messages_to_reply:
msg_id, (message, interest_value, is_mentioned) = item
try:
logger.info(f"[{self.stream_name}] 处理初始高兴趣消息 {msg_id} (兴趣值: {interest_value:.2f})")
await self.normal_response(message=message, is_mentioned=is_mentioned, interested_rate=interest_value)
processed_count += 1
except Exception as e:
logger.error(f"[{self.stream_name}] 处理初始兴趣消息 {msg_id} 时出错: {e}\n{traceback.format_exc()}")
finally:
# 无论成功与否都清空兴趣字典
self.interest_dict.clear()
logger.info(
f"[{self.stream_name}] 初始高兴趣消息处理完毕,共处理 {processed_count} 条。剩余 {len(self.interest_dict)} 条待轮询。"
)
# --- 新增结束 ---
# 保持 staticmethod, 因为不依赖实例状态, 但需要 chat 对象来获取日志上下文
@staticmethod
def _check_ban_words(text: str, chat: ChatStream, userinfo: UserInfo) -> bool:
@ -350,11 +408,20 @@ class NormalChat:
# 改为实例方法, 移除 chat 参数
async def start_chat(self):
"""为此 NormalChat 实例关联的 ChatStream 启动聊天任务(如果尚未运行)。"""
"""为此 NormalChat 实例关联的 ChatStream 启动聊天任务(如果尚未运行),
并在启动前处理一次初始的高兴趣消息"""
if self._chat_task is None or self._chat_task.done():
# --- 修改:调用新的私有方法处理初始消息 ---
await self._process_initial_interest_messages()
# --- 修改结束 ---
# 启动后台轮询任务
logger.info(f"[{self.stream_name}] 启动后台兴趣消息轮询任务...")
task = asyncio.create_task(self._reply_interested_message())
task.add_done_callback(lambda t: self._handle_task_completion(t)) # 回调现在是实例方法
self._chat_task = task
else:
logger.info(f"[{self.stream_name}] 聊天任务已在运行中。")
def _handle_task_completion(self, task: asyncio.Task):
"""任务完成回调处理"""

View File

@ -404,7 +404,7 @@ class Hippocampus:
# logger.info("没有找到有效的关键词节点")
return []
logger.info(f"有效的关键词: {', '.join(valid_keywords)}")
logger.debug(f"有效的关键词: {', '.join(valid_keywords)}")
# 从每个关键词获取记忆
all_memories = []
@ -576,7 +576,7 @@ class Hippocampus:
# logger.info("没有找到有效的关键词节点")
return []
logger.info(f"有效的关键词: {', '.join(valid_keywords)}")
logger.debug(f"有效的关键词: {', '.join(valid_keywords)}")
# 从每个关键词获取记忆
all_memories = []
@ -761,7 +761,7 @@ class Hippocampus:
# logger.info("没有找到有效的关键词节点")
return 0
logger.info(f"有效的关键词: {', '.join(valid_keywords)}")
logger.debug(f"有效的关键词: {', '.join(valid_keywords)}")
# 从每个关键词获取记忆
activate_map = {} # 存储每个词的累计激活值