diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index da02e845..ca907406 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -4,6 +4,7 @@ import traceback import random import re from typing import Dict, Optional, Tuple, List, TYPE_CHECKING, Union +from collections import OrderedDict from rich.traceback import install from datetime import datetime from json_repair import repair_json @@ -110,6 +111,10 @@ class ActionPlanner: self.last_obs_time_mark = 0.0 self.plan_log: List[Tuple[str, float, Union[List[ActionPlannerInfo], str]]] = [] + + # 黑话缓存:使用 OrderedDict 实现 LRU,最多缓存10个 + self.unknown_words_cache: OrderedDict[str, None] = OrderedDict() + self.unknown_words_cache_limit = 10 def find_message_by_id( self, message_id: str, message_id_list: List[Tuple[str, "DatabaseMessages"]] @@ -299,6 +304,136 @@ class ActionPlanner: logger.warning(f"{self.log_prefix}检测消息发送者失败,缺少必要字段") return False + def _update_unknown_words_cache(self, new_words: List[str]) -> None: + """ + 更新黑话缓存,将新的黑话加入缓存 + + Args: + new_words: 新提取的黑话列表 + """ + for word in new_words: + if not isinstance(word, str): + continue + word = word.strip() + if not word: + continue + + # 如果已存在,移到末尾(LRU) + if word in self.unknown_words_cache: + self.unknown_words_cache.move_to_end(word) + else: + # 添加新词 + self.unknown_words_cache[word] = None + # 如果超过限制,移除最老的 + if len(self.unknown_words_cache) > self.unknown_words_cache_limit: + self.unknown_words_cache.popitem(last=False) + logger.debug(f"{self.log_prefix}黑话缓存已满,移除最老的黑话") + + def _merge_unknown_words_with_cache(self, new_words: Optional[List[str]]) -> List[str]: + """ + 合并新提取的黑话和缓存中的黑话 + + Args: + new_words: 新提取的黑话列表(可能为None) + + Returns: + 合并后的黑话列表(去重) + """ + # 清理新提取的黑话 + cleaned_new_words: List[str] = [] + if new_words: + for word in new_words: + if isinstance(word, str): + word = word.strip() + if word: + cleaned_new_words.append(word) + + # 获取缓存中的黑话列表 + cached_words = list(self.unknown_words_cache.keys()) + + # 合并并去重(保留顺序:新提取的在前,缓存的在后) + merged_words: List[str] = [] + seen = set() + + # 先添加新提取的 + for word in cleaned_new_words: + if word not in seen: + merged_words.append(word) + seen.add(word) + + # 再添加缓存的(如果不在新提取的列表中) + for word in cached_words: + if word not in seen: + merged_words.append(word) + seen.add(word) + + return merged_words + + def _process_unknown_words_cache( + self, actions: List[ActionPlannerInfo] + ) -> None: + """ + 处理黑话缓存逻辑: + 1. 检查是否有 reply action 提取了 unknown_words + 2. 如果没有提取,移除最老的1个 + 3. 如果缓存数量大于5,移除最老的2个 + 4. 对于每个 reply action,合并缓存和新提取的黑话 + 5. 更新缓存 + + Args: + actions: 解析后的动作列表 + """ + # 先检查缓存数量,如果大于5,移除最老的2个 + if len(self.unknown_words_cache) > 5: + # 移除最老的2个 + removed_count = 0 + for _ in range(2): + if len(self.unknown_words_cache) > 0: + self.unknown_words_cache.popitem(last=False) + removed_count += 1 + if removed_count > 0: + logger.debug(f"{self.log_prefix}缓存数量大于5,移除最老的{removed_count}个缓存") + + # 检查是否有 reply action 提取了 unknown_words + has_extracted_unknown_words = False + for action in actions: + if action.action_type == "reply": + action_data = action.action_data or {} + unknown_words = action_data.get("unknown_words") + if unknown_words and isinstance(unknown_words, list) and len(unknown_words) > 0: + has_extracted_unknown_words = True + break + + # 如果当前 plan 的 reply 没有提取,移除最老的1个 + if not has_extracted_unknown_words: + if len(self.unknown_words_cache) > 0: + self.unknown_words_cache.popitem(last=False) + logger.debug(f"{self.log_prefix}当前 plan 的 reply 没有提取黑话,移除最老的1个缓存") + + # 对于每个 reply action,合并缓存和新提取的黑话 + for action in actions: + if action.action_type == "reply": + action_data = action.action_data or {} + new_words = action_data.get("unknown_words") + + # 合并新提取的和缓存的黑话列表 + merged_words = self._merge_unknown_words_with_cache(new_words) + + # 更新 action_data + if merged_words: + action_data["unknown_words"] = merged_words + logger.debug( + f"{self.log_prefix}合并黑话:新提取 {len(new_words) if new_words else 0} 个," + f"缓存 {len(self.unknown_words_cache)} 个,合并后 {len(merged_words)} 个" + ) + else: + # 如果没有合并后的黑话,移除 unknown_words 字段 + action_data.pop("unknown_words", None) + + # 更新缓存(将新提取的黑话加入缓存) + if new_words: + self._update_unknown_words_cache(new_words) + async def plan( self, available_actions: Dict[str, ActionInfo], @@ -722,6 +857,9 @@ class ActionPlanner: random.shuffle(shuffled) actions = list({a.action_type: a for a in shuffled}.values()) + # 处理黑话缓存逻辑 + self._process_unknown_words_cache(actions) + logger.debug(f"{self.log_prefix}规划器选择了{len(actions)}个动作: {' '.join([a.action_type for a in actions])}") return extracted_reasoning, actions, llm_content, llm_reasoning, llm_duration_ms