pull/1502/merge
exynos 2026-02-07 12:05:40 +00:00 committed by GitHub
commit acf90629fc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 121 additions and 11 deletions

View File

@ -1,4 +1,5 @@
import asyncio
import json
import time
import traceback
import random
@ -101,6 +102,10 @@ class BrainChatting:
# 最近一次是否成功进行了 reply用于选择 BrainPlanner 的 Prompt
self._last_successful_reply: bool = False
# side-effect 动作幂等缓存,避免同一触发消息在短时间内重复执行。
self._recent_side_effect_actions: Dict[str, float] = {}
self._side_effect_dedupe_window_sec = 10.0
async def start(self):
"""检查是否需要启动主循环,如果未激活则启动。"""
@ -161,8 +166,42 @@ class BrainChatting:
+ (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
)
def _is_side_effect_action(self, action_type: str) -> bool:
non_side_effect_actions = {"reply", "wait", "wait_time", "listening", "complete_talk", "no_reply"}
return action_type not in non_side_effect_actions
def _build_side_effect_action_key(self, action_planner_info: ActionPlannerInfo) -> str:
action_data = dict(action_planner_info.action_data or {})
action_data.pop("loop_start_time", None)
target_message = action_planner_info.action_message
target_message_id = ""
if target_message is not None:
target_message_id = str(getattr(target_message, "message_id", "") or "")
payload = {
"action_type": action_planner_info.action_type,
"target_message_id": target_message_id,
"action_data": action_data,
}
return json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str)
def _cleanup_recent_side_effect_actions(self, now: float) -> None:
dedupe_window_sec = self._side_effect_dedupe_window_sec
expired_keys = [
key
for key, ts in self._recent_side_effect_actions.items()
if now - ts > dedupe_window_sec
]
for key in expired_keys:
del self._recent_side_effect_actions[key]
def _is_duplicate_side_effect_action(self, key: str, now: float) -> bool:
dedupe_window_sec = self._side_effect_dedupe_window_sec
last_ts = self._recent_side_effect_actions.get(key)
return last_ts is not None and now - last_ts <= dedupe_window_sec
async def _loopbody(self): # sourcery skip: hoist-if-from-if
# 获取最新消息(用于上下文,但不影响是否调用 observe
recent_messages_list = message_api.get_messages_by_time_in_chat(
chat_id=self.stream_id,
start_time=self.last_read_time,
@ -174,20 +213,18 @@ class BrainChatting:
filter_intercept_message_level=1,
)
# 如果有新消息,更新 last_read_time 并触发事件以打断正在进行的 wait
if len(recent_messages_list) >= 1:
# 仅在有新消息时更新读取时间并触发事件。
# 无新消息时仍允许继续思考,具体动作由 Planner 限制为 reply/wait。
if recent_messages_list:
self.last_read_time = time.time()
self._new_message_event.set() # 触发新消息事件,打断 wait
self._new_message_event.set() # 触发新消息事件,打断正在进行的 wait
# 总是执行一次思考迭代(不管有没有新消息)
# wait 动作会在其内部等待,不需要在这里处理
should_continue = await self._observe(recent_messages_list=recent_messages_list)
if not should_continue:
# 选择了 complete_talk返回 False 表示需要等待新消息
return False
# 继续下一次迭代(除非选择了 complete_talk
# 短暂等待后再继续,避免过于频繁的循环
await asyncio.sleep(0.1)
@ -581,6 +618,22 @@ class BrainChatting:
"""执行单个动作的通用函数"""
try:
with Timer(f"动作{action_planner_info.action_type}", cycle_timers):
side_effect_action_key = ""
if self._is_side_effect_action(action_planner_info.action_type):
side_effect_action_key = self._build_side_effect_action_key(action_planner_info)
now = time.time()
self._cleanup_recent_side_effect_actions(now)
if self._is_duplicate_side_effect_action(side_effect_action_key, now):
logger.info(
f"{self.log_prefix} 跳过重复动作: {action_planner_info.action_type}"
)
return {
"action_type": action_planner_info.action_type,
"success": True,
"reply_text": "",
"command": "",
}
if action_planner_info.action_type == "complete_talk":
# 直接处理complete_talk逻辑不再通过动作系统
reason = action_planner_info.reasoning or "选择完成对话"
@ -784,6 +837,9 @@ class BrainChatting:
if success and action_planner_info.action_type != "reply":
self._last_successful_reply = False
if success and side_effect_action_key:
self._recent_side_effect_actions[side_effect_action_key] = time.time()
return {
"action_type": action_planner_info.action_type,
"success": success,

View File

@ -20,7 +20,7 @@ from src.chat.utils.chat_message_builder import (
build_readable_messages_with_id,
get_raw_msg_before_timestamp_with_chat,
)
from src.chat.utils.utils import get_chat_type_and_target_info
from src.chat.utils.utils import get_chat_type_and_target_info, is_bot_self
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.message_receive.chat_stream import get_chat_manager
from src.plugin_system.base.component_types import ActionInfo, ComponentType, ActionActivationType
@ -280,6 +280,8 @@ class BrainPlanner:
show_actions=True,
)
previous_obs_time_mark = self.last_obs_time_mark
message_list_before_now_short = message_list_before_now[-int(global_config.chat.max_context_size * 0.3) :]
chat_content_block_short, message_id_list_short = build_readable_messages_with_id(
messages=message_list_before_now_short,
@ -322,6 +324,23 @@ class BrainPlanner:
loop_start_time=loop_start_time,
)
has_new_user_message = any(
(msg.time or 0.0) > previous_obs_time_mark
and not is_bot_self(msg.user_info.platform or "", str(msg.user_info.user_id))
for msg in message_list_before_now
)
if not has_new_user_message:
actions, dropped_actions = self._restrict_actions_without_new_user_message(
actions=actions,
available_actions=available_actions,
message_id_list=message_id_list,
)
if dropped_actions:
logger.info(
f"{self.log_prefix}检测到无新用户消息,仅保留 reply/wait/complete_talk移除动作: {' '.join(dropped_actions)}"
)
reasoning = f"{reasoning};检测到无新用户消息,仅保留 reply/wait/complete_talk"
# 记录和展示计划日志
logger.info(
f"{self.log_prefix}Planner: {reasoning}。选择了{len(actions)}个动作: {' '.join([a.action_type for a in actions])}"
@ -517,9 +536,6 @@ class BrainPlanner:
llm_duration_ms = (time.perf_counter() - llm_start) * 1000
llm_reasoning = reasoning_content
logger.info(f"{self.log_prefix}规划器原始提示词: {prompt}")
logger.info(f"{self.log_prefix}规划器原始响应: {llm_content}")
if global_config.debug.show_planner_prompt:
logger.info(f"{self.log_prefix}规划器原始提示词: {prompt}")
logger.info(f"{self.log_prefix}规划器原始响应: {llm_content}")
@ -597,6 +613,44 @@ class BrainPlanner:
)
]
def _restrict_actions_without_new_user_message(
self,
actions: List[ActionPlannerInfo],
available_actions: Dict[str, ActionInfo],
message_id_list: List[Tuple[str, "DatabaseMessages"]],
) -> Tuple[List[ActionPlannerInfo], List[str]]:
"""无新用户消息时,仅保留 reply/wait/complete_talk。"""
allowed_actions: List[ActionPlannerInfo] = []
dropped_actions: List[str] = []
for action in actions:
if action.action_type in {"reply", "complete_talk"}:
allowed_actions.append(action)
continue
if action.action_type in {"wait", "listening", "wait_time"}:
action.action_type = "wait"
action.action_data = action.action_data or {}
action.action_data.setdefault("wait_seconds", 5)
allowed_actions.append(action)
continue
dropped_actions.append(action.action_type)
if allowed_actions:
return allowed_actions, dropped_actions
target_message = message_id_list[-1][1] if message_id_list else None
fallback_wait = ActionPlannerInfo(
action_type="wait",
reasoning="没有新的用户消息,进入等待",
action_data={"wait_seconds": 5},
action_message=target_message,
available_actions=available_actions,
)
return [fallback_wait], dropped_actions
def add_plan_log(self, reasoning: str, actions: List[ActionPlannerInfo]):
"""添加计划日志"""
self.plan_log.append((reasoning, time.time(), actions))