From 79fcc6bd218a510f0947b35f2c53a5b8c149f75c Mon Sep 17 00:00:00 2001 From: UnCLAS-Prommer Date: Sat, 23 Aug 2025 00:29:11 +0800 Subject: [PATCH] =?UTF-8?q?typing=E4=BF=AE=E5=A4=8D(partly)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../heart_flow/heartflow_message_processor.py | 2 +- src/chat/planner_actions/planner.py | 343 ++++++++++-------- src/chat/utils/chat_message_builder.py | 4 +- src/chat/utils/utils.py | 5 +- 4 files changed, 188 insertions(+), 166 deletions(-) diff --git a/src/chat/heart_flow/heartflow_message_processor.py b/src/chat/heart_flow/heartflow_message_processor.py index ab6f6613..bea9f900 100644 --- a/src/chat/heart_flow/heartflow_message_processor.py +++ b/src/chat/heart_flow/heartflow_message_processor.py @@ -22,7 +22,7 @@ if TYPE_CHECKING: logger = get_logger("chat") -async def _calculate_interest(message: MessageRecv) -> Tuple[float, bool, list[str]]: +async def _calculate_interest(message: MessageRecv) -> Tuple[float, list[str]]: """计算消息的兴趣度 Args: diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index 0c25e659..46751392 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -1,6 +1,7 @@ import json import time import traceback +import asyncio from typing import Dict, Optional, Tuple, List, Any from rich.traceback import install from datetime import datetime @@ -83,10 +84,9 @@ def init_prompt(): """, "action_prompt", ) - - + Prompt( - """ + """ {time_block} {name_block} 请你根据聊天内容,选择一个或多个action来参与聊天。如果没有合适的action,请选择no_action。 @@ -111,7 +111,7 @@ no_action:不选择任何动作 请根据动作示例,以严格的 JSON 格式输出,且仅包含 JSON 内容: """, - "sub_planner_prompt", + "sub_planner_prompt", ) @@ -152,32 +152,37 @@ class ActionPlanner: return item[1] return None - def get_latest_message(self, message_id_list: List[DatabaseMessages]) -> Optional[DatabaseMessages]: + def get_latest_message(self, message_id_list: List[Tuple[str, DatabaseMessages]]) -> Optional[DatabaseMessages]: """ 获取消息列表中的最新消息 Args: - message_id_list: 消息ID列表,格式为[{'id': str, 'message': dict}, ...] + message_id_list: 消息ID列表,格式为[(id, message), ...] Returns: 最新的消息字典,如果列表为空则返回None """ - return message_id_list[-1] if message_id_list else None - - def _parse_single_action(self, action_json: dict, message_id_list: List[Tuple[str, DatabaseMessages]], current_available_actions: List[Tuple[str, ActionInfo]]) -> List[ActionPlannerInfo]: + return message_id_list[-1][1] if message_id_list else None + + def _parse_single_action( + self, + action_json: dict, + message_id_list: List[Tuple[str, DatabaseMessages]], + current_available_actions: List[Tuple[str, ActionInfo]], + ) -> List[ActionPlannerInfo]: """解析单个action JSON并返回ActionPlannerInfo列表""" action_planner_infos = [] - + try: action = action_json.get("action", "no_action") reasoning = action_json.get("reason", "未提供原因") action_data = {} - + # 将所有其他属性添加到action_data for key, value in action_json.items(): if key not in ["action", "reasoning"]: action_data[key] = value - + # 非no_action动作需要target_message_id target_message = None if action != "no_action": @@ -190,41 +195,47 @@ class ActionPlanner: target_message = self.get_latest_message(message_id_list) else: logger.warning(f"{self.log_prefix}动作'{action}'缺少target_message_id") - + # 验证action是否可用 available_action_names = [action_name for action_name, _ in current_available_actions] if action != "no_action" and action != "reply" and action not in available_action_names: logger.warning( f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {available_action_names}),将强制使用 'no_action'" ) - reasoning = f"LLM 返回了当前不可用的动作 '{action}' (可用: {available_action_names})。原始理由: {reasoning}" + reasoning = ( + f"LLM 返回了当前不可用的动作 '{action}' (可用: {available_action_names})。原始理由: {reasoning}" + ) action = "no_action" - + # 创建ActionPlannerInfo对象 # 将列表转换为字典格式 available_actions_dict = dict(current_available_actions) - action_planner_infos.append(ActionPlannerInfo( - action_type=action, - reasoning=reasoning, - action_data=action_data, - action_message=target_message, - available_actions=available_actions_dict, - )) - + action_planner_infos.append( + ActionPlannerInfo( + action_type=action, + reasoning=reasoning, + action_data=action_data, + action_message=target_message, + available_actions=available_actions_dict, + ) + ) + except Exception as e: logger.error(f"{self.log_prefix}解析单个action时出错: {e}") # 将列表转换为字典格式 available_actions_dict = dict(current_available_actions) - action_planner_infos.append(ActionPlannerInfo( - action_type="no_action", - reasoning=f"解析单个action时出错: {e}", - action_data={}, - action_message=None, - available_actions=available_actions_dict, - )) - + action_planner_infos.append( + ActionPlannerInfo( + action_type="no_action", + reasoning=f"解析单个action时出错: {e}", + action_data={}, + action_message=None, + available_actions=available_actions_dict, + ) + ) + return action_planner_infos - + async def sub_plan( self, action_list: List[Tuple[str, ActionInfo]], @@ -245,18 +256,15 @@ class ActionPlanner: # print(actions_before_now) # print(action_names_in_list) for action_record in actions_before_now: - if isinstance(action_record, dict) and 'action_name' in action_record: - action_type = action_record['action_name'] + if isinstance(action_record, dict) and "action_name" in action_record: + action_type = action_record["action_name"] if action_type in action_names_in_list: filtered_actions.append(action_record) - - actions_before_now_block = build_readable_actions( actions=filtered_actions, ) - - + if actions_before_now_block: actions_before_now_block = f"你刚刚选择并执行过的action是,请注意如果相同的内容已经被执行,请不要重复执行:\n{actions_before_now_block}" else: @@ -320,18 +328,20 @@ class ActionPlanner: logger.error(f"构建 Planner 提示词时出错: {e}") logger.error(traceback.format_exc()) # 返回一个默认的no_action而不是字符串 - return [ActionPlannerInfo( - action_type="no_action", - reasoning=f"构建 Planner Prompt 时出错: {e}", - action_data={}, - action_message=None, - available_actions=action_list, - )] + return [ + ActionPlannerInfo( + action_type="no_action", + reasoning=f"构建 Planner Prompt 时出错: {e}", + action_data={}, + action_message=None, + available_actions=action_list, + ) + ] # --- 调用 LLM (普通文本生成) --- llm_content = None action_planner_infos = [] # 存储多个ActionPlannerInfo对象 - + try: llm_content, (reasoning_content, _, _) = await self.planner_small_llm.generate_response_async(prompt=prompt) @@ -349,19 +359,21 @@ class ActionPlanner: except Exception as req_e: logger.error(f"{self.log_prefix}副规划器LLM 请求执行失败: {req_e}") # 返回一个默认的no_action - action_planner_infos.append(ActionPlannerInfo( - action_type="no_action", - reasoning=f"副规划器LLM 请求失败,模型出现问题: {req_e}", - action_data={}, - action_message=None, - available_actions=action_list, - )) + action_planner_infos.append( + ActionPlannerInfo( + action_type="no_action", + reasoning=f"副规划器LLM 请求失败,模型出现问题: {req_e}", + action_data={}, + action_message=None, + available_actions=action_list, + ) + ) return action_planner_infos if llm_content: try: parsed_json = json.loads(repair_json(llm_content)) - + # 处理不同的JSON格式 if isinstance(parsed_json, list): # 如果是列表,处理每个action @@ -369,74 +381,81 @@ class ActionPlanner: logger.info(f"{self.log_prefix}LLM返回了{len(parsed_json)}个action") for action_item in parsed_json: if isinstance(action_item, dict): - action_planner_infos.extend(self._parse_single_action( - action_item, message_id_list, action_list - )) + action_planner_infos.extend( + self._parse_single_action(action_item, message_id_list, action_list) + ) else: logger.warning(f"{self.log_prefix}列表中的action项不是字典类型: {type(action_item)}") else: logger.warning(f"{self.log_prefix}LLM返回了空列表") - action_planner_infos.append(ActionPlannerInfo( + action_planner_infos.append( + ActionPlannerInfo( + action_type="no_action", + reasoning="LLM返回了空列表,选择no_action", + action_data={}, + action_message=None, + available_actions=action_list, + ) + ) + elif isinstance(parsed_json, dict): + # 如果是单个字典,处理单个action + action_planner_infos.extend(self._parse_single_action(parsed_json, message_id_list, action_list)) + else: + logger.error(f"{self.log_prefix}解析后的JSON不是字典或列表类型: {type(parsed_json)}") + action_planner_infos.append( + ActionPlannerInfo( action_type="no_action", - reasoning="LLM返回了空列表,选择no_action", + reasoning=f"解析后的JSON类型错误: {type(parsed_json)}", action_data={}, action_message=None, available_actions=action_list, - )) - elif isinstance(parsed_json, dict): - # 如果是单个字典,处理单个action - action_planner_infos.extend(self._parse_single_action( - parsed_json, message_id_list, action_list - )) - else: - logger.error(f"{self.log_prefix}解析后的JSON不是字典或列表类型: {type(parsed_json)}") - action_planner_infos.append(ActionPlannerInfo( - action_type="no_action", - reasoning=f"解析后的JSON类型错误: {type(parsed_json)}", - action_data={}, - action_message=None, - available_actions=action_list, - )) + ) + ) except Exception as json_e: logger.warning(f"{self.log_prefix}解析LLM响应JSON失败 {json_e}. LLM原始输出: '{llm_content}'") traceback.print_exc() - action_planner_infos.append(ActionPlannerInfo( + action_planner_infos.append( + ActionPlannerInfo( + action_type="no_action", + reasoning=f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_action'.", + action_data={}, + action_message=None, + available_actions=action_list, + ) + ) + else: + # 如果没有LLM内容,返回默认的no_action + action_planner_infos.append( + ActionPlannerInfo( action_type="no_action", - reasoning=f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_action'.", + reasoning="副规划器没有获得LLM响应", action_data={}, action_message=None, available_actions=action_list, - )) - else: - # 如果没有LLM内容,返回默认的no_action - action_planner_infos.append(ActionPlannerInfo( - action_type="no_action", - reasoning="副规划器没有获得LLM响应", - action_data={}, - action_message=None, - available_actions=action_list, - )) - + ) + ) + # 如果没有解析到任何action,返回默认的no_action if not action_planner_infos: - action_planner_infos.append(ActionPlannerInfo( - action_type="no_action", - reasoning="副规划器没有解析到任何有效action", - action_data={}, - action_message=None, - available_actions=action_list, - )) - + action_planner_infos.append( + ActionPlannerInfo( + action_type="no_action", + reasoning="副规划器没有解析到任何有效action", + action_data={}, + action_message=None, + available_actions=action_list, + ) + ) + logger.info(f"{self.log_prefix}副规划器返回了{len(action_planner_infos)}个action") return action_planner_infos - async def plan( self, + available_actions: Dict[str, ActionInfo], mode: ChatMode = ChatMode.FOCUS, loop_start_time: float = 0.0, - available_actions: Optional[Dict[str, ActionInfo]] = None, ) -> Tuple[List[ActionPlannerInfo], Optional[DatabaseMessages]]: """ 规划器 (Planner): 使用LLM根据上下文决定做出什么动作。 @@ -449,7 +468,7 @@ class ActionPlanner: target_message: Optional[DatabaseMessages] = None # 初始化target_message变量 prompt: str = "" message_id_list: list = [] - + message_list_before_now = get_raw_msg_before_timestamp_with_chat( chat_id=self.chat_id, timestamp=time.time(), @@ -473,25 +492,24 @@ class ActionPlanner: actions_before_now_block = build_readable_actions( actions=actions_before_now, ) - - + message_list_before_now_short = message_list_before_now[:5] - + chat_content_block_short, message_id_list_short = build_readable_messages_with_id( messages=message_list_before_now_short, timestamp_mode="normal_no_YMD", truncate=False, show_actions=False, ) - + self.last_obs_time_mark = time.time() try: logger.info(f"{self.log_prefix}开始构建副Planner") - sub_planner_actions = {} - + sub_planner_actions: Dict[str, ActionInfo] = {} + for action_name, action_info in available_actions.items(): - if action_info.activation_type == ActionActivationType.LLM_JUDGE or action_info.activation_type == ActionActivationType.ALWAYS: + if action_info.activation_type in [ActionActivationType.LLM_JUDGE, ActionActivationType.ALWAYS]: sub_planner_actions[action_name] = action_info elif action_info.activation_type == ActionActivationType.RANDOM: if random.random() < action_info.random_activation_probability: @@ -505,7 +523,7 @@ class ActionPlanner: pass else: logger.warning(f"{self.log_prefix}未知的激活类型: {action_info.activation_type},跳过处理") - + sub_planner_actions_num = len(sub_planner_actions) sub_planner_size = global_config.chat.planner_size if global_config.chat.planner_size > int(global_config.chat.planner_size): @@ -514,20 +532,20 @@ class ActionPlanner: sub_planner_num = int(sub_planner_actions_num / sub_planner_size) if sub_planner_actions_num % sub_planner_size != 0: sub_planner_num += 1 - + logger.info(f"{self.log_prefix}副规划器数量: {sub_planner_num}, 副规划器大小: {sub_planner_size}") - + # 将sub_planner_actions随机分配到sub_planner_num个List中 sub_planner_lists = [] if sub_planner_actions_num > 0: # 将actions转换为列表并随机打乱 action_items = list(sub_planner_actions.items()) random.shuffle(action_items) - + # 初始化所有子列表 for i in range(sub_planner_num): sub_planner_lists.append([]) - + # 分配actions到各个子列表 for i, (action_name, action_info) in enumerate(action_items): # 确保每个列表至少有一个action @@ -535,25 +553,23 @@ class ActionPlanner: sub_planner_lists[i].append((action_name, action_info)) else: # 随机选择一个列表添加action,但不超过最大大小限制 - available_lists = [j for j, lst in enumerate(sub_planner_lists) - if len(lst) < sub_planner_size] + available_lists = [j for j, lst in enumerate(sub_planner_lists) if len(lst) < sub_planner_size] if available_lists: target_list = random.choice(available_lists) sub_planner_lists[target_list].append((action_name, action_info)) - - logger.info(f"{self.log_prefix}成功将{len(sub_planner_actions)}个actions分配到{sub_planner_num}个子列表中") + + logger.info( + f"{self.log_prefix}成功将{len(sub_planner_actions)}个actions分配到{sub_planner_num}个子列表中" + ) for i, lst in enumerate(sub_planner_lists): - logger.debug(f"{self.log_prefix}子列表{i+1}: {len(lst)}个actions") + logger.debug(f"{self.log_prefix}子列表{i + 1}: {len(lst)}个actions") else: logger.info(f"{self.log_prefix}没有可用的actions需要分配") - - + # 先获取必要信息 is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info() - + # 并行执行所有副规划器 - import asyncio - async def execute_sub_plan(action_list): return await self.sub_plan( action_list=action_list, @@ -564,18 +580,18 @@ class ActionPlanner: chat_target_info=chat_target_info, # current_available_actions=current_available_actions, ) - + # 创建所有任务 sub_plan_tasks = [execute_sub_plan(action_list) for action_list in sub_planner_lists] - + # 并行执行所有任务 sub_plan_results = await asyncio.gather(*sub_plan_tasks) - + # 收集所有结果 all_sub_planner_results = [] for sub_result in sub_plan_results: all_sub_planner_results.extend(sub_result) - + logger.info(f"{self.log_prefix}所有副规划器共返回了{len(all_sub_planner_results)}个action") # --- 构建提示词 (调用修改后的 PromptBuilder 方法) --- @@ -630,7 +646,7 @@ class ActionPlanner: action_planner_infos = self._parse_single_action( parsed_json, message_id_list, current_available_actions_list ) - + if action_planner_infos: # 获取第一个(也是唯一一个)action的信息 action_info = action_planner_infos[0] @@ -638,7 +654,7 @@ class ActionPlanner: reasoning = action_info.reasoning action_data.update(action_info.action_data) target_message = action_info.action_message - + # 处理target_message为None的情况(保持原有的重试逻辑) if target_message is None and action != "no_action": # 尝试获取最新消息作为target_message @@ -685,53 +701,61 @@ class ActionPlanner: else: # 如果所有都是no_action,返回第一个 return [action_list[0]] if action_list else [] - + # 根据is_parallel决定返回值 if is_parallel: # 如果为真,将主规划器的结果和副规划器的结果都返回 main_actions = [] - + # 添加主规划器的action(如果不是no_action) if action != "no_action": - main_actions.append(ActionPlannerInfo( - action_type=action, - reasoning=reasoning, - action_data=action_data, - action_message=target_message, - available_actions=available_actions, - )) - + main_actions.append( + ActionPlannerInfo( + action_type=action, + reasoning=reasoning, + action_data=action_data, + action_message=target_message, + available_actions=available_actions, + ) + ) + # 先合并主副规划器的结果 all_actions = main_actions + all_sub_planner_results - + # 然后统一过滤no_action actions = filter_no_actions(all_actions) - + # 如果所有结果都是no_action,返回一个no_action if not actions: - actions = [ActionPlannerInfo( - action_type="no_action", - reasoning="所有规划器都选择不执行动作", - action_data={}, - action_message=None, - available_actions=available_actions, - )] - - logger.info(f"{self.log_prefix}并行模式:返回主规划器{len(main_actions)}个action + 副规划器{len(all_sub_planner_results)}个action,过滤后总计{len(actions)}个action") + actions = [ + ActionPlannerInfo( + action_type="no_action", + reasoning="所有规划器都选择不执行动作", + action_data={}, + action_message=None, + available_actions=available_actions, + ) + ] + + logger.info( + f"{self.log_prefix}并行模式:返回主规划器{len(main_actions)}个action + 副规划器{len(all_sub_planner_results)}个action,过滤后总计{len(actions)}个action" + ) else: # 如果为假,只返回副规划器的结果 actions = filter_no_actions(all_sub_planner_results) - + # 如果所有结果都是no_action,返回一个no_action if not actions: - actions = [ActionPlannerInfo( - action_type="no_action", - reasoning="副规划器都选择不执行动作", - action_data={}, - action_message=None, - available_actions=available_actions, - )] - + actions = [ + ActionPlannerInfo( + action_type="no_action", + reasoning="副规划器都选择不执行动作", + action_data={}, + action_message=None, + available_actions=available_actions, + ) + ] + logger.info(f"{self.log_prefix}非并行模式:返回副规划器的{len(actions)}个action(已过滤no_action)") return actions, target_message @@ -742,13 +766,12 @@ class ActionPlanner: chat_target_info: Optional[dict], # Now passed as argument current_available_actions: Dict[str, ActionInfo], mode: ChatMode = ChatMode.FOCUS, - actions_before_now_block :str = "", - chat_content_block :str = "", - message_id_list :List[Tuple[str, DatabaseMessages]] = None, + actions_before_now_block: str = "", + chat_content_block: str = "", + message_id_list: List[Tuple[str, DatabaseMessages]] = None, ) -> tuple[str, List[DatabaseMessages]]: # sourcery skip: use-join """构建 Planner LLM 的提示词 (获取模板并填充数据)""" try: - if actions_before_now_block: actions_before_now_block = f"你刚刚选择并执行过的action是:\n{actions_before_now_block}" else: diff --git a/src/chat/utils/chat_message_builder.py b/src/chat/utils/chat_message_builder.py index 0aca6bae..11ad2bb8 100644 --- a/src/chat/utils/chat_message_builder.py +++ b/src/chat/utils/chat_message_builder.py @@ -330,7 +330,7 @@ def _build_readable_messages_internal( pic_id_mapping: Optional[Dict[str, str]] = None, pic_counter: int = 1, show_pic: bool = True, - message_id_list: Optional[List[DatabaseMessages]] = None, + message_id_list: Optional[List[Tuple[str, DatabaseMessages]]] = None, ) -> Tuple[str, List[Tuple[float, str, str]], Dict[str, str], int]: # sourcery skip: use-getitem-for-re-match-groups """ @@ -635,7 +635,7 @@ def build_readable_messages( truncate: bool = False, show_actions: bool = False, show_pic: bool = True, - message_id_list: Optional[List[DatabaseMessages]] = None, + message_id_list: Optional[List[Tuple[str, DatabaseMessages]]] = None, ) -> str: # sourcery skip: extract-method """ 将消息列表转换为可读的文本格式。 diff --git a/src/chat/utils/utils.py b/src/chat/utils/utils.py index 7634593c..5285201f 100644 --- a/src/chat/utils/utils.py +++ b/src/chat/utils/utils.py @@ -1,6 +1,5 @@ import random import re -import string import time import jieba import json @@ -8,7 +7,7 @@ import ast import numpy as np from collections import Counter -from typing import Optional, Tuple, Dict, List, Any +from typing import Optional, Tuple, Dict, List from src.common.logger import get_logger from src.common.data_models.database_data_model import DatabaseMessages @@ -675,7 +674,7 @@ def get_chat_type_and_target_info(chat_id: str) -> Tuple[bool, Optional[Dict]]: return is_group_chat, chat_target_info -def assign_message_ids(messages: List[DatabaseMessages]) -> List[DatabaseMessages]: +def assign_message_ids(messages: List[DatabaseMessages]) -> List[Tuple[str, DatabaseMessages]]: """ 为消息列表中的每个消息分配唯一的简短随机ID