Merge pull request #34 from A0000Xz/dev

添加了戳一戳的命令
pull/36/head
UnCLAS-Prommer 2025-06-22 09:48:27 +08:00 committed by GitHub
commit 0753469746
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 420 additions and 390 deletions

View File

@ -1,77 +1,78 @@
from enum import Enum from enum import Enum
class MetaEventType: class MetaEventType:
lifecycle = "lifecycle" # 生命周期 lifecycle = "lifecycle" # 生命周期
class Lifecycle: class Lifecycle:
connect = "connect" # 生命周期 - WebSocket 连接成功 connect = "connect" # 生命周期 - WebSocket 连接成功
heartbeat = "heartbeat" # 心跳 heartbeat = "heartbeat" # 心跳
class MessageType: # 接受消息大类 class MessageType: # 接受消息大类
private = "private" # 私聊消息 private = "private" # 私聊消息
class Private: class Private:
friend = "friend" # 私聊消息 - 好友 friend = "friend" # 私聊消息 - 好友
group = "group" # 私聊消息 - 群临时 group = "group" # 私聊消息 - 群临时
group_self = "group_self" # 私聊消息 - 群中自身发送 group_self = "group_self" # 私聊消息 - 群中自身发送
other = "other" # 私聊消息 - 其他 other = "other" # 私聊消息 - 其他
group = "group" # 群聊消息 group = "group" # 群聊消息
class Group: class Group:
normal = "normal" # 群聊消息 - 普通 normal = "normal" # 群聊消息 - 普通
anonymous = "anonymous" # 群聊消息 - 匿名消息 anonymous = "anonymous" # 群聊消息 - 匿名消息
notice = "notice" # 群聊消息 - 系统提示 notice = "notice" # 群聊消息 - 系统提示
class NoticeType: # 通知事件 class NoticeType: # 通知事件
friend_recall = "friend_recall" # 私聊消息撤回 friend_recall = "friend_recall" # 私聊消息撤回
group_recall = "group_recall" # 群聊消息撤回 group_recall = "group_recall" # 群聊消息撤回
notify = "notify" notify = "notify"
class Notify: class Notify:
poke = "poke" # 戳一戳 poke = "poke" # 戳一戳
class RealMessageType: # 实际消息分类 class RealMessageType: # 实际消息分类
text = "text" # 纯文本 text = "text" # 纯文本
face = "face" # qq表情 face = "face" # qq表情
image = "image" # 图片 image = "image" # 图片
record = "record" # 语音 record = "record" # 语音
video = "video" # 视频 video = "video" # 视频
at = "at" # @某人 at = "at" # @某人
rps = "rps" # 猜拳魔法表情 rps = "rps" # 猜拳魔法表情
dice = "dice" # 骰子 dice = "dice" # 骰子
shake = "shake" # 私聊窗口抖动(只收) shake = "shake" # 私聊窗口抖动(只收)
poke = "poke" # 群聊戳一戳 poke = "poke" # 群聊戳一戳
share = "share" # 链接分享json形式 share = "share" # 链接分享json形式
reply = "reply" # 回复消息 reply = "reply" # 回复消息
forward = "forward" # 转发消息 forward = "forward" # 转发消息
node = "node" # 转发消息节点 node = "node" # 转发消息节点
class MessageSentType: class MessageSentType:
private = "private" private = "private"
class Private: class Private:
friend = "friend" friend = "friend"
group = "group" group = "group"
group = "group" group = "group"
class Group: class Group:
normal = "normal" normal = "normal"
class CommandType(Enum): class CommandType(Enum):
"""命令类型""" """命令类型"""
GROUP_BAN = "set_group_ban" # 禁言用户 GROUP_BAN = "set_group_ban" # 禁言用户
GROUP_WHOLE_BAN = "set_group_whole_ban" # 群全体禁言 GROUP_WHOLE_BAN = "set_group_whole_ban" # 群全体禁言
GROUP_KICK = "set_group_kick" # 踢出群聊 GROUP_KICK = "set_group_kick" # 踢出群聊
SEND_POKE = "send_poke" # 戳一戳
def __str__(self) -> str:
return self.value def __str__(self) -> str:
return self.value

View File

