feat: 添加并发处理支持,优化消息段和转发消息的处理性能

pull/1477/head
墨梓柒 2026-01-03 14:03:59 +08:00
parent 352790f936
commit 5ecdf1c53d
No known key found for this signature in database
GPG Key ID: 4A65B9DBA35F7635
1 changed files with 33 additions and 11 deletions

View File

@ -1,4 +1,5 @@
import time import time
import asyncio
import urllib3 import urllib3
from abc import abstractmethod from abc import abstractmethod
@ -20,6 +21,9 @@ logger = get_logger("chat_message")
# 禁用SSL警告 # 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# VLM 处理并发限制(避免同时处理太多图片导致卡死)
_vlm_semaphore = asyncio.Semaphore(3)
# 这个类是消息数据类,用于存储和管理消息数据。 # 这个类是消息数据类,用于存储和管理消息数据。
# 它定义了消息的属性包括群组ID、用户ID、消息ID、原始消息内容、纯文本内容和时间戳。 # 它定义了消息的属性包括群组ID、用户ID、消息ID、原始消息内容、纯文本内容和时间戳。
# 它还定义了两个辅助属性keywords用于提取消息的关键词is_plain_text用于判断消息是否为纯文本。 # 它还定义了两个辅助属性keywords用于提取消息的关键词is_plain_text用于判断消息是否为纯文本。
@ -73,20 +77,35 @@ class Message(MessageBase):
str: 处理后的文本 str: 处理后的文本
""" """
if segment.type == "seglist": if segment.type == "seglist":
# 处理消息段列表 # 处理消息段列表 - 使用并行处理提升性能
tasks = [self._process_message_segments(seg) for seg in segment.data] # type: ignore
results = await asyncio.gather(*tasks, return_exceptions=True)
segments_text = [] segments_text = []
for seg in segment.data: for result in results:
processed = await self._process_message_segments(seg) # type: ignore if isinstance(result, Exception):
if processed: logger.error(f"处理消息段时出错: {result}")
segments_text.append(processed) continue
if result:
segments_text.append(result)
return " ".join(segments_text) return " ".join(segments_text)
elif segment.type == "forward": elif segment.type == "forward":
segments_text = [] # 处理转发消息 - 使用并行处理
for node_dict in segment.data: async def process_forward_node(node_dict):
message = MessageBase.from_dict(node_dict) # type: ignore message = MessageBase.from_dict(node_dict) # type: ignore
processed_text = await self._process_message_segments(message.message_segment) processed_text = await self._process_message_segments(message.message_segment)
if processed_text: if processed_text:
segments_text.append(f"{global_config.bot.nickname}: {processed_text}") return f"{global_config.bot.nickname}: {processed_text}"
return None
tasks = [process_forward_node(node_dict) for node_dict in segment.data]
results = await asyncio.gather(*tasks, return_exceptions=True)
segments_text = []
for result in results:
if isinstance(result, Exception):
logger.error(f"处理转发节点时出错: {result}")
continue
if result:
segments_text.append(result)
return "[合并消息]: " + "\n-- ".join(segments_text) return "[合并消息]: " + "\n-- ".join(segments_text)
else: else:
# 处理单个消息段 # 处理单个消息段
@ -173,7 +192,8 @@ class MessageRecv(Message):
self.is_picid = True self.is_picid = True
self.is_emoji = False self.is_emoji = False
image_manager = get_image_manager() image_manager = get_image_manager()
# print(f"segment.data: {segment.data}") # 使用 semaphore 限制 VLM 并发,避免同时处理太多图片
async with _vlm_semaphore:
_, processed_text = await image_manager.process_image(segment.data) _, processed_text = await image_manager.process_image(segment.data)
return processed_text return processed_text
return "[发了一张图片,网卡了加载不出来]" return "[发了一张图片,网卡了加载不出来]"
@ -183,6 +203,8 @@ class MessageRecv(Message):
self.is_picid = False self.is_picid = False
self.is_voice = False self.is_voice = False
if isinstance(segment.data, str): if isinstance(segment.data, str):
# 使用 semaphore 限制 VLM 并发
async with _vlm_semaphore:
return await get_image_manager().get_emoji_description(segment.data) return await get_image_manager().get_emoji_description(segment.data)
return "[发了一个表情包,网卡了加载不出来]" return "[发了一个表情包,网卡了加载不出来]"
elif segment.type == "voice": elif segment.type == "voice":