MaiBot-Napcat-Adapter/src/recv_handler.py

781 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from .logger import logger
from .config import global_config
from .qq_emoji_list import qq_face
import time
import asyncio
import json
import websockets as Server
from typing import List, Tuple, Optional
import uuid
from . import MetaEventType, RealMessageType, MessageType, NoticeType
from maim_message import (
UserInfo,
GroupInfo,
Seg,
BaseMessageInfo,
MessageBase,
TemplateInfo,
FormatInfo,
Router,
)
from .utils import (
get_group_info,
get_member_info,
get_image_base64,
get_self_info,
get_stranger_info,
get_message_detail,
)
from .message_queue import get_response
class RecvHandler:
maibot_router: Router = None
def __init__(self):
self.server_connection: Server.ServerConnection = None
self.interval = global_config.napcat_heartbeat_interval
async def handle_meta_event(self, message: dict) -> None:
event_type = message.get("meta_event_type")
if event_type == MetaEventType.lifecycle:
sub_type = message.get("sub_type")
if sub_type == MetaEventType.Lifecycle.connect:
self_id = message.get("self_id")
self.last_heart_beat = time.time()
logger.info(f"Bot {self_id} 连接成功")
asyncio.create_task(self.check_heartbeat(self_id))
elif event_type == MetaEventType.heartbeat:
if message["status"].get("online") and message["status"].get("good"):
self.last_heart_beat = time.time()
self.interval = message.get("interval") / 1000
else:
self_id = message.get("self_id")
logger.warning(f"Bot {self_id} Napcat 端异常!")
async def check_heartbeat(self, id: int) -> None:
while True:
now_time = time.time()
if now_time - self.last_heart_beat > self.interval + 3:
logger.warning(f"Bot {id} 连接已断开")
break
else:
logger.debug("心跳正常")
await asyncio.sleep(self.interval)
def check_allow_to_chat(self, user_id: int, group_id: Optional[int]) -> bool:
# sourcery skip: hoist-statement-from-if, merge-else-if-into-elif
"""
检查是否允许聊天
Parameters:
user_id: int: 用户ID
group_id: int: 群ID
Returns:
bool: 是否允许聊天
"""
if group_id:
if global_config.list_type == "whitelist" and group_id not in global_config.group_list:
logger.warning("群聊不在白名单中,消息被丢弃")
return False
elif global_config.list_type == "blacklist" and group_id in global_config.group_list:
logger.warning("群聊在黑名单中,消息被丢弃")
return False
else:
if global_config.list_type == "whitelist" and user_id not in global_config.user_list:
logger.warning("用户不在白名单中,消息被丢弃")
return False
elif global_config.list_type == "blacklist" and user_id in global_config.user_list:
logger.warning("用户在黑名单中,消息被丢弃")
return False
if user_id in global_config.ban_user_id:
logger.warning("用户在黑名单中,消息被丢弃")
return False
return True
async def handle_raw_message(self, raw_message: dict) -> None:
# sourcery skip: low-code-quality, remove-unreachable-code
"""
从Napcat接受的原始消息处理
Parameters:
raw_message: dict: 原始消息
"""
message_type: str = raw_message.get("message_type")
message_id: int = raw_message.get("message_id")
# message_time: int = raw_message.get("time")
message_time: float = time.time() # 应可乐要求现在是float了
template_info: TemplateInfo = None # 模板信息,暂时为空,等待启用
format_info: FormatInfo = FormatInfo(
content_format=["text", "image", "emoji"],
accept_format=["text", "image", "emoji", "reply"],
) # 格式化信息
if message_type == MessageType.private:
sub_type = raw_message.get("sub_type")
if sub_type == MessageType.Private.friend:
sender_info: dict = raw_message.get("sender")
if not self.check_allow_to_chat(sender_info.get("user_id"), None):
return None
# 发送者用户信息
user_info: UserInfo = UserInfo(
platform=global_config.platform,
user_id=sender_info.get("user_id"),
user_nickname=sender_info.get("nickname"),
user_cardname=sender_info.get("card"),
)
# 不存在群信息
group_info: GroupInfo = None
elif sub_type == MessageType.Private.group:
"""
本部分暂时不做支持,先放着
"""
logger.warning("群临时消息类型不支持")
return None
sender_info: dict = raw_message.get("sender")
# 由于临时会话中Napcat默认不发送成员昵称所以需要单独获取
fetched_member_info: dict = await get_member_info(
self.server_connection,
raw_message.get("group_id"),
sender_info.get("user_id"),
)
nickname = fetched_member_info.get("nickname") if fetched_member_info else None
# 发送者用户信息
user_info: UserInfo = UserInfo(
platform=global_config.platform,
user_id=sender_info.get("user_id"),
user_nickname=nickname,
user_cardname=None,
)
# -------------------这里需要群信息吗?-------------------
# 获取群聊相关信息在此单独处理group_name因为默认发送的消息中没有
fetched_group_info: dict = await get_group_info(self.server_connection, raw_message.get("group_id"))
group_name = ""
if fetched_group_info.get("group_name"):
group_name = fetched_group_info.get("group_name")
group_info: GroupInfo = GroupInfo(
platform=global_config.platform,
group_id=raw_message.get("group_id"),
group_name=group_name,
)
else:
logger.warning("私聊消息类型不支持")
return None
elif message_type == MessageType.group:
sub_type = raw_message.get("sub_type")
if sub_type == MessageType.Group.normal:
sender_info: dict = raw_message.get("sender")
if not self.check_allow_to_chat(sender_info.get("user_id"), raw_message.get("group_id")):
return None
# 发送者用户信息
user_info: UserInfo = UserInfo(
platform=global_config.platform,
user_id=sender_info.get("user_id"),
user_nickname=sender_info.get("nickname"),
user_cardname=sender_info.get("card"),
)
# 获取群聊相关信息在此单独处理group_name因为默认发送的消息中没有
fetched_group_info = await get_group_info(self.server_connection, raw_message.get("group_id"))
group_name: str = None
if fetched_group_info:
group_name = fetched_group_info.get("group_name")
group_info: GroupInfo = GroupInfo(
platform=global_config.platform,
group_id=raw_message.get("group_id"),
group_name=group_name,
)
else:
logger.warning("群聊消息类型不支持")
return None
additional_config: dict = {}
if global_config.use_tts:
additional_config["allow_tts"] = True
# 消息信息
message_info: BaseMessageInfo = BaseMessageInfo(
platform=global_config.platform,
message_id=message_id,
time=message_time,
user_info=user_info,
group_info=group_info,
template_info=template_info,
format_info=format_info,
additional_config=additional_config,
)
# 处理实际信息
if not raw_message.get("message"):
logger.warning("消息内容为空")
return None
# 获取Seg列表
seg_message: List[Seg] = await self.handle_real_message(raw_message)
if not seg_message:
logger.warning("消息内容为空")
return None
submit_seg: Seg = Seg(
type="seglist",
data=seg_message,
)
# MessageBase创建
message_base: MessageBase = MessageBase(
message_info=message_info,
message_segment=submit_seg,
raw_message=raw_message.get("raw_message"),
)
logger.info("发送到Maibot处理信息")
await self.message_process(message_base)
async def handle_real_message(self, raw_message: dict, in_reply: bool = False) -> List[Seg] | None:
# sourcery skip: low-code-quality
"""
处理实际消息
Parameters:
real_message: dict: 实际消息
Returns:
seg_message: list[Seg]: 处理后的消息段列表
"""
real_message: list = raw_message.get("message")
if not real_message:
return None
seg_message: List[Seg] = []
for sub_message in real_message:
sub_message: dict
sub_message_type = sub_message.get("type")
match sub_message_type:
case RealMessageType.text:
ret_seg = await self.handle_text_message(sub_message)
if ret_seg:
seg_message.append(ret_seg)
else:
logger.warning("text处理失败")
case RealMessageType.face:
ret_seg = await self.handle_face_message(sub_message)
if ret_seg:
seg_message.append(ret_seg)
else:
logger.warning("face处理失败或不支持")
case RealMessageType.reply:
if not in_reply:
ret_seg = await self.handle_reply_message(sub_message)
if ret_seg:
seg_message += ret_seg
else:
logger.warning("reply处理失败")
case RealMessageType.image:
ret_seg = await self.handle_image_message(sub_message)
if ret_seg:
seg_message.append(ret_seg)
else:
logger.warning("image处理失败")
case RealMessageType.record:
logger.warning("不支持语音解析")
case RealMessageType.video:
logger.warning("不支持视频解析")
case RealMessageType.at:
ret_seg = await self.handle_at_message(
sub_message,
raw_message.get("self_id"),
raw_message.get("group_id"),
)
if ret_seg:
seg_message.append(ret_seg)
else:
logger.warning("at处理失败")
case RealMessageType.rps:
logger.warning("暂时不支持猜拳魔法表情解析")
case RealMessageType.dice:
logger.warning("暂时不支持骰子表情解析")
case RealMessageType.shake:
# 预计等价于戳一戳
logger.warning("暂时不支持窗口抖动解析")
case RealMessageType.share:
logger.warning("暂时不支持链接解析")
case RealMessageType.forward:
forward_message_data = sub_message.get("data")
if not forward_message_data:
logger.warning("转发消息内容为空")
return None
forward_message_id = forward_message_data.get("id")
request_uuid = str(uuid.uuid4())
payload = json.dumps(
{
"action": "get_forward_msg",
"params": {"message_id": forward_message_id},
"echo": request_uuid,
}
)
try:
await self.server_connection.send(payload)
response: dict = await get_response(request_uuid)
except TimeoutError:
logger.error("获取转发消息超时")
return None
except Exception as e:
logger.error(f"获取转发消息失败: {str(e)}")
return None
logger.debug(
f"转发消息原始格式:{json.dumps(response)[:80]}..."
if len(json.dumps(response)) > 80
else json.dumps(response)
)
response_data: dict = response.get("data")
if not response_data:
logger.warning("转发消息内容为空或获取失败")
return None
messages = response_data.get("messages")
if not messages:
logger.warning("转发消息内容为空或获取失败")
return None
ret_seg = await self.handle_forward_message(messages)
if ret_seg:
seg_message.append(ret_seg)
else:
logger.warning("转发消息处理失败")
case RealMessageType.node:
logger.warning("不支持转发消息节点解析")
case _:
logger.warning(f"未知消息类型:{sub_message_type}")
return seg_message
async def handle_text_message(self, raw_message: dict) -> Seg:
"""
处理纯文本信息
Parameters:
raw_message: dict: 原始消息
Returns:
seg_data: Seg: 处理后的消息段
"""
message_data: dict = raw_message.get("data")
plain_text: str = message_data.get("text")
return Seg(type=RealMessageType.text, data=plain_text)
async def handle_face_message(self, raw_message: dict) -> Seg | None:
"""
处理表情消息
Parameters:
raw_message: dict: 原始消息
Returns:
seg_data: Seg: 处理后的消息段
"""
message_data: dict = raw_message.get("data")
face_raw_id: str = str(message_data.get("id"))
if face_raw_id in qq_face:
face_content: str = qq_face.get(face_raw_id)
return Seg(type="text", data=face_content)
else:
logger.warning(f"不支持的表情:{face_raw_id}")
return None
async def handle_image_message(self, raw_message: dict) -> Seg | None:
"""
处理图片消息与表情包消息
Parameters:
raw_message: dict: 原始消息
Returns:
seg_data: Seg: 处理后的消息段
"""
message_data: dict = raw_message.get("data")
image_sub_type = message_data.get("sub_type")
try:
image_base64 = await get_image_base64(message_data.get("url"))
except Exception as e:
logger.error(f"图片消息处理失败: {str(e)}")
return None
if image_sub_type == 0:
"""这部分认为是图片"""
return Seg(type="image", data=image_base64)
elif image_sub_type == 1:
"""这部分认为是表情包"""
return Seg(type="emoji", data=image_base64)
else:
logger.warning(f"不支持的图片类型:{image_sub_type}")
return None
async def handle_at_message(self, raw_message: dict, self_id: int, group_id: int) -> Seg | None:
# sourcery skip: use-named-expression
"""
处理at消息
Parameters:
raw_message: dict: 原始消息
self_id: int: 机器人QQ号
group_id: int: 群号
Returns:
seg_data: Seg: 处理后的消息段
"""
message_data: dict = raw_message.get("data")
if message_data:
qq_id = message_data.get("qq")
if str(self_id) == str(qq_id):
self_info: dict = await get_self_info(self.server_connection)
if self_info:
return Seg(
type=RealMessageType.text, data=f"@<{self_info.get('nickname')}:{self_info.get('user_id')}>"
)
else:
return None
else:
member_info: dict = await get_member_info(self.server_connection, group_id=group_id, user_id=qq_id)
if member_info:
return Seg(
type=RealMessageType.text, data=f"@<{member_info.get('nickname')}:{member_info.get('user_id')}>"
)
else:
return None
async def handle_reply_message(self, raw_message: dict) -> Seg | None:
# sourcery skip: move-assign-in-block, use-named-expression
"""
处理回复消息
"""
raw_message_data: dict = raw_message.get("data")
message_id: int = None
if raw_message_data:
message_id = raw_message_data.get("id")
else:
return None
message_detail: dict = await get_message_detail(self.server_connection, message_id)
if not message_detail:
logger.warning("获取被引用的消息详情失败")
return None
reply_message = await self.handle_real_message(message_detail, in_reply=True)
if reply_message is None:
reply_message = "(获取发言内容失败)"
sender_info: dict = message_detail.get("sender")
sender_nickname: str = sender_info.get("nickname")
sender_id: str = sender_info.get("user_id")
seg_message: List[Seg] = []
if not sender_nickname:
logger.warning("无法获取被引用的人的昵称,返回默认值")
seg_message.append(Seg(type="text", data="[回复 未知用户:"))
else:
seg_message.append(Seg(type="text", data=f"[回复<{sender_nickname}:{sender_id}>"))
seg_message += reply_message
seg_message.append(Seg(type="text", data="],说:"))
return seg_message
async def handle_notice(self, raw_message: dict) -> None:
notice_type = raw_message.get("notice_type")
# message_time: int = raw_message.get("time")
message_time: float = time.time() # 应可乐要求现在是float了
group_id = raw_message.get("group_id")
user_id = raw_message.get("user_id")
handled_message: Seg = None
match notice_type:
case NoticeType.friend_recall:
logger.info("好友撤回一条消息")
logger.info(f"撤回消息ID{raw_message.get('message_id')}, 撤回时间:{raw_message.get('time')}")
logger.warning("暂时不支持撤回消息处理")
case NoticeType.group_recall:
logger.info("群内用户撤回一条消息")
logger.info(f"撤回消息ID{raw_message.get('message_id')}, 撤回时间:{raw_message.get('time')}")
logger.warning("暂时不支持撤回消息处理")
case NoticeType.notify:
sub_type = raw_message.get("sub_type")
match sub_type:
case NoticeType.Notify.poke:
handled_message: Seg = await self.handle_poke_notify(raw_message)
case _:
logger.warning("不支持的notify类型")
case _:
logger.warning("不支持的notice类型")
return None
if not handled_message:
logger.warning("notice处理失败或不支持")
return None
source_name: str = None
source_cardname: str = None
if group_id:
member_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if member_info:
source_name = member_info.get("nickname")
source_cardname = member_info.get("card")
else:
logger.warning("无法获取戳一戳消息发送者的昵称,消息可能会无效")
source_name = "QQ用户"
else:
stranger_info = await get_stranger_info(self.server_connection, user_id)
if stranger_info:
source_name = stranger_info.get("nickname")
else:
logger.warning("无法获取戳一戳消息发送者的昵称,消息可能会无效")
source_name = "QQ用户"
user_info: UserInfo = UserInfo(
platform=global_config.platform,
user_id=user_id,
user_nickname=source_name,
user_cardname=source_cardname,
)
group_info: GroupInfo = None
if group_id:
fetched_group_info = await get_group_info(self.server_connection, group_id)
group_name: str = None
if fetched_group_info:
group_name = fetched_group_info.get("group_name")
group_info = GroupInfo(
platform=global_config.platform,
group_id=group_id,
group_name=group_name,
)
message_info: BaseMessageInfo = BaseMessageInfo(
platform=global_config.platform,
message_id="notice",
time=message_time,
user_info=user_info,
group_info=group_info,
template_info=None,
format_info=None,
)
message_base: MessageBase = MessageBase(
message_info=message_info,
message_segment=handled_message,
raw_message=json.dumps(raw_message),
)
logger.info("发送到Maibot处理通知信息")
await self.message_process(message_base)
async def handle_poke_notify(self, raw_message: dict) -> Seg | None:
self_info: dict = await get_self_info(self.server_connection)
if not self_info:
logger.error("自身信息获取失败")
return None
self_id = raw_message.get("self_id")
target_id = raw_message.get("target_id")
target_name: str = None
raw_info: list = raw_message.get("raw_info")
# 计算Seg
if self_id == target_id:
target_name = self_info.get("nickname")
else:
return None
try:
first_txt = raw_info[2].get("txt", "戳了戳")
second_txt = raw_info[4].get("txt", "")
except Exception as e:
logger.warning(f"解析戳一戳消息失败,使用默认文本:{str(e)}")
first_txt = "戳了戳"
second_txt = ""
"""
# 不启用戳其他人的处理
else:
# 由于Napcat不支持获取昵称所以需要单独获取
group_id = raw_message.get("group_id")
fetched_member_info: dict = await get_member_info(
self.server_connection, group_id, target_id
)
if fetched_member_info:
target_name = fetched_member_info.get("nickname")
"""
seg_data: Seg = Seg(
type="text",
data=f"{first_txt}{target_name}{second_txt}这是QQ的一个功能用于提及某人但没那么明显",
)
return seg_data
async def handle_forward_message(self, message_list: list) -> Seg | None:
"""
递归处理转发消息,并按照动态方式确定图片处理方式
Parameters:
message_list: list: 转发消息列表
"""
handled_message, image_count = await self._handle_forward_message(message_list, 0)
handled_message: Seg
image_count: int
if not handled_message:
return None
if image_count < 5 and image_count > 0:
# 处理图片数量小于5的情况此时解析图片为base64
return await self._recursive_parse_image_seg(handled_message, True)
elif image_count > 0:
# 处理图片数量大于等于5的情况此时解析图片为占位符
return await self._recursive_parse_image_seg(handled_message, False)
else:
# 处理没有图片的情况,此时直接返回
return handled_message
async def _recursive_parse_image_seg(self, seg_data: Seg, to_image: bool) -> Seg:
# sourcery skip: merge-else-if-into-elif
if to_image:
if seg_data.type == "seglist":
new_seg_list = []
for i_seg in seg_data.data:
parsed_seg = await self._recursive_parse_image_seg(i_seg, to_image)
new_seg_list.append(parsed_seg)
return Seg(type="seglist", data=new_seg_list)
elif seg_data.type == "image":
image_url = seg_data.data
try:
encoded_image = await get_image_base64(image_url)
except Exception as e:
logger.error(f"图片处理失败: {str(e)}")
return Seg(type="text", data="[图片]")
return Seg(type="image", data=encoded_image)
elif seg_data.type == "emoji":
image_url = seg_data.data
try:
encoded_image = await get_image_base64(image_url)
except Exception as e:
logger.error(f"图片处理失败: {str(e)}")
return Seg(type="text", data="[表情包]")
return Seg(type="emoji", data=encoded_image)
else:
return seg_data
else:
if seg_data.type == "seglist":
new_seg_list = []
for i_seg in seg_data.data:
parsed_seg = await self._recursive_parse_image_seg(i_seg, to_image)
new_seg_list.append(parsed_seg)
return Seg(type="seglist", data=new_seg_list)
elif seg_data.type == "image":
image_url = seg_data.data
return Seg(type="text", data="[图片]")
elif seg_data.type == "emoji":
image_url = seg_data.data
return Seg(type="text", data="[动画表情]")
else:
return seg_data
async def _handle_forward_message(self, message_list: list, layer: int) -> Tuple[Seg, int] | Tuple[None, int]:
# sourcery skip: low-code-quality
"""
递归处理实际转发消息
Parameters:
message_list: list: 转发消息列表首层对应messages字段后面对应content字段
layer: int: 当前层级
Returns:
seg_data: Seg: 处理后的消息段
image_count: int: 图片数量
"""
seg_list = []
image_count = 0
if message_list is None:
return None, 0
for sub_message in message_list:
sub_message: dict
sender_info: dict = sub_message.get("sender")
user_nickname: str = sender_info.get("nickname", "QQ用户")
user_nickname_str = f"{user_nickname}】:"
break_seg = Seg(type="text", data="\n")
message_of_sub_message_list: dict = sub_message.get("message")
if not message_of_sub_message_list:
logger.warning("转发消息内容为空")
continue
message_of_sub_message = message_of_sub_message_list[0]
if message_of_sub_message.get("type") == RealMessageType.forward:
if layer >= 3:
full_seg_data = (
Seg(
type="text",
data=("--" * layer) + f"{user_nickname}】:【转发消息】\n",
),
0,
)
else:
sub_message_data = message_of_sub_message.get("data")
if not sub_message_data:
continue
contents = sub_message_data.get("content")
seg_data, count = await self._handle_forward_message(contents, layer + 1)
image_count += count
head_tip = Seg(
type="text",
data=("--" * layer) + f"{user_nickname}】: 合并转发消息内容:\n",
)
full_seg_data = Seg(type="seglist", data=[head_tip, seg_data])
seg_list.append(full_seg_data)
elif message_of_sub_message.get("type") == RealMessageType.text:
sub_message_data = message_of_sub_message.get("data")
if not sub_message_data:
continue
text_message = sub_message_data.get("text")
seg_data = Seg(type="text", data=text_message)
if layer > 0:
seg_list.append(
Seg(
type="seglist",
data=[
Seg(type="text", data=("--" * layer) + user_nickname_str),
seg_data,
break_seg,
],
)
)
else:
seg_list.append(
Seg(
type="seglist",
data=[
Seg(type="text", data=user_nickname_str),
seg_data,
break_seg,
],
)
)
elif message_of_sub_message.get("type") == RealMessageType.image:
image_count += 1
image_data = message_of_sub_message.get("data")
sub_type = image_data.get("sub_type")
image_url = image_data.get("url")
if sub_type == 0:
seg_data = Seg(type="image", data=image_url)
else:
seg_data = Seg(type="emoji", data=image_url)
if layer > 0:
full_seg_data = Seg(
type="seglist",
data=[
Seg(type="text", data=("--" * layer) + user_nickname_str),
seg_data,
break_seg,
],
)
else:
full_seg_data = Seg(
type="seglist",
data=[
Seg(type="text", data=user_nickname_str),
seg_data,
break_seg,
],
)
seg_list.append(full_seg_data)
return Seg(type="seglist", data=seg_list), image_count
async def message_process(self, message_base: MessageBase) -> None:
try:
await self.maibot_router.send_message(message_base)
except Exception as e:
logger.error(f"发送消息失败: {str(e)}")
logger.error("请检查与MaiBot之间的连接")
return None
recv_handler = RecvHandler()