diff --git a/src/chat/brain_chat/brain_chat.py b/src/chat/brain_chat/brain_chat.py index 8fd2de94..913f6f28 100644 --- a/src/chat/brain_chat/brain_chat.py +++ b/src/chat/brain_chat/brain_chat.py @@ -1,4 +1,5 @@ import asyncio +import json import time import traceback import random @@ -101,6 +102,10 @@ class BrainChatting: # 最近一次是否成功进行了 reply,用于选择 BrainPlanner 的 Prompt self._last_successful_reply: bool = False + # side-effect 动作幂等缓存,避免同一触发消息在短时间内重复执行。 + self._recent_side_effect_actions: Dict[str, float] = {} + self._side_effect_dedupe_window_sec = 10.0 + async def start(self): """检查是否需要启动主循环,如果未激活则启动。""" @@ -161,8 +166,42 @@ class BrainChatting: + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") ) + def _is_side_effect_action(self, action_type: str) -> bool: + non_side_effect_actions = {"reply", "wait", "wait_time", "listening", "complete_talk", "no_reply"} + return action_type not in non_side_effect_actions + + def _build_side_effect_action_key(self, action_planner_info: ActionPlannerInfo) -> str: + action_data = dict(action_planner_info.action_data or {}) + action_data.pop("loop_start_time", None) + + target_message = action_planner_info.action_message + target_message_id = "" + if target_message is not None: + target_message_id = str(getattr(target_message, "message_id", "") or "") + + payload = { + "action_type": action_planner_info.action_type, + "target_message_id": target_message_id, + "action_data": action_data, + } + return json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str) + + def _cleanup_recent_side_effect_actions(self, now: float) -> None: + dedupe_window_sec = self._side_effect_dedupe_window_sec + expired_keys = [ + key + for key, ts in self._recent_side_effect_actions.items() + if now - ts > dedupe_window_sec + ] + for key in expired_keys: + del self._recent_side_effect_actions[key] + + def _is_duplicate_side_effect_action(self, key: str, now: float) -> bool: + dedupe_window_sec = self._side_effect_dedupe_window_sec + last_ts = self._recent_side_effect_actions.get(key) + return last_ts is not None and now - last_ts <= dedupe_window_sec + async def _loopbody(self): # sourcery skip: hoist-if-from-if - # 获取最新消息(用于上下文,但不影响是否调用 observe) recent_messages_list = message_api.get_messages_by_time_in_chat( chat_id=self.stream_id, start_time=self.last_read_time, @@ -174,20 +213,18 @@ class BrainChatting: filter_intercept_message_level=1, ) - # 如果有新消息,更新 last_read_time 并触发事件以打断正在进行的 wait - if len(recent_messages_list) >= 1: + # 仅在有新消息时更新读取时间并触发事件。 + # 无新消息时仍允许继续思考,具体动作由 Planner 限制为 reply/wait。 + if recent_messages_list: self.last_read_time = time.time() - self._new_message_event.set() # 触发新消息事件,打断 wait + self._new_message_event.set() # 触发新消息事件,打断正在进行的 wait - # 总是执行一次思考迭代(不管有没有新消息) - # wait 动作会在其内部等待,不需要在这里处理 should_continue = await self._observe(recent_messages_list=recent_messages_list) if not should_continue: # 选择了 complete_talk,返回 False 表示需要等待新消息 return False - # 继续下一次迭代(除非选择了 complete_talk) # 短暂等待后再继续,避免过于频繁的循环 await asyncio.sleep(0.1) @@ -581,6 +618,22 @@ class BrainChatting: """执行单个动作的通用函数""" try: with Timer(f"动作{action_planner_info.action_type}", cycle_timers): + side_effect_action_key = "" + if self._is_side_effect_action(action_planner_info.action_type): + side_effect_action_key = self._build_side_effect_action_key(action_planner_info) + now = time.time() + self._cleanup_recent_side_effect_actions(now) + if self._is_duplicate_side_effect_action(side_effect_action_key, now): + logger.info( + f"{self.log_prefix} 跳过重复动作: {action_planner_info.action_type}" + ) + return { + "action_type": action_planner_info.action_type, + "success": True, + "reply_text": "", + "command": "", + } + if action_planner_info.action_type == "complete_talk": # 直接处理complete_talk逻辑,不再通过动作系统 reason = action_planner_info.reasoning or "选择完成对话" @@ -784,6 +837,9 @@ class BrainChatting: if success and action_planner_info.action_type != "reply": self._last_successful_reply = False + if success and side_effect_action_key: + self._recent_side_effect_actions[side_effect_action_key] = time.time() + return { "action_type": action_planner_info.action_type, "success": success, diff --git a/src/chat/brain_chat/brain_planner.py b/src/chat/brain_chat/brain_planner.py index 7d5990e4..bdc594e4 100644 --- a/src/chat/brain_chat/brain_planner.py +++ b/src/chat/brain_chat/brain_planner.py @@ -20,7 +20,7 @@ from src.chat.utils.chat_message_builder import ( build_readable_messages_with_id, get_raw_msg_before_timestamp_with_chat, ) -from src.chat.utils.utils import get_chat_type_and_target_info +from src.chat.utils.utils import get_chat_type_and_target_info, is_bot_self from src.chat.planner_actions.action_manager import ActionManager from src.chat.message_receive.chat_stream import get_chat_manager from src.plugin_system.base.component_types import ActionInfo, ComponentType, ActionActivationType @@ -280,6 +280,8 @@ class BrainPlanner: show_actions=True, ) + previous_obs_time_mark = self.last_obs_time_mark + message_list_before_now_short = message_list_before_now[-int(global_config.chat.max_context_size * 0.3) :] chat_content_block_short, message_id_list_short = build_readable_messages_with_id( messages=message_list_before_now_short, @@ -322,6 +324,23 @@ class BrainPlanner: loop_start_time=loop_start_time, ) + has_new_user_message = any( + (msg.time or 0.0) > previous_obs_time_mark + and not is_bot_self(msg.user_info.platform or "", str(msg.user_info.user_id)) + for msg in message_list_before_now + ) + if not has_new_user_message: + actions, dropped_actions = self._restrict_actions_without_new_user_message( + actions=actions, + available_actions=available_actions, + message_id_list=message_id_list, + ) + if dropped_actions: + logger.info( + f"{self.log_prefix}检测到无新用户消息,仅保留 reply/wait/complete_talk,移除动作: {' '.join(dropped_actions)}" + ) + reasoning = f"{reasoning};检测到无新用户消息,仅保留 reply/wait/complete_talk" + # 记录和展示计划日志 logger.info( f"{self.log_prefix}Planner: {reasoning}。选择了{len(actions)}个动作: {' '.join([a.action_type for a in actions])}" @@ -517,9 +536,6 @@ class BrainPlanner: llm_duration_ms = (time.perf_counter() - llm_start) * 1000 llm_reasoning = reasoning_content - logger.info(f"{self.log_prefix}规划器原始提示词: {prompt}") - logger.info(f"{self.log_prefix}规划器原始响应: {llm_content}") - if global_config.debug.show_planner_prompt: logger.info(f"{self.log_prefix}规划器原始提示词: {prompt}") logger.info(f"{self.log_prefix}规划器原始响应: {llm_content}") @@ -597,6 +613,44 @@ class BrainPlanner: ) ] + def _restrict_actions_without_new_user_message( + self, + actions: List[ActionPlannerInfo], + available_actions: Dict[str, ActionInfo], + message_id_list: List[Tuple[str, "DatabaseMessages"]], + ) -> Tuple[List[ActionPlannerInfo], List[str]]: + """无新用户消息时,仅保留 reply/wait/complete_talk。""" + allowed_actions: List[ActionPlannerInfo] = [] + dropped_actions: List[str] = [] + + for action in actions: + if action.action_type in {"reply", "complete_talk"}: + allowed_actions.append(action) + continue + + if action.action_type in {"wait", "listening", "wait_time"}: + action.action_type = "wait" + action.action_data = action.action_data or {} + action.action_data.setdefault("wait_seconds", 5) + allowed_actions.append(action) + continue + + dropped_actions.append(action.action_type) + + if allowed_actions: + return allowed_actions, dropped_actions + + target_message = message_id_list[-1][1] if message_id_list else None + fallback_wait = ActionPlannerInfo( + action_type="wait", + reasoning="没有新的用户消息,进入等待", + action_data={"wait_seconds": 5}, + action_message=target_message, + available_actions=available_actions, + ) + + return [fallback_wait], dropped_actions + def add_plan_log(self, reasoning: str, actions: List[ActionPlannerInfo]): """添加计划日志""" self.plan_log.append((reasoning, time.time(), actions))