@ -1,313 +1,342 @@
import json import json
import websockets as Server import websockets as Server
import uuid import uuid
from maim_message import ( from maim_message import (
UserInfo, UserInfo,
GroupInfo, GroupInfo,
Seg, Seg,
BaseMessageInfo, BaseMessageInfo,
MessageBase, MessageBase,
) )
from typing import Dict, Any, Tuple from typing import Dict, Any, Tuple
from . import CommandType from . import CommandType
from .config import global_config from .config import global_config
from .response_pool import get_response from .response_pool import get_response
from .logger import logger from .logger import logger
from .utils import get_image_format, convert_image_to_gif from .utils import get_image_format, convert_image_to_gif
class SendHandler: class SendHandler:
def __init__(self): def __init__(self):
self.server_connection: Server.ServerConnection = None self.server_connection: Server.ServerConnection = None
async def handle_message(self, raw_message_base_dict: dict) -> None: async def handle_message(self, raw_message_base_dict: dict) -> None:
raw_message_base: MessageBase = MessageBase.from_dict(raw_message_base_dict) raw_message_base: MessageBase = MessageBase.from_dict(raw_message_base_dict)
message_segment: Seg = raw_message_base.message_segment message_segment: Seg = raw_message_base.message_segment
logger.info("接收到来自MaiBot的消息处理中") logger.info("接收到来自MaiBot的消息处理中")
if message_segment.type == "command": if message_segment.type == "command":
return await self.send_command(raw_message_base) return await self.send_command(raw_message_base)
else: else:
return await self.send_normal_message(raw_message_base) return await self.send_normal_message(raw_message_base)
async def send_normal_message(self, raw_message_base: MessageBase) -> None: async def send_normal_message(self, raw_message_base: MessageBase) -> None:
""" """
处理普通消息发送 处理普通消息发送
""" """
logger.info("处理普通信息中") logger.info("处理普通信息中")
message_info: BaseMessageInfo = raw_message_base.message_info message_info: BaseMessageInfo = raw_message_base.message_info
message_segment: Seg = raw_message_base.message_segment message_segment: Seg = raw_message_base.message_segment
group_info: GroupInfo = message_info.group_info group_info: GroupInfo = message_info.group_info
user_info: UserInfo = message_info.user_info user_info: UserInfo = message_info.user_info
target_id: int = None target_id: int = None
action: str = None action: str = None
id_name: str = None id_name: str = None
processed_message: list = [] processed_message: list = []
try: try:
processed_message = await self.handle_seg_recursive(message_segment) processed_message = await self.handle_seg_recursive(message_segment)
except Exception as e: except Exception as e:
logger.error(f"处理消息时发生错误: {e}") logger.error(f"处理消息时发生错误: {e}")
return return
if not processed_message: if not processed_message:
logger.critical("现在暂时不支持解析此回复!") logger.critical("现在暂时不支持解析此回复!")
return None return None
if group_info and user_info: if group_info and user_info:
logger.debug("发送群聊消息") logger.debug("发送群聊消息")
target_id = group_info.group_id target_id = group_info.group_id
action = "send_group_msg" action = "send_group_msg"
id_name = "group_id" id_name = "group_id"
elif user_info: elif user_info:
logger.debug("发送私聊消息") logger.debug("发送私聊消息")
target_id = user_info.user_id target_id = user_info.user_id
action = "send_private_msg" action = "send_private_msg"
id_name = "user_id" id_name = "user_id"
else: else:
logger.error("无法识别的消息类型") logger.error("无法识别的消息类型")
return return
logger.info("尝试发送到napcat") logger.info("尝试发送到napcat")
response = await self.send_message_to_napcat( response = await self.send_message_to_napcat(
action, action,
{ {
id_name: target_id, id_name: target_id,
"message": processed_message, "message": processed_message,
}, },
) )
if response.get("status") == "ok": if response.get("status") == "ok":
logger.info("消息发送成功") logger.info("消息发送成功")
else: else:
logger.warning(f"消息发送失败napcat返回{str(response)}") logger.warning(f"消息发送失败napcat返回{str(response)}")
async def send_command(self, raw_message_base: MessageBase) -> None: async def send_command(self, raw_message_base: MessageBase) -> None:
""" """
处理命令类 处理命令类
""" """
logger.info("处理命令中") logger.info("处理命令中")
message_info: BaseMessageInfo = raw_message_base.message_info message_info: BaseMessageInfo = raw_message_base.message_info
message_segment: Seg = raw_message_base.message_segment message_segment: Seg = raw_message_base.message_segment
group_info: GroupInfo = message_info.group_info group_info: GroupInfo = message_info.group_info
seg_data: Dict[str, Any] = message_segment.data seg_data: Dict[str, Any] = message_segment.data
command_name: str = seg_data.get("name") command_name: str = seg_data.get("name")
try: try:
match command_name: match command_name:
case CommandType.GROUP_BAN.name: case CommandType.GROUP_BAN.name:
command, args_dict = self.handle_ban_command(seg_data.get("args"), group_info) command, args_dict = self.handle_ban_command(seg_data.get("args"), group_info)
case CommandType.GROUP_WHOLE_BAN.name: case CommandType.GROUP_WHOLE_BAN.name:
command, args_dict = self.handle_whole_ban_command(seg_data.get("args"), group_info) command, args_dict = self.handle_whole_ban_command(seg_data.get("args"), group_info)
case CommandType.GROUP_KICK.name: case CommandType.GROUP_KICK.name:
command, args_dict = self.handle_kick_command(seg_data.get("args"), group_info) command, args_dict = self.handle_kick_command(seg_data.get("args"), group_info)
case _: case CommandType.SEND_POKE.name:
logger.error(f"未知命令: {command_name}") command, args_dict = self.handle_poke_command(seg_data.get("args"), group_info)
return case _:
except Exception as e: logger.error(f"未知命令: {command_name}")
logger.error(f"处理命令时发生错误: {e}") return
return None except Exception as e:
logger.error(f"处理命令时发生错误: {e}")
if not command or not args_dict: return None
logger.error("命令或参数缺失")
return None if not command or not args_dict:
logger.error("命令或参数缺失")
response = await self.send_message_to_napcat(command, args_dict) return None
if response.get("status") == "ok":
logger.info(f"命令 {command_name} 执行成功") response = await self.send_message_to_napcat(command, args_dict)
else: if response.get("status") == "ok":
logger.warning(f"命令 {command_name} 执行失败napcat返回{str(response)}") logger.info(f"命令 {command_name} 执行成功")
else:
def get_level(self, seg_data: Seg) -> int: logger.warning(f"命令 {command_name} 执行失败napcat返回{str(response)}")
if seg_data.type == "seglist":
return 1 + max(self.get_level(seg) for seg in seg_data.data) def get_level(self, seg_data: Seg) -> int:
else: if seg_data.type == "seglist":
return 1 return 1 + max(self.get_level(seg) for seg in seg_data.data)
else:
async def handle_seg_recursive(self, seg_data: Seg) -> list: return 1
payload: list = []
if seg_data.type == "seglist": async def handle_seg_recursive(self, seg_data: Seg) -> list:
# level = self.get_level(seg_data) # 给以后可能的多层嵌套做准备,此处不使用 payload: list = []
if not seg_data.data: if seg_data.type == "seglist":
return [] # level = self.get_level(seg_data) # 给以后可能的多层嵌套做准备,此处不使用
for seg in seg_data.data: if not seg_data.data:
payload = self.process_message_by_type(seg, payload) return []
else: for seg in seg_data.data:
payload = self.process_message_by_type(seg_data, payload) payload = self.process_message_by_type(seg, payload)
return payload else:
payload = self.process_message_by_type(seg_data, payload)
def process_message_by_type(self, seg: Seg, payload: list) -> list: return payload
# sourcery skip: reintroduce-else, swap-if-else-branches, use-named-expression
new_payload = payload def process_message_by_type(self, seg: Seg, payload: list) -> list:
if seg.type == "reply": # sourcery skip: reintroduce-else, swap-if-else-branches, use-named-expression
target_id = seg.data new_payload = payload
if target_id == "notice": if seg.type == "reply":
return payload target_id = seg.data
new_payload = self.build_payload(payload, self.handle_reply_message(target_id), True) if target_id == "notice":
elif seg.type == "text": return payload
text = seg.data new_payload = self.build_payload(payload, self.handle_reply_message(target_id), True)
if not text: elif seg.type == "text":
return payload text = seg.data
new_payload = self.build_payload(payload, self.handle_text_message(text), False) if not text:
elif seg.type == "face": return payload
logger.warning("MaiBot 发送了qq原生表情暂时不支持") new_payload = self.build_payload(payload, self.handle_text_message(text), False)
elif seg.type == "image": elif seg.type == "face":
image = seg.data logger.warning("MaiBot 发送了qq原生表情暂时不支持")
new_payload = self.build_payload(payload, self.handle_image_message(image), False) elif seg.type == "image":
elif seg.type == "emoji": image = seg.data
emoji = seg.data new_payload = self.build_payload(payload, self.handle_image_message(image), False)
new_payload = self.build_payload(payload, self.handle_emoji_message(emoji), False) elif seg.type == "emoji":
elif seg.type == "voice": emoji = seg.data
voice = seg.data new_payload = self.build_payload(payload, self.handle_emoji_message(emoji), False)
new_payload = self.build_payload(payload, self.handle_voice_message(voice), False) elif seg.type == "voice":
return new_payload voice = seg.data
new_payload = self.build_payload(payload, self.handle_voice_message(voice), False)
def build_payload(self, payload: list, addon: dict, is_reply: bool = False) -> list: return new_payload
# sourcery skip: for-append-to-extend, merge-list-append, simplify-generator
"""构建发送的消息体""" def build_payload(self, payload: list, addon: dict, is_reply: bool = False) -> list:
if is_reply: # sourcery skip: for-append-to-extend, merge-list-append, simplify-generator
temp_list = [] """构建发送的消息体"""
temp_list.append(addon) if is_reply:
for i in payload: temp_list = []
if i.get("type") == "reply": temp_list.append(addon)
logger.debug("检测到多个回复,使用最新的回复") for i in payload:
continue if i.get("type") == "reply":
temp_list.append(i) logger.debug("检测到多个回复,使用最新的回复")
return temp_list continue
else: temp_list.append(i)
payload.append(addon) return temp_list
return payload else:
payload.append(addon)
def handle_reply_message(self, id: str) -> dict: return payload
"""处理回复消息"""
return {"type": "reply", "data": {"id": id}} def handle_reply_message(self, id: str) -> dict:
"""处理回复消息"""
def handle_text_message(self, message: str) -> dict: return {"type": "reply", "data": {"id": id}}
"""处理文本消息"""
return {"type": "text", "data": {"text": message}} def handle_text_message(self, message: str) -> dict:
"""处理文本消息"""
def handle_image_message(self, encoded_image: str) -> dict: return {"type": "text", "data": {"text": message}}
"""处理图片消息"""
return { def handle_image_message(self, encoded_image: str) -> dict:
"type": "image", """处理图片消息"""
"data": { return {
"file": f"base64://{encoded_image}", "type": "image",
"subtype": 0, "data": {
}, "file": f"base64://{encoded_image}",
} # base64 编码的图片 "subtype": 0,
},
def handle_emoji_message(self, encoded_emoji: str) -> dict: } # base64 编码的图片
"""处理表情消息"""
encoded_image = encoded_emoji def handle_emoji_message(self, encoded_emoji: str) -> dict:
image_format = get_image_format(encoded_emoji) """处理表情消息"""
if image_format != "gif": encoded_image = encoded_emoji
encoded_image = convert_image_to_gif(encoded_emoji) image_format = get_image_format(encoded_emoji)
return { if image_format != "gif":
"type": "image", encoded_image = convert_image_to_gif(encoded_emoji)
"data": { return {
"file": f"base64://{encoded_image}", "type": "image",
"subtype": 1, "data": {
"summary": "[动画表情]", "file": f"base64://{encoded_image}",
}, "subtype": 1,
} "summary": "[动画表情]",
},
def handle_voice_message(self, encoded_voice: str) -> dict: }
"""处理语音消息"""
if not global_config.voice.use_tts: def handle_voice_message(self, encoded_voice: str) -> dict:
logger.warning("未启用语音消息处理") """处理语音消息"""
return {} if not global_config.voice.use_tts:
if not encoded_voice: logger.warning("未启用语音消息处理")
return {} return {}
return { if not encoded_voice:
"type": "record", return {}
"data": {"file": f"base64://{encoded_voice}"}, return {
} "type": "record",
"data": {"file": f"base64://{encoded_voice}"},
def handle_ban_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: }
"""处理封禁命令
def handle_ban_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]:
Args: """处理封禁命令
args (Dict[str, Any]): 参数字典
group_info (GroupInfo): 群聊信息对应目标群聊 Args:
args (Dict[str, Any]): 参数字典
Returns: group_info (GroupInfo): 群聊信息对应目标群聊
Tuple[CommandType, Dict[str, Any]]
""" Returns:
duration: int = int(args["duration"]) Tuple[CommandType, Dict[str, Any]]
user_id: int = int(args["qq_id"]) """
group_id: int = int(group_info.group_id) duration: int = int(args["duration"])
if duration <= 0: user_id: int = int(args["qq_id"])
raise ValueError("封禁时间必须大于0") group_id: int = int(group_info.group_id)
if not user_id or not group_id: if duration <= 0:
raise ValueError("封禁命令缺少必要参数") raise ValueError("封禁时间必须大于0")
if duration > 2592000: if not user_id or not group_id:
raise ValueError("封禁时间不能超过30天") raise ValueError("封禁命令缺少必要参数")
return ( if duration > 2592000:
CommandType.GROUP_BAN.value, raise ValueError("封禁时间不能超过30天")
{ return (
"group_id": group_id, CommandType.GROUP_BAN.value,
"user_id": user_id, {
"duration": duration, "group_id": group_id,
}, "user_id": user_id,
) "duration": duration,
},
def handle_whole_ban_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: )
"""处理全体禁言命令
def handle_whole_ban_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]:
Args: """处理全体禁言命令
args (Dict[str, Any]): 参数字典
group_info (GroupInfo): 群聊信息对应目标群聊 Args:
args (Dict[str, Any]): 参数字典
Returns: group_info (GroupInfo): 群聊信息对应目标群聊
Tuple[CommandType, Dict[str, Any]]
""" Returns:
enable = args["enable"] Tuple[CommandType, Dict[str, Any]]
assert isinstance(enable, bool), "enable参数必须是布尔值" """
group_id: int = int(group_info.group_id) enable = args["enable"]
if group_id <= 0: assert isinstance(enable, bool), "enable参数必须是布尔值"
raise ValueError("群组ID无效") group_id: int = int(group_info.group_id)
return ( if group_id <= 0:
CommandType.GROUP_WHOLE_BAN.value, raise ValueError("群组ID无效")
{ return (
"group_id": group_id, CommandType.GROUP_WHOLE_BAN.value,
"enable": enable, {
}, "group_id": group_id,
) "enable": enable,
},
def handle_kick_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: )
"""处理群成员踢出命令
def handle_kick_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]:
Args: """处理群成员踢出命令
args (Dict[str, Any]): 参数字典
group_info (GroupInfo): 群聊信息对应目标群聊 Args:
args (Dict[str, Any]): 参数字典
Returns: group_info (GroupInfo): 群聊信息对应目标群聊
Tuple[CommandType, Dict[str, Any]]
""" Returns:
user_id: int = int(args["qq_id"]) Tuple[CommandType, Dict[str, Any]]
group_id: int = int(group_info.group_id) """
if group_id <= 0: user_id: int = int(args["qq_id"])
raise ValueError("群组ID无效") group_id: int = int(group_info.group_id)
if user_id <= 0: if group_id <= 0:
raise ValueError("用户ID无效") raise ValueError("群组ID无效")
return ( if user_id <= 0:
CommandType.GROUP_KICK.value, raise ValueError("用户ID无效")
{ return (
"group_id": group_id, CommandType.GROUP_KICK.value,
"user_id": user_id, {
"reject_add_request": False, # 不拒绝加群请求 "group_id": group_id,
}, "user_id": user_id,
) "reject_add_request": False, # 不拒绝加群请求
},
async def send_message_to_napcat(self, action: str, params: dict) -> dict: )
request_uuid = str(uuid.uuid4())
payload = json.dumps({"action": action, "params": params, "echo": request_uuid}) def handle_poke_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]:
await self.server_connection.send(payload) """处理戳一戳命令
try:
response = await get_response(request_uuid) Args:
except TimeoutError: args (Dict[str, Any]): 参数字典
logger.error("发送消息超时,未收到响应") group_info (GroupInfo): 群聊信息对应目标群聊
return {"status": "error", "message": "timeout"}
except Exception as e: Returns:
logger.error(f"发送消息失败: {e}") Tuple[CommandType, Dict[str, Any]]
return {"status": "error", "message": str(e)} """
return response user_id: int = int(args["qq_id"])
if group_info == None:
group_id = None
send_handler = SendHandler() else:
group_id: int = int(group_info.group_id)
if group_id <= 0:
raise ValueError("群组ID无效")
if user_id <= 0:
raise ValueError("用户ID无效")
return (
CommandType.SEND_POKE.value,
{
"group_id": group_id,
"user_id": user_id,
},
)
async def send_message_to_napcat(self, action: str, params: dict) -> dict:
request_uuid = str(uuid.uuid4())
payload = json.dumps({"action": action, "params": params, "echo": request_uuid})
await self.server_connection.send(payload)
try:
response = await get_response(request_uuid)
except TimeoutError:
logger.error("发送消息超时,未收到响应")
return {"status": "error", "message": "timeout"}
except Exception as e:
logger.error(f"发送消息失败: {e}")
return {"status": "error", "message": str(e)}
return response
send_handler = SendHandler()