typing修复(partly)

pull/1217/head
UnCLAS-Prommer 2025-08-23 00:29:11 +08:00
parent add05cee55
commit 79fcc6bd21
No known key found for this signature in database
4 changed files with 188 additions and 166 deletions

View File

@ -22,7 +22,7 @@ if TYPE_CHECKING:
logger = get_logger("chat") logger = get_logger("chat")
async def _calculate_interest(message: MessageRecv) -> Tuple[float, bool, list[str]]: async def _calculate_interest(message: MessageRecv) -> Tuple[float, list[str]]:
"""计算消息的兴趣度 """计算消息的兴趣度
Args: Args:

View File

@ -1,6 +1,7 @@
import json import json
import time import time
import traceback import traceback
import asyncio
from typing import Dict, Optional, Tuple, List, Any from typing import Dict, Optional, Tuple, List, Any
from rich.traceback import install from rich.traceback import install
from datetime import datetime from datetime import datetime
@ -84,9 +85,8 @@ def init_prompt():
"action_prompt", "action_prompt",
) )
Prompt( Prompt(
""" """
{time_block} {time_block}
{name_block} {name_block}
请你根据聊天内容选择一个或多个action来参与聊天如果没有合适的action请选择no_action 请你根据聊天内容选择一个或多个action来参与聊天如果没有合适的action请选择no_action
@ -111,7 +111,7 @@ no_action不选择任何动作
请根据动作示例以严格的 JSON 格式输出且仅包含 JSON 内容 请根据动作示例以严格的 JSON 格式输出且仅包含 JSON 内容
""", """,
"sub_planner_prompt", "sub_planner_prompt",
) )
@ -152,19 +152,24 @@ class ActionPlanner:
return item[1] return item[1]
return None return None
def get_latest_message(self, message_id_list: List[DatabaseMessages]) -> Optional[DatabaseMessages]: def get_latest_message(self, message_id_list: List[Tuple[str, DatabaseMessages]]) -> Optional[DatabaseMessages]:
""" """
获取消息列表中的最新消息 获取消息列表中的最新消息
Args: Args:
message_id_list: 消息ID列表格式为[{'id': str, 'message': dict}, ...] message_id_list: 消息ID列表格式为[(id, message), ...]
Returns: Returns:
最新的消息字典如果列表为空则返回None 最新的消息字典如果列表为空则返回None
""" """
return message_id_list[-1] if message_id_list else None return message_id_list[-1][1] if message_id_list else None
def _parse_single_action(self, action_json: dict, message_id_list: List[Tuple[str, DatabaseMessages]], current_available_actions: List[Tuple[str, ActionInfo]]) -> List[ActionPlannerInfo]: def _parse_single_action(
self,
action_json: dict,
message_id_list: List[Tuple[str, DatabaseMessages]],
current_available_actions: List[Tuple[str, ActionInfo]],
) -> List[ActionPlannerInfo]:
"""解析单个action JSON并返回ActionPlannerInfo列表""" """解析单个action JSON并返回ActionPlannerInfo列表"""
action_planner_infos = [] action_planner_infos = []
@ -197,31 +202,37 @@ class ActionPlanner:
logger.warning( logger.warning(
f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {available_action_names}),将强制使用 'no_action'" f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {available_action_names}),将强制使用 'no_action'"
) )
reasoning = f"LLM 返回了当前不可用的动作 '{action}' (可用: {available_action_names})。原始理由: {reasoning}" reasoning = (
f"LLM 返回了当前不可用的动作 '{action}' (可用: {available_action_names})。原始理由: {reasoning}"
)
action = "no_action" action = "no_action"
# 创建ActionPlannerInfo对象 # 创建ActionPlannerInfo对象
# 将列表转换为字典格式 # 将列表转换为字典格式
available_actions_dict = dict(current_available_actions) available_actions_dict = dict(current_available_actions)
action_planner_infos.append(ActionPlannerInfo( action_planner_infos.append(
action_type=action, ActionPlannerInfo(
reasoning=reasoning, action_type=action,
action_data=action_data, reasoning=reasoning,
action_message=target_message, action_data=action_data,
available_actions=available_actions_dict, action_message=target_message,
)) available_actions=available_actions_dict,
)
)
except Exception as e: except Exception as e:
logger.error(f"{self.log_prefix}解析单个action时出错: {e}") logger.error(f"{self.log_prefix}解析单个action时出错: {e}")
# 将列表转换为字典格式 # 将列表转换为字典格式
available_actions_dict = dict(current_available_actions) available_actions_dict = dict(current_available_actions)
action_planner_infos.append(ActionPlannerInfo( action_planner_infos.append(
action_type="no_action", ActionPlannerInfo(
reasoning=f"解析单个action时出错: {e}", action_type="no_action",
action_data={}, reasoning=f"解析单个action时出错: {e}",
action_message=None, action_data={},
available_actions=available_actions_dict, action_message=None,
)) available_actions=available_actions_dict,
)
)
return action_planner_infos return action_planner_infos
@ -245,18 +256,15 @@ class ActionPlanner:
# print(actions_before_now) # print(actions_before_now)
# print(action_names_in_list) # print(action_names_in_list)
for action_record in actions_before_now: for action_record in actions_before_now:
if isinstance(action_record, dict) and 'action_name' in action_record: if isinstance(action_record, dict) and "action_name" in action_record:
action_type = action_record['action_name'] action_type = action_record["action_name"]
if action_type in action_names_in_list: if action_type in action_names_in_list:
filtered_actions.append(action_record) filtered_actions.append(action_record)
actions_before_now_block = build_readable_actions( actions_before_now_block = build_readable_actions(
actions=filtered_actions, actions=filtered_actions,
) )
if actions_before_now_block: if actions_before_now_block:
actions_before_now_block = f"你刚刚选择并执行过的action是请注意如果相同的内容已经被执行请不要重复执行\n{actions_before_now_block}" actions_before_now_block = f"你刚刚选择并执行过的action是请注意如果相同的内容已经被执行请不要重复执行\n{actions_before_now_block}"
else: else:
@ -320,13 +328,15 @@ class ActionPlanner:
logger.error(f"构建 Planner 提示词时出错: {e}") logger.error(f"构建 Planner 提示词时出错: {e}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
# 返回一个默认的no_action而不是字符串 # 返回一个默认的no_action而不是字符串
return [ActionPlannerInfo( return [
action_type="no_action", ActionPlannerInfo(
reasoning=f"构建 Planner Prompt 时出错: {e}", action_type="no_action",
action_data={}, reasoning=f"构建 Planner Prompt 时出错: {e}",
action_message=None, action_data={},
available_actions=action_list, action_message=None,
)] available_actions=action_list,
)
]
# --- 调用 LLM (普通文本生成) --- # --- 调用 LLM (普通文本生成) ---
llm_content = None llm_content = None
@ -349,13 +359,15 @@ class ActionPlanner:
except Exception as req_e: except Exception as req_e:
logger.error(f"{self.log_prefix}副规划器LLM 请求执行失败: {req_e}") logger.error(f"{self.log_prefix}副规划器LLM 请求执行失败: {req_e}")
# 返回一个默认的no_action # 返回一个默认的no_action
action_planner_infos.append(ActionPlannerInfo( action_planner_infos.append(
action_type="no_action", ActionPlannerInfo(
reasoning=f"副规划器LLM 请求失败,模型出现问题: {req_e}", action_type="no_action",
action_data={}, reasoning=f"副规划器LLM 请求失败,模型出现问题: {req_e}",
action_message=None, action_data={},
available_actions=action_list, action_message=None,
)) available_actions=action_list,
)
)
return action_planner_infos return action_planner_infos
if llm_content: if llm_content:
@ -369,74 +381,81 @@ class ActionPlanner:
logger.info(f"{self.log_prefix}LLM返回了{len(parsed_json)}个action") logger.info(f"{self.log_prefix}LLM返回了{len(parsed_json)}个action")
for action_item in parsed_json: for action_item in parsed_json:
if isinstance(action_item, dict): if isinstance(action_item, dict):
action_planner_infos.extend(self._parse_single_action( action_planner_infos.extend(
action_item, message_id_list, action_list self._parse_single_action(action_item, message_id_list, action_list)
)) )
else: else:
logger.warning(f"{self.log_prefix}列表中的action项不是字典类型: {type(action_item)}") logger.warning(f"{self.log_prefix}列表中的action项不是字典类型: {type(action_item)}")
else: else:
logger.warning(f"{self.log_prefix}LLM返回了空列表") logger.warning(f"{self.log_prefix}LLM返回了空列表")
action_planner_infos.append(ActionPlannerInfo( action_planner_infos.append(
ActionPlannerInfo(
action_type="no_action",
reasoning="LLM返回了空列表选择no_action",
action_data={},
action_message=None,
available_actions=action_list,
)
)
elif isinstance(parsed_json, dict):
# 如果是单个字典处理单个action
action_planner_infos.extend(self._parse_single_action(parsed_json, message_id_list, action_list))
else:
logger.error(f"{self.log_prefix}解析后的JSON不是字典或列表类型: {type(parsed_json)}")
action_planner_infos.append(
ActionPlannerInfo(
action_type="no_action", action_type="no_action",
reasoning="LLM返回了空列表选择no_action", reasoning=f"解析后的JSON类型错误: {type(parsed_json)}",
action_data={}, action_data={},
action_message=None, action_message=None,
available_actions=action_list, available_actions=action_list,
)) )
elif isinstance(parsed_json, dict): )
# 如果是单个字典处理单个action
action_planner_infos.extend(self._parse_single_action(
parsed_json, message_id_list, action_list
))
else:
logger.error(f"{self.log_prefix}解析后的JSON不是字典或列表类型: {type(parsed_json)}")
action_planner_infos.append(ActionPlannerInfo(
action_type="no_action",
reasoning=f"解析后的JSON类型错误: {type(parsed_json)}",
action_data={},
action_message=None,
available_actions=action_list,
))
except Exception as json_e: except Exception as json_e:
logger.warning(f"{self.log_prefix}解析LLM响应JSON失败 {json_e}. LLM原始输出: '{llm_content}'") logger.warning(f"{self.log_prefix}解析LLM响应JSON失败 {json_e}. LLM原始输出: '{llm_content}'")
traceback.print_exc() traceback.print_exc()
action_planner_infos.append(ActionPlannerInfo( action_planner_infos.append(
ActionPlannerInfo(
action_type="no_action",
reasoning=f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_action'.",
action_data={},
action_message=None,
available_actions=action_list,
)
)
else:
# 如果没有LLM内容返回默认的no_action
action_planner_infos.append(
ActionPlannerInfo(
action_type="no_action", action_type="no_action",
reasoning=f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_action'.", reasoning="副规划器没有获得LLM响应",
action_data={}, action_data={},
action_message=None, action_message=None,
available_actions=action_list, available_actions=action_list,
)) )
else: )
# 如果没有LLM内容返回默认的no_action
action_planner_infos.append(ActionPlannerInfo(
action_type="no_action",
reasoning="副规划器没有获得LLM响应",
action_data={},
action_message=None,
available_actions=action_list,
))
# 如果没有解析到任何action返回默认的no_action # 如果没有解析到任何action返回默认的no_action
if not action_planner_infos: if not action_planner_infos:
action_planner_infos.append(ActionPlannerInfo( action_planner_infos.append(
action_type="no_action", ActionPlannerInfo(
reasoning="副规划器没有解析到任何有效action", action_type="no_action",
action_data={}, reasoning="副规划器没有解析到任何有效action",
action_message=None, action_data={},
available_actions=action_list, action_message=None,
)) available_actions=action_list,
)
)
logger.info(f"{self.log_prefix}副规划器返回了{len(action_planner_infos)}个action") logger.info(f"{self.log_prefix}副规划器返回了{len(action_planner_infos)}个action")
return action_planner_infos return action_planner_infos
async def plan( async def plan(
self, self,
available_actions: Dict[str, ActionInfo],
mode: ChatMode = ChatMode.FOCUS, mode: ChatMode = ChatMode.FOCUS,
loop_start_time: float = 0.0, loop_start_time: float = 0.0,
available_actions: Optional[Dict[str, ActionInfo]] = None,
) -> Tuple[List[ActionPlannerInfo], Optional[DatabaseMessages]]: ) -> Tuple[List[ActionPlannerInfo], Optional[DatabaseMessages]]:
""" """
规划器 (Planner): 使用LLM根据上下文决定做出什么动作 规划器 (Planner): 使用LLM根据上下文决定做出什么动作
@ -474,7 +493,6 @@ class ActionPlanner:
actions=actions_before_now, actions=actions_before_now,
) )
message_list_before_now_short = message_list_before_now[:5] message_list_before_now_short = message_list_before_now[:5]
chat_content_block_short, message_id_list_short = build_readable_messages_with_id( chat_content_block_short, message_id_list_short = build_readable_messages_with_id(
@ -488,10 +506,10 @@ class ActionPlanner:
try: try:
logger.info(f"{self.log_prefix}开始构建副Planner") logger.info(f"{self.log_prefix}开始构建副Planner")
sub_planner_actions = {} sub_planner_actions: Dict[str, ActionInfo] = {}
for action_name, action_info in available_actions.items(): for action_name, action_info in available_actions.items():
if action_info.activation_type == ActionActivationType.LLM_JUDGE or action_info.activation_type == ActionActivationType.ALWAYS: if action_info.activation_type in [ActionActivationType.LLM_JUDGE, ActionActivationType.ALWAYS]:
sub_planner_actions[action_name] = action_info sub_planner_actions[action_name] = action_info
elif action_info.activation_type == ActionActivationType.RANDOM: elif action_info.activation_type == ActionActivationType.RANDOM:
if random.random() < action_info.random_activation_probability: if random.random() < action_info.random_activation_probability:
@ -535,25 +553,23 @@ class ActionPlanner:
sub_planner_lists[i].append((action_name, action_info)) sub_planner_lists[i].append((action_name, action_info))
else: else:
# 随机选择一个列表添加action但不超过最大大小限制 # 随机选择一个列表添加action但不超过最大大小限制
available_lists = [j for j, lst in enumerate(sub_planner_lists) available_lists = [j for j, lst in enumerate(sub_planner_lists) if len(lst) < sub_planner_size]
if len(lst) < sub_planner_size]
if available_lists: if available_lists:
target_list = random.choice(available_lists) target_list = random.choice(available_lists)
sub_planner_lists[target_list].append((action_name, action_info)) sub_planner_lists[target_list].append((action_name, action_info))
logger.info(f"{self.log_prefix}成功将{len(sub_planner_actions)}个actions分配到{sub_planner_num}个子列表中") logger.info(
f"{self.log_prefix}成功将{len(sub_planner_actions)}个actions分配到{sub_planner_num}个子列表中"
)
for i, lst in enumerate(sub_planner_lists): for i, lst in enumerate(sub_planner_lists):
logger.debug(f"{self.log_prefix}子列表{i+1}: {len(lst)}个actions") logger.debug(f"{self.log_prefix}子列表{i + 1}: {len(lst)}个actions")
else: else:
logger.info(f"{self.log_prefix}没有可用的actions需要分配") logger.info(f"{self.log_prefix}没有可用的actions需要分配")
# 先获取必要信息 # 先获取必要信息
is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info() is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info()
# 并行执行所有副规划器 # 并行执行所有副规划器
import asyncio
async def execute_sub_plan(action_list): async def execute_sub_plan(action_list):
return await self.sub_plan( return await self.sub_plan(
action_list=action_list, action_list=action_list,
@ -693,13 +709,15 @@ class ActionPlanner:
# 添加主规划器的action如果不是no_action # 添加主规划器的action如果不是no_action
if action != "no_action": if action != "no_action":
main_actions.append(ActionPlannerInfo( main_actions.append(
action_type=action, ActionPlannerInfo(
reasoning=reasoning, action_type=action,
action_data=action_data, reasoning=reasoning,
action_message=target_message, action_data=action_data,
available_actions=available_actions, action_message=target_message,
)) available_actions=available_actions,
)
)
# 先合并主副规划器的结果 # 先合并主副规划器的结果
all_actions = main_actions + all_sub_planner_results all_actions = main_actions + all_sub_planner_results
@ -709,28 +727,34 @@ class ActionPlanner:
# 如果所有结果都是no_action返回一个no_action # 如果所有结果都是no_action返回一个no_action
if not actions: if not actions:
actions = [ActionPlannerInfo( actions = [
action_type="no_action", ActionPlannerInfo(
reasoning="所有规划器都选择不执行动作", action_type="no_action",
action_data={}, reasoning="所有规划器都选择不执行动作",
action_message=None, action_data={},
available_actions=available_actions, action_message=None,
)] available_actions=available_actions,
)
]
logger.info(f"{self.log_prefix}并行模式:返回主规划器{len(main_actions)}个action + 副规划器{len(all_sub_planner_results)}个action过滤后总计{len(actions)}个action") logger.info(
f"{self.log_prefix}并行模式:返回主规划器{len(main_actions)}个action + 副规划器{len(all_sub_planner_results)}个action过滤后总计{len(actions)}个action"
)
else: else:
# 如果为假,只返回副规划器的结果 # 如果为假,只返回副规划器的结果
actions = filter_no_actions(all_sub_planner_results) actions = filter_no_actions(all_sub_planner_results)
# 如果所有结果都是no_action返回一个no_action # 如果所有结果都是no_action返回一个no_action
if not actions: if not actions:
actions = [ActionPlannerInfo( actions = [
action_type="no_action", ActionPlannerInfo(
reasoning="副规划器都选择不执行动作", action_type="no_action",
action_data={}, reasoning="副规划器都选择不执行动作",
action_message=None, action_data={},
available_actions=available_actions, action_message=None,
)] available_actions=available_actions,
)
]
logger.info(f"{self.log_prefix}非并行模式:返回副规划器的{len(actions)}个action已过滤no_action") logger.info(f"{self.log_prefix}非并行模式:返回副规划器的{len(actions)}个action已过滤no_action")
@ -742,13 +766,12 @@ class ActionPlanner:
chat_target_info: Optional[dict], # Now passed as argument chat_target_info: Optional[dict], # Now passed as argument
current_available_actions: Dict[str, ActionInfo], current_available_actions: Dict[str, ActionInfo],
mode: ChatMode = ChatMode.FOCUS, mode: ChatMode = ChatMode.FOCUS,
actions_before_now_block :str = "", actions_before_now_block: str = "",
chat_content_block :str = "", chat_content_block: str = "",
message_id_list :List[Tuple[str, DatabaseMessages]] = None, message_id_list: List[Tuple[str, DatabaseMessages]] = None,
) -> tuple[str, List[DatabaseMessages]]: # sourcery skip: use-join ) -> tuple[str, List[DatabaseMessages]]: # sourcery skip: use-join
"""构建 Planner LLM 的提示词 (获取模板并填充数据)""" """构建 Planner LLM 的提示词 (获取模板并填充数据)"""
try: try:
if actions_before_now_block: if actions_before_now_block:
actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}" actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
else: else:

View File

@ -330,7 +330,7 @@ def _build_readable_messages_internal(
pic_id_mapping: Optional[Dict[str, str]] = None, pic_id_mapping: Optional[Dict[str, str]] = None,
pic_counter: int = 1, pic_counter: int = 1,
show_pic: bool = True, show_pic: bool = True,
message_id_list: Optional[List[DatabaseMessages]] = None, message_id_list: Optional[List[Tuple[str, DatabaseMessages]]] = None,
) -> Tuple[str, List[Tuple[float, str, str]], Dict[str, str], int]: ) -> Tuple[str, List[Tuple[float, str, str]], Dict[str, str], int]:
# sourcery skip: use-getitem-for-re-match-groups # sourcery skip: use-getitem-for-re-match-groups
""" """
@ -635,7 +635,7 @@ def build_readable_messages(
truncate: bool = False, truncate: bool = False,
show_actions: bool = False, show_actions: bool = False,
show_pic: bool = True, show_pic: bool = True,
message_id_list: Optional[List[DatabaseMessages]] = None, message_id_list: Optional[List[Tuple[str, DatabaseMessages]]] = None,
) -> str: # sourcery skip: extract-method ) -> str: # sourcery skip: extract-method
""" """
将消息列表转换为可读的文本格式 将消息列表转换为可读的文本格式

View File

@ -1,6 +1,5 @@
import random import random
import re import re
import string
import time import time
import jieba import jieba
import json import json
@ -8,7 +7,7 @@ import ast
import numpy as np import numpy as np
from collections import Counter from collections import Counter
from typing import Optional, Tuple, Dict, List, Any from typing import Optional, Tuple, Dict, List
from src.common.logger import get_logger from src.common.logger import get_logger
from src.common.data_models.database_data_model import DatabaseMessages from src.common.data_models.database_data_model import DatabaseMessages
@ -675,7 +674,7 @@ def get_chat_type_and_target_info(chat_id: str) -> Tuple[bool, Optional[Dict]]:
return is_group_chat, chat_target_info return is_group_chat, chat_target_info
def assign_message_ids(messages: List[DatabaseMessages]) -> List[DatabaseMessages]: def assign_message_ids(messages: List[DatabaseMessages]) -> List[Tuple[str, DatabaseMessages]]:
""" """
为消息列表中的每个消息分配唯一的简短随机ID 为消息列表中的每个消息分配唯一的简短随机ID