mirror of https://github.com/Mai-with-u/MaiBot.git
242 lines
9.3 KiB
Python
242 lines
9.3 KiB
Python
import asyncio
|
||
import time
|
||
from typing import Dict, List, Optional, Union
|
||
|
||
from src.common.logger import get_module_logger
|
||
from ..message.api import global_api
|
||
from ..chat.message import MessageSending, MessageThinking, MessageSet
|
||
from ..storage.storage import MessageStorage
|
||
from ...config.config import global_config
|
||
from ..chat.utils import truncate_message, calculate_typing_time, count_messages_between
|
||
|
||
from src.common.logger import LogConfig, SENDER_STYLE_CONFIG
|
||
|
||
# Logging setup for this module: reuse the dedicated "sender" style for both
# the console sink and the file sink, then build the module logger from it.
sender_config = LogConfig(
    console_format=SENDER_STYLE_CONFIG["console_format"],
    file_format=SENDER_STYLE_CONFIG["file_format"],
)

logger = get_module_logger("msg_sender", config=sender_config)
|
||
|
||
|
||
class MessageSender:
    """Singleton that delivers outgoing messages.

    Delivery prefers the REST endpoint configured for the message's platform
    in ``global_config.api_urls`` and falls back to the WebSocket route
    (``global_api.send_message``) when no endpoint is configured or the REST
    call fails.
    """

    # The one shared instance, created lazily by __new__.
    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        existing = cls._instance
        if existing is None:
            existing = super().__new__(cls, *args, **kwargs)
            cls._instance = existing
        return existing

    def __init__(self):
        """Initialise instance state once.

        ``__init__`` runs on every ``MessageSender()`` call, so the
        ``_initialized`` marker keeps later calls from resetting the state.
        """
        if hasattr(self, "_initialized"):
            return
        self.message_interval = (0.5, 1)  # (min, max) gap between messages, in seconds
        self.last_send_time = 0
        self._current_bot = None
        self._initialized = True

    def set_bot(self, bot):
        """Set the current bot instance (currently a no-op; ``bot`` is ignored)."""
        return None

    async def send_via_ws(self, message: MessageSending) -> None:
        """Send ``message`` through ``global_api.send_message``.

        Raises:
            ValueError: for any failure of the underlying call; the original
                exception is chained as ``__cause__``.
        """
        try:
            await global_api.send_message(message)
        except Exception as e:
            raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e

    async def send_message(
        self,
        message: MessageSending,
    ) -> None:
        """Send ``message``, trying REST first and falling back to WebSocket.

        Failures of the delivery itself are logged and swallowed; this method
        does not raise for them.
        """
        # Both values are computed before the try block, exactly as before, so
        # a serialisation error still propagates to the caller.
        payload = message.to_dict()
        message_preview = truncate_message(message.processed_plain_text)

        try:
            rest_endpoint = global_config.api_urls.get(message.message_info.platform, None)
            if not rest_endpoint:
                # No REST endpoint configured for this platform.
                await self.send_via_ws(message)
            else:
                try:
                    await global_api.send_message_rest(rest_endpoint, payload)
                except Exception as e:
                    logger.error(f"REST方式发送失败,出现错误: {str(e)}")
                    logger.info("尝试使用ws发送")
                    await self.send_via_ws(message)
            logger.success(f"发送消息 {message_preview} 成功")
        except Exception as e:
            logger.error(f"发送消息 {message_preview} 失败: {str(e)}")
|
||
|
||
|
||
class MessageContainer:
    """Queue of thinking/sending messages that belong to one chat stream."""

    def __init__(self, chat_id: str, max_size: int = 100):
        """Create an empty container for ``chat_id``.

        ``max_size`` is stored on the instance; nothing in this class enforces it.
        """
        self.chat_id = chat_id
        self.max_size = max_size
        self.messages = []
        self.last_send_time = 0

    def count_thinking_messages(self) -> int:
        """Return how many queued messages are ``MessageThinking`` instances."""
        return len([queued for queued in self.messages if isinstance(queued, MessageThinking)])

    def get_earliest_message(self) -> Optional[Union[MessageThinking, MessageSending]]:
        """Return the queued message with the smallest ``thinking_start_time``.

        Returns ``None`` when the container is empty. On ties the message
        queued first wins, because only a strictly smaller time replaces the
        current candidate.
        """
        winner = None
        winner_started = float("inf")
        for candidate in self.messages:
            started = candidate.thinking_start_time
            if started < winner_started:
                winner, winner_started = candidate, started
        return winner

    def add_message(self, message: Union[MessageThinking, MessageSending]) -> None:
        """Queue ``message``; a ``MessageSet`` is flattened into its members."""
        if not isinstance(message, MessageSet):
            self.messages.append(message)
            return
        self.messages.extend(message.messages)

    def remove_message(self, message: Union[MessageThinking, MessageSending]) -> bool:
        """Remove ``message`` from the queue.

        Returns ``True`` if it was present and removed, ``False`` if it was
        absent or an error occurred (the error is logged).
        """
        try:
            present = message in self.messages
            if present:
                self.messages.remove(message)
            return present
        except Exception:
            logger.exception("移除消息时发生错误")
            return False

    def has_messages(self) -> bool:
        """Return ``True`` when at least one message is queued."""
        return True if self.messages else False

    def get_all_messages(self) -> List[Union[MessageSending, MessageThinking]]:
        """Return a shallow copy of the queue, safe to iterate while it changes."""
        return [*self.messages]
