fix(brain): dedupe side-effect actions per message

pull/1502/head
xiaoxi68 2026-02-07 03:04:20 +08:00
parent 13bfaf7469
commit 6079898a0e
1 changed files with 58 additions and 0 deletions

View File

@ -1,4 +1,5 @@
import asyncio
import json
import time
import traceback
import random
@ -101,6 +102,9 @@ class BrainChatting:
# 最近一次是否成功进行了 reply用于选择 BrainPlanner 的 Prompt
self._last_successful_reply: bool = False
# side-effect 动作幂等缓存,避免同一触发消息在短时间内重复执行。
self._recent_side_effect_actions: Dict[str, float] = {}
async def start(self):
"""检查是否需要启动主循环,如果未激活则启动。"""
@ -161,6 +165,41 @@ class BrainChatting:
+ (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
)
def _is_side_effect_action(self, action_type: str) -> bool:
non_side_effect_actions = {"reply", "wait", "wait_time", "listening", "complete_talk", "no_reply"}
return action_type not in non_side_effect_actions
def _build_side_effect_action_key(self, action_planner_info: ActionPlannerInfo) -> str:
action_data = dict(action_planner_info.action_data or {})
action_data.pop("loop_start_time", None)
target_message = action_planner_info.action_message
target_message_id = ""
if target_message is not None:
target_message_id = str(getattr(target_message, "message_id", "") or "")
payload = {
"action_type": action_planner_info.action_type,
"target_message_id": target_message_id,
"action_data": action_data,
}
return json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str)
def _cleanup_recent_side_effect_actions(self, now: float) -> None:
dedupe_window_sec = 120.0
expired_keys = [
key
for key, ts in self._recent_side_effect_actions.items()
if now - ts > dedupe_window_sec
]
for key in expired_keys:
del self._recent_side_effect_actions[key]
def _is_duplicate_side_effect_action(self, key: str, now: float) -> bool:
dedupe_window_sec = 120.0
last_ts = self._recent_side_effect_actions.get(key)
return last_ts is not None and now - last_ts <= dedupe_window_sec
async def _loopbody(self): # sourcery skip: hoist-if-from-if
recent_messages_list = message_api.get_messages_by_time_in_chat(
chat_id=self.stream_id,
@ -580,6 +619,22 @@ class BrainChatting:
"""执行单个动作的通用函数"""
try:
with Timer(f"动作{action_planner_info.action_type}", cycle_timers):
side_effect_action_key = ""
if self._is_side_effect_action(action_planner_info.action_type):
side_effect_action_key = self._build_side_effect_action_key(action_planner_info)
now = time.time()
self._cleanup_recent_side_effect_actions(now)
if self._is_duplicate_side_effect_action(side_effect_action_key, now):
logger.info(
f"{self.log_prefix} 跳过重复副作用动作: {action_planner_info.action_type}"
)
return {
"action_type": action_planner_info.action_type,
"success": True,
"reply_text": "",
"command": "",
}
if action_planner_info.action_type == "complete_talk":
# 直接处理complete_talk逻辑不再通过动作系统
reason = action_planner_info.reasoning or "选择完成对话"
@ -783,6 +838,9 @@ class BrainChatting:
if success and action_planner_info.action_type != "reply":
self._last_successful_reply = False
if success and side_effect_action_key:
self._recent_side_effect_actions[side_effect_action_key] = time.time()
return {
"action_type": action_planner_info.action_type,
"success": success,