message_builder重构完成

pull/1201/head
UnCLAS-Prommer 2025-08-20 22:48:52 +08:00
parent 9253c0ad77
commit 924983e6de
No known key found for this signature in database
6 changed files with 123 additions and 107 deletions

View File

@ -1,8 +1,8 @@
import time # 导入 time 模块以获取当前时间
import time
import random
import re
from typing import List, Dict, Any, Tuple, Optional, Callable, Union
from typing import List, Dict, Any, Tuple, Optional, Callable
from rich.traceback import install
from src.config.config import global_config
@ -648,7 +648,7 @@ def build_readable_actions(actions: List[Dict[str, Any]]) -> str:
async def build_readable_messages_with_list(
messages: List[Dict[str, Any]],
messages: List[DatabaseMessages],
replace_bot_name: bool = True,
timestamp_mode: str = "relative",
truncate: bool = False,
@ -658,7 +658,7 @@ async def build_readable_messages_with_list(
允许通过参数控制格式化行为
"""
formatted_string, details_list, pic_id_mapping, _ = _build_readable_messages_internal(
messages, replace_bot_name, timestamp_mode, truncate
convert_DatabaseMessages_to_MessageAndActionModel(messages), replace_bot_name, timestamp_mode, truncate
)
if pic_mapping_info := build_pic_mapping_info(pic_id_mapping):
@ -675,7 +675,7 @@ def build_readable_messages_with_id(
truncate: bool = False,
show_actions: bool = False,
show_pic: bool = True,
) -> Tuple[str, List[Dict[str, Any]]]:
) -> Tuple[str, List[DatabaseMessages]]:
"""
将消息列表转换为可读的文本格式并返回原始(时间戳, 昵称, 内容)列表
允许通过参数控制格式化行为
@ -818,7 +818,6 @@ def build_readable_messages(
formatted_before, _, pic_id_mapping, pic_counter = _build_readable_messages_internal(
messages_before_mark,
replace_bot_name,
merge_messages,
timestamp_mode,
truncate,
pic_id_mapping,
@ -829,7 +828,6 @@ def build_readable_messages(
formatted_after, _, pic_id_mapping, _ = _build_readable_messages_internal(
messages_after_mark,
replace_bot_name,
merge_messages,
timestamp_mode,
False,
pic_id_mapping,
@ -998,3 +996,22 @@ async def get_person_id_list(messages: List[Dict[str, Any]]) -> List[str]:
person_ids_set.add(person_id)
return list(person_ids_set) # 将集合转换为列表返回
def convert_DatabaseMessages_to_MessageAndActionModel(message: List[DatabaseMessages]) -> List[MessageAndActionModel]:
"""
DatabaseMessages 列表转换为 MessageAndActionModel 列表
"""
return [
MessageAndActionModel(
time=msg.time,
user_id=msg.user_info.user_id,
user_platform=msg.user_info.platform,
user_nickname=msg.user_info.user_nickname,
user_cardname=msg.user_info.user_cardname,
processed_plain_text=msg.processed_plain_text,
display_message=msg.display_message,
chat_info_platform=msg.chat_info.platform,
)
for msg in message
]

View File

@ -12,6 +12,7 @@ from typing import Optional, Tuple, Dict, List, Any
from src.common.logger import get_logger
from src.common.data_models.info_data_model import TargetPersonInfo
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.message_repository import find_messages, count_messages
from src.config.config import global_config, model_config
from src.chat.message_receive.message import MessageRecv
@ -152,10 +153,13 @@ def get_recent_group_speaker(chat_stream_id: str, sender, limit: int = 12) -> li
if (
(db_msg.user_info.platform, db_msg.user_info.user_id) != sender
and db_msg.user_info.user_id != global_config.bot.qq_account
and (db_msg.user_info.platform, db_msg.user_info.user_id, db_msg.user_info.user_nickname) not in who_chat_in_group
and (db_msg.user_info.platform, db_msg.user_info.user_id, db_msg.user_info.user_nickname)
not in who_chat_in_group
and len(who_chat_in_group) < 5
): # 排除重复排除消息发送者排除bot限制加载的关系数目
who_chat_in_group.append((db_msg.user_info.platform, db_msg.user_info.user_id, db_msg.user_info.user_nickname))
who_chat_in_group.append(
(db_msg.user_info.platform, db_msg.user_info.user_id, db_msg.user_info.user_nickname)
)
return who_chat_in_group
@ -641,9 +645,9 @@ def get_chat_type_and_target_info(chat_id: str) -> Tuple[bool, Optional[Dict]]:
target_info = TargetPersonInfo(
platform=platform,
user_id=user_id,
user_nickname=user_info.user_nickname, # type: ignore
user_nickname=user_info.user_nickname, # type: ignore
person_id=None,
person_name=None
person_name=None,
)
# Try to fetch person info
@ -670,17 +674,17 @@ def get_chat_type_and_target_info(chat_id: str) -> Tuple[bool, Optional[Dict]]:
return is_group_chat, chat_target_info
def assign_message_ids(messages: List[Any]) -> List[Dict[str, Any]]:
def assign_message_ids(messages: List[DatabaseMessages]) -> List[DatabaseMessages]:
"""
为消息列表中的每个消息分配唯一的简短随机ID
Args:
messages: 消息列表
Returns:
包含 {'id': str, 'message': any} 格式的字典列表
List[DatabaseMessages]: 分配了唯一ID的消息列表(写入message_id属性)
"""
result = []
result: List[DatabaseMessages] = list(messages) # 复制原始消息列表
used_ids = set()
len_i = len(messages)
if len_i > 100:
@ -689,95 +693,86 @@ def assign_message_ids(messages: List[Any]) -> List[Dict[str, Any]]:
else:
a = 1
b = 9
for i, message in enumerate(messages):
for i, _ in enumerate(result):
# 生成唯一的简短ID
while True:
# 使用索引+随机数生成简短ID
random_suffix = random.randint(a, b)
message_id = f"m{i+1}{random_suffix}"
message_id = f"m{i + 1}{random_suffix}"
if message_id not in used_ids:
used_ids.add(message_id)
break
result.append({
'id': message_id,
'message': message
})
result[i].message_id = message_id
return result
def assign_message_ids_flexible(
messages: list,
prefix: str = "msg",
id_length: int = 6,
use_timestamp: bool = False
) -> list:
"""
为消息列表中的每个消息分配唯一的简短随机ID增强版
Args:
messages: 消息列表
prefix: ID前缀默认为"msg"
id_length: ID的总长度不包括前缀默认为6
use_timestamp: 是否在ID中包含时间戳默认为False
Returns:
包含 {'id': str, 'message': any} 格式的字典列表
"""
result = []
used_ids = set()
for i, message in enumerate(messages):
# 生成唯一的ID
while True:
if use_timestamp:
# 使用时间戳的后几位 + 随机字符
timestamp_suffix = str(int(time.time() * 1000))[-3:]
remaining_length = id_length - 3
random_chars = ''.join(random.choices(string.ascii_lowercase + string.digits, k=remaining_length))
message_id = f"{prefix}{timestamp_suffix}{random_chars}"
else:
# 使用索引 + 随机字符
index_str = str(i + 1)
remaining_length = max(1, id_length - len(index_str))
random_chars = ''.join(random.choices(string.ascii_lowercase + string.digits, k=remaining_length))
message_id = f"{prefix}{index_str}{random_chars}"
if message_id not in used_ids:
used_ids.add(message_id)
break
result.append({
'id': message_id,
'message': message
})
return result
# def assign_message_ids_flexible(
# messages: list, prefix: str = "msg", id_length: int = 6, use_timestamp: bool = False
# ) -> list:
# """
# 为消息列表中的每个消息分配唯一的简短随机ID增强版
# Args:
# messages: 消息列表
# prefix: ID前缀默认为"msg"
# id_length: ID的总长度不包括前缀默认为6
# use_timestamp: 是否在ID中包含时间戳默认为False
# Returns:
# 包含 {'id': str, 'message': any} 格式的字典列表
# """
# result = []
# used_ids = set()
# for i, message in enumerate(messages):
# # 生成唯一的ID
# while True:
# if use_timestamp:
# # 使用时间戳的后几位 + 随机字符
# timestamp_suffix = str(int(time.time() * 1000))[-3:]
# remaining_length = id_length - 3
# random_chars = "".join(random.choices(string.ascii_lowercase + string.digits, k=remaining_length))
# message_id = f"{prefix}{timestamp_suffix}{random_chars}"
# else:
# # 使用索引 + 随机字符
# index_str = str(i + 1)
# remaining_length = max(1, id_length - len(index_str))
# random_chars = "".join(random.choices(string.ascii_lowercase + string.digits, k=remaining_length))
# message_id = f"{prefix}{index_str}{random_chars}"
# if message_id not in used_ids:
# used_ids.add(message_id)
# break
# result.append({"id": message_id, "message": message})
# return result
# 使用示例:
# messages = ["Hello", "World", "Test message"]
#
#
# # 基础版本
# result1 = assign_message_ids(messages)
# # 结果: [{'id': 'm1123', 'message': 'Hello'}, {'id': 'm2456', 'message': 'World'}, {'id': 'm3789', 'message': 'Test message'}]
#
#
# # 增强版本 - 自定义前缀和长度
# result2 = assign_message_ids_flexible(messages, prefix="chat", id_length=8)
# # 结果: [{'id': 'chat1abc2', 'message': 'Hello'}, {'id': 'chat2def3', 'message': 'World'}, {'id': 'chat3ghi4', 'message': 'Test message'}]
#
#
# # 增强版本 - 使用时间戳
# result3 = assign_message_ids_flexible(messages, prefix="ts", use_timestamp=True)
# # 结果: [{'id': 'ts123a1b', 'message': 'Hello'}, {'id': 'ts123c2d', 'message': 'World'}, {'id': 'ts123e3f', 'message': 'Test message'}]
def parse_keywords_string(keywords_input) -> list[str]:
# sourcery skip: use-contextlib-suppress
"""
统一的关键词解析函数支持多种格式的关键词字符串解析
支持的格式
1. 字符串列表格式'["utils.py", "修改", "代码", "动作"]'
2. 斜杠分隔格式'utils.py/修改/代码/动作'
@ -785,25 +780,25 @@ def parse_keywords_string(keywords_input) -> list[str]:
4. 空格分隔格式'utils.py 修改 代码 动作'
5. 已经是列表的情况["utils.py", "修改", "代码", "动作"]
6. JSON格式字符串'{"keywords": ["utils.py", "修改", "代码", "动作"]}'
Args:
keywords_input: 关键词输入可以是字符串或列表
Returns:
list[str]: 解析后的关键词列表去除空白项
"""
if not keywords_input:
return []
# 如果已经是列表,直接处理
if isinstance(keywords_input, list):
return [str(k).strip() for k in keywords_input if str(k).strip()]
# 转换为字符串处理
keywords_str = str(keywords_input).strip()
if not keywords_str:
return []
try:
# 尝试作为JSON对象解析支持 {"keywords": [...]} 格式)
json_data = json.loads(keywords_str)
@ -816,7 +811,7 @@ def parse_keywords_string(keywords_input) -> list[str]:
return [str(k).strip() for k in json_data if str(k).strip()]
except (json.JSONDecodeError, ValueError):
pass
try:
# 尝试使用 ast.literal_eval 解析支持Python字面量格式
parsed = ast.literal_eval(keywords_str)
@ -824,15 +819,15 @@ def parse_keywords_string(keywords_input) -> list[str]:
return [str(k).strip() for k in parsed if str(k).strip()]
except (ValueError, SyntaxError):
pass
# 尝试不同的分隔符
separators = ['/', ',', ' ', '|', ';']
separators = ["/", ",", " ", "|", ";"]
for separator in separators:
if separator in keywords_str:
keywords_list = [k.strip() for k in keywords_str.split(separator) if k.strip()]
if len(keywords_list) > 1: # 确保分割有效
return keywords_list
# 如果没有分隔符,返回单个关键词
return [keywords_str] if keywords_str else []
return [keywords_str] if keywords_str else []

View File

@ -1,26 +1,27 @@
import copy
from typing import Dict, Any
class AbstractClassFlag:
pass
class BaseDataModel:
def deepcopy(self):
return copy.deepcopy(self)
def temporarily_transform_class_to_dict(obj: Any) -> Any:
"""
将对象或容器中的 AbstractClassFlag 子类类对象 AbstractClassFlag 实例
将对象或容器中的 BaseDataModel 子类类对象 BaseDataModel 实例
递归转换为普通 dict不修改原对象
- 对于类对象isinstance(value, type) issubclass(..., AbstractClassFlag)
- 对于类对象isinstance(value, type) issubclass(..., BaseDataModel)
读取类的 __dict__ 中非 dunder 项并递归转换
- 对于实例isinstance(value, AbstractClassFlag)读取 vars(instance) 并递归转换
- 对于实例isinstance(value, BaseDataModel)读取 vars(instance) 并递归转换
"""
def _transform(value: Any) -> Any:
# 值是类对象且为 AbstractClassFlag 的子类
if isinstance(value, type) and issubclass(value, AbstractClassFlag):
# 值是类对象且为 BaseDataModel 的子类
if isinstance(value, type) and issubclass(value, BaseDataModel):
return {k: _transform(v) for k, v in value.__dict__.items() if not k.startswith("__") and not callable(v)}
# 值是 AbstractClassFlag 的实例
if isinstance(value, AbstractClassFlag):
# 值是 BaseDataModel 的实例
if isinstance(value, BaseDataModel):
return {k: _transform(v) for k, v in vars(value).items()}
# 常见容器类型,递归处理

View File

@ -1,11 +1,11 @@
from typing import Optional, Dict, Any
from dataclasses import dataclass, field, fields, MISSING
from typing import Optional, Any
from dataclasses import dataclass, field
from . import AbstractClassFlag
from . import BaseDataModel
@dataclass
class DatabaseUserInfo(AbstractClassFlag):
class DatabaseUserInfo(BaseDataModel):
platform: str = field(default_factory=str)
user_id: str = field(default_factory=str)
user_nickname: str = field(default_factory=str)
@ -21,7 +21,7 @@ class DatabaseUserInfo(AbstractClassFlag):
@dataclass
class DatabaseGroupInfo(AbstractClassFlag):
class DatabaseGroupInfo(BaseDataModel):
group_id: str = field(default_factory=str)
group_name: str = field(default_factory=str)
group_platform: Optional[str] = None
@ -35,7 +35,7 @@ class DatabaseGroupInfo(AbstractClassFlag):
@dataclass
class DatabaseChatInfo(AbstractClassFlag):
class DatabaseChatInfo(BaseDataModel):
stream_id: str = field(default_factory=str)
platform: str = field(default_factory=str)
create_time: float = field(default_factory=float)
@ -55,7 +55,7 @@ class DatabaseChatInfo(AbstractClassFlag):
@dataclass(init=False)
class DatabaseMessages(AbstractClassFlag):
class DatabaseMessages(BaseDataModel):
def __init__(
self,
message_id: str = "",

View File

@ -1,8 +1,10 @@
from dataclasses import dataclass, field
from typing import Optional
from . import BaseDataModel
@dataclass
class TargetPersonInfo:
class TargetPersonInfo(BaseDataModel):
platform: str = field(default_factory=str)
user_id: str = field(default_factory=str)
user_nickname: str = field(default_factory=str)

View File

@ -1,9 +1,10 @@
from typing import Optional
from dataclasses import dataclass, field
from . import BaseDataModel
@dataclass
class MessageAndActionModel:
class MessageAndActionModel(BaseDataModel):
time: float = field(default_factory=float)
user_id: str = field(default_factory=str)
user_platform: str = field(default_factory=str)