|
||
|
||
|
||
class MessageManager:
    """Own the message container of every chat stream and run the send loop.

    ``start()`` launches a background task that wakes once per second and, for
    each container, handles the message with the earliest
    ``thinking_start_time``: a thinking message is tracked until it times out,
    and a sending message is processed, delayed by a simulated typing time,
    sent through ``MessageSender`` and stored.
    """

    # The single live instance; __init__ refuses to build a second one.
    _instance = None
    # NOTE(review): this lock is not acquired anywhere in this file.
    _lock = asyncio.Lock()

    def __init__(self):
        """Create the singleton instance.

        Raises:
            Exception: if an instance already exists.
        """
        if MessageManager._instance is not None:
            raise Exception("This class is a singleton!")

        self.containers: Dict[str, MessageContainer] = {}
        # NOTE(review): not acquired anywhere in this file.
        self._container_lock = asyncio.Lock()
        # process_chat_messages() stores every sent message via self.storage,
        # which was never created before (AttributeError on the first send).
        # NOTE(review): assumes MessageStorage takes no constructor
        # arguments - confirm against src/.../storage/storage.py.
        self.storage = MessageStorage()
        # Strong reference to the background task; the event loop itself only
        # keeps weak references to tasks.
        self._processor_task: Optional[asyncio.Task] = None
        self.running = True
        MessageManager._instance = self

    async def start(self):
        """Start the background processor task (no-op if it is already running)."""
        if self._processor_task is not None and not self._processor_task.done():
            # A second loop would pick up, and send, the same messages twice.
            logger.warning("MessageManager processor task is already running.")
            return
        self._processor_task = asyncio.create_task(self.start_processor())
        logger.info("MessageManager processor task started.")

    def get_container(self, chat_id: str) -> MessageContainer:
        """Return the container for ``chat_id``, creating it if needed."""
        if chat_id not in self.containers:
            self.containers[chat_id] = MessageContainer(chat_id)
        return self.containers[chat_id]

    def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None:
        """Queue ``message`` in the container of its chat stream.

        Raises:
            ValueError: if the message has no chat stream.
        """
        chat_stream = message.chat_stream
        if not chat_stream:
            raise ValueError("无法找到对应的聊天流")
        self.get_container(chat_stream.stream_id).add_message(message)

    def check_if_sending_message_exist(self, chat_id, thinking_id):
        """Report whether a matching ``MessageSending`` is queued for ``chat_id``.

        A queued ``MessageSending`` matches when its ``message_id`` equals
        ``thinking_id`` or starts with ``"me"``.
        """
        # Plain lookup: a read-only check must not create an empty container.
        container = self.containers.get(chat_id)
        if container is None:
            return False

        for message in container.get_all_messages():
            # Only MessageSending objects are inspected.
            if not isinstance(message, MessageSending):
                continue
            message_id = message.message_info.message_id
            if message_id == thinking_id or message_id[:2] == "me":
                return True
        return False

    async def process_chat_messages(self, chat_id: str):
        """Handle the earliest queued message of one chat stream, if any."""
        container = self.get_container(chat_id)
        if not container.has_messages():
            return

        message_earliest = container.get_earliest_message()
        if message_earliest is None:
            return

        if isinstance(message_earliest, MessageThinking):
            self._track_thinking_message(container, message_earliest)
        else:
            await self._deliver_message(container, message_earliest)

    def _track_thinking_message(self, container: MessageContainer, message: MessageThinking) -> None:
        """Refresh a thinking message's timer and drop it once it times out."""
        message.update_thinking_time()
        thinking_time = message.thinking_time
        # "\r" with end="" rewrites the same console line on every tick.
        print(
            f"消息正在思考中,已思考{int(thinking_time)}秒\r",
            end="",
            flush=True,
        )

        if thinking_time > global_config.thinking_timeout:
            logger.warning(f"消息思考超时({thinking_time}秒),移除该消息")
            container.remove_message(message)

    async def _deliver_message(self, container: MessageContainer, message: MessageSending) -> None:
        """Process, send and store one message, then remove it from the queue.

        The message is removed even when a step fails, so a broken message
        cannot block its container or be sent repeatedly. Cancellation is not
        caught and leaves the message queued.
        """
        try:
            message.update_thinking_time()
            # NOTE(review): the result of this call was never used by the
            # original code. It is kept in case the helper has side effects;
            # presumably it is a pure query and can be removed - confirm.
            count_messages_between(
                start_time=message.thinking_start_time,
                end_time=time.time(),
                stream_id=message.chat_stream.stream_id,
            )

            await message.process()

            # Wait for the simulated typing time before sending.
            typing_time = calculate_typing_time(
                input_string=message.processed_plain_text,
                thinking_start_time=message.thinking_start_time,
                is_emoji=message.is_emoji,
            )
            await asyncio.sleep(typing_time)

            await MessageSender().send_message(message)
            await self.storage.store_message(message, message.chat_stream)
        except Exception:
            logger.exception(f"Failed to deliver message in chat stream {container.chat_id}; dropping it")

        container.remove_message(message)

    async def start_processor(self):
        """Run the once-per-second processing loop until ``self.running`` is false."""
        while self.running:
            await asyncio.sleep(1)

            # Snapshot the keys: containers may be added while we await.
            chat_ids = list(self.containers)
            if not chat_ids:
                continue

            # return_exceptions=True stops one failing chat stream from
            # ending the loop for every other stream.
            results = await asyncio.gather(
                *(self.process_chat_messages(chat_id) for chat_id in chat_ids),
                return_exceptions=True,
            )
            for chat_id, result in zip(chat_ids, results):
                if isinstance(result, BaseException):
                    logger.error(f"Processing chat stream {chat_id} failed: {result!r}")
|
||
|
||
|
||
# # Create the global message manager instance  # superseded by the singleton pattern
# message_manager = MessageManager()
# # Create the global sender instance  # superseded by the singleton pattern
# message_sender = MessageSender()
|