From 8bf1bd15178f9b64a20720ab3da18323d1fba4f1 Mon Sep 17 00:00:00 2001 From: UnCLAS-Prommer Date: Wed, 10 Sep 2025 22:44:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=9B=B4=E5=A4=9A=E7=A7=8D?= =?UTF-8?q?=E7=B1=BB=E7=9A=84=E6=B6=88=E6=81=AF=E8=A7=A3=E6=9E=90=EF=BC=8C?= =?UTF-8?q?=E9=87=8D=E6=9E=84send=5Fhandler=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 4 +- pyproject.toml | 2 +- src/__init__.py | 1 + src/mmc_com_layer.py | 2 +- src/recv_handler/__init__.py | 2 +- src/send_handler.py | 461 ----------------------- src/send_handler/__init__.py | 0 src/send_handler/main_send_handler.py | 104 +++++ src/send_handler/nc_sending.py | 49 +++ src/send_handler/send_command_handler.py | 221 +++++++++++ src/send_handler/send_message_handler.py | 188 +++++++++ 11 files changed, 568 insertions(+), 466 deletions(-) delete mode 100644 src/send_handler.py create mode 100644 src/send_handler/__init__.py create mode 100644 src/send_handler/main_send_handler.py create mode 100644 src/send_handler/nc_sending.py create mode 100644 src/send_handler/send_command_handler.py create mode 100644 src/send_handler/send_message_handler.py diff --git a/main.py b/main.py index 64d8c32..424860d 100644 --- a/main.py +++ b/main.py @@ -7,7 +7,7 @@ from src.recv_handler.message_handler import message_handler from src.recv_handler.meta_event_handler import meta_event_handler from src.recv_handler.notice_handler import notice_handler from src.recv_handler.message_sending import message_send_instance -from src.send_handler import send_handler +from src.send_handler.nc_sending import nc_message_sender from src.config import global_config from src.mmc_com_layer import mmc_start_com, mmc_stop_com, router from src.response_pool import put_response, check_timeout_response @@ -18,7 +18,7 @@ message_queue = asyncio.Queue() async def message_recv(server_connection: Server.ServerConnection): await message_handler.set_server_connection(server_connection) asyncio.create_task(notice_handler.set_server_connection(server_connection)) - await send_handler.set_server_connection(server_connection) + await nc_message_sender.set_server_connection(server_connection) async for raw_message in server_connection: logger.debug(f"{raw_message[:1500]}..." if (len(raw_message) > 1500) else raw_message) decoded_raw_message: dict = json.loads(raw_message) diff --git a/pyproject.toml b/pyproject.toml index 42e56eb..50161b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "MaiBotNapcatAdapter" -version = "0.4.7" +version = "0.5.0" description = "A MaiBot adapter for Napcat" [tool.ruff] diff --git a/src/__init__.py b/src/__init__.py index 0159c09..646d0a9 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -13,6 +13,7 @@ class CommandType(Enum): SEND_POKE = "send_poke" # 戳一戳 DELETE_MSG = "delete_msg" # 撤回消息 AI_VOICE_SEND = "send_group_ai_record" # 发送群AI语音 + MESSAGE_LIKE = "message_like" # 给消息贴表情 def __str__(self) -> str: return self.value diff --git a/src/mmc_com_layer.py b/src/mmc_com_layer.py index 0c5a525..012d153 100644 --- a/src/mmc_com_layer.py +++ b/src/mmc_com_layer.py @@ -1,7 +1,7 @@ from maim_message import Router, RouteConfig, TargetConfig from .config import global_config from .logger import logger, custom_logger -from .send_handler import send_handler +from .send_handler.main_send_handler import send_handler route_config = RouteConfig( route_config={ diff --git a/src/recv_handler/__init__.py b/src/recv_handler/__init__.py index 422041b..3f342fd 100644 --- a/src/recv_handler/__init__.py +++ b/src/recv_handler/__init__.py @@ -84,4 +84,4 @@ class CommandType(Enum): return self.value -ACCEPT_FORMAT = ["text", "image", "emoji", "reply", "voice", "command", "voiceurl", "music", "videourl", "file"] +ACCEPT_FORMAT = ["text", "image", "emoji", "reply", "voice", "command", "voiceurl", "music", "videourl", "file", "forward"] diff --git a/src/send_handler.py b/src/send_handler.py deleted file mode 100644 index cf64a44..0000000 --- a/src/send_handler.py +++ /dev/null @@ -1,461 +0,0 @@ -import json -import websockets as Server -import uuid -from maim_message import ( - UserInfo, - GroupInfo, - Seg, - BaseMessageInfo, - MessageBase, -) -from typing import Dict, Any, Tuple - -from . import CommandType -from .config import global_config -from .response_pool import get_response -from .logger import logger -from .utils import get_image_format, convert_image_to_gif -from .recv_handler.message_sending import message_send_instance - - -class SendHandler: - def __init__(self): - self.server_connection: Server.ServerConnection = None - - async def set_server_connection(self, server_connection: Server.ServerConnection) -> None: - """设置Napcat连接""" - self.server_connection = server_connection - - async def handle_message(self, raw_message_base_dict: dict) -> None: - raw_message_base: MessageBase = MessageBase.from_dict(raw_message_base_dict) - message_segment: Seg = raw_message_base.message_segment - logger.info("接收到来自MaiBot的消息,处理中") - if message_segment.type == "command": - return await self.send_command(raw_message_base) - else: - return await self.send_normal_message(raw_message_base) - - async def send_normal_message(self, raw_message_base: MessageBase) -> None: - """ - 处理普通消息发送 - """ - logger.info("处理普通信息中") - message_info: BaseMessageInfo = raw_message_base.message_info - message_segment: Seg = raw_message_base.message_segment - group_info: GroupInfo = message_info.group_info - user_info: UserInfo = message_info.user_info - target_id: int = None - action: str = None - id_name: str = None - processed_message: list = [] - try: - processed_message = await self.handle_seg_recursive(message_segment) - except Exception as e: - logger.error(f"处理消息时发生错误: {e}") - return - - if not processed_message: - logger.critical("现在暂时不支持解析此回复!") - return None - - if group_info and user_info: - logger.debug("发送群聊消息") - target_id = group_info.group_id - action = "send_group_msg" - id_name = "group_id" - elif user_info: - logger.debug("发送私聊消息") - target_id = user_info.user_id - action = "send_private_msg" - id_name = "user_id" - else: - logger.error("无法识别的消息类型") - return - logger.info("尝试发送到napcat") - response = await self.send_message_to_napcat( - action, - { - id_name: target_id, - "message": processed_message, - }, - ) - if response.get("status") == "ok": - logger.info("消息发送成功") - qq_message_id = response.get("data", {}).get("message_id") - await self.message_sent_back(raw_message_base, qq_message_id) - else: - logger.warning(f"消息发送失败,napcat返回:{str(response)}") - - async def send_command(self, raw_message_base: MessageBase) -> None: - """ - 处理命令类 - """ - logger.info("处理命令中") - message_info: BaseMessageInfo = raw_message_base.message_info - message_segment: Seg = raw_message_base.message_segment - group_info: GroupInfo = message_info.group_info - seg_data: Dict[str, Any] = message_segment.data - command_name: str = seg_data.get("name") - try: - match command_name: - case CommandType.GROUP_BAN.name: - command, args_dict = self.handle_ban_command(seg_data.get("args"), group_info) - case CommandType.GROUP_WHOLE_BAN.name: - command, args_dict = self.handle_whole_ban_command(seg_data.get("args"), group_info) - case CommandType.GROUP_KICK.name: - command, args_dict = self.handle_kick_command(seg_data.get("args"), group_info) - case CommandType.SEND_POKE.name: - command, args_dict = self.handle_poke_command(seg_data.get("args"), group_info) - case CommandType.DELETE_MSG.name: - command, args_dict = self.delete_msg_command(seg_data.get("args")) - case CommandType.AI_VOICE_SEND.name: - command, args_dict = self.handle_ai_voice_send_command(seg_data.get("args"), group_info) - case _: - logger.error(f"未知命令: {command_name}") - return - except Exception as e: - logger.error(f"处理命令时发生错误: {e}") - return None - - if not command or not args_dict: - logger.error("命令或参数缺失") - return None - - response = await self.send_message_to_napcat(command, args_dict) - if response.get("status") == "ok": - logger.info(f"命令 {command_name} 执行成功") - else: - logger.warning(f"命令 {command_name} 执行失败,napcat返回:{str(response)}") - - def get_level(self, seg_data: Seg) -> int: - if seg_data.type == "seglist": - return 1 + max(self.get_level(seg) for seg in seg_data.data) - else: - return 1 - - async def handle_seg_recursive(self, seg_data: Seg) -> list: - payload: list = [] - if seg_data.type == "seglist": - # level = self.get_level(seg_data) # 给以后可能的多层嵌套做准备,此处不使用 - if not seg_data.data: - return [] - for seg in seg_data.data: - payload = self.process_message_by_type(seg, payload) - else: - payload = self.process_message_by_type(seg_data, payload) - return payload - - def process_message_by_type(self, seg: Seg, payload: list) -> list: - # sourcery skip: reintroduce-else, swap-if-else-branches, use-named-expression - new_payload = payload - if seg.type == "reply": - target_id = seg.data - if target_id == "notice": - return payload - new_payload = self.build_payload(payload, self.handle_reply_message(target_id), True) - elif seg.type == "text": - text = seg.data - if not text: - return payload - new_payload = self.build_payload(payload, self.handle_text_message(text), False) - elif seg.type == "face": - logger.warning("MaiBot 发送了qq原生表情,暂时不支持") - elif seg.type == "image": - image = seg.data - new_payload = self.build_payload(payload, self.handle_image_message(image), False) - elif seg.type == "emoji": - emoji = seg.data - new_payload = self.build_payload(payload, self.handle_emoji_message(emoji), False) - elif seg.type == "voice": - voice = seg.data - new_payload = self.build_payload(payload, self.handle_voice_message(voice), False) - elif seg.type == "voiceurl": - voice_url = seg.data - new_payload = self.build_payload(payload, self.handle_voiceurl_message(voice_url), False) - elif seg.type == "music": - song_id = seg.data - new_payload = self.build_payload(payload, self.handle_music_message(song_id), False) - elif seg.type == "videourl": - video_url = seg.data - new_payload = self.build_payload(payload, self.handle_videourl_message(video_url), False) - elif seg.type == "file": - file_path = seg.data - new_payload = self.build_payload(payload, self.handle_file_message(file_path), False) - return new_payload - - def build_payload(self, payload: list, addon: dict, is_reply: bool = False) -> list: - # sourcery skip: for-append-to-extend, merge-list-append, simplify-generator - """构建发送的消息体""" - if is_reply: - temp_list = [] - temp_list.append(addon) - for i in payload: - if i.get("type") == "reply": - logger.debug("检测到多个回复,使用最新的回复") - continue - temp_list.append(i) - return temp_list - else: - payload.append(addon) - return payload - - def handle_reply_message(self, id: str) -> dict: - """处理回复消息""" - return {"type": "reply", "data": {"id": id}} - - def handle_text_message(self, message: str) -> dict: - """处理文本消息""" - return {"type": "text", "data": {"text": message}} - - def handle_image_message(self, encoded_image: str) -> dict: - """处理图片消息""" - return { - "type": "image", - "data": { - "file": f"base64://{encoded_image}", - "subtype": 0, - }, - } # base64 编码的图片 - - def handle_emoji_message(self, encoded_emoji: str) -> dict: - """处理表情消息""" - encoded_image = encoded_emoji - image_format = get_image_format(encoded_emoji) - if image_format != "gif": - encoded_image = convert_image_to_gif(encoded_emoji) - return { - "type": "image", - "data": { - "file": f"base64://{encoded_image}", - "subtype": 1, - "summary": "[动画表情]", - }, - } - - def handle_voice_message(self, encoded_voice: str) -> dict: - """处理语音消息""" - if not global_config.voice.use_tts: - logger.warning("未启用语音消息处理") - return {} - if not encoded_voice: - return {} - return { - "type": "record", - "data": {"file": f"base64://{encoded_voice}"}, - } - - def handle_voiceurl_message(self, voice_url: str) -> dict: - """处理语音链接消息""" - return { - "type": "record", - "data": {"file": voice_url}, - } - - def handle_music_message(self, song_id: str) -> dict: - """处理音乐消息""" - return { - "type": "music", - "data": {"type": "163", "id": song_id}, - } - def handle_videourl_message(self, video_url: str) -> dict: - """处理视频链接消息""" - return { - "type": "video", - "data": {"file": video_url}, - } - - def handle_file_message(self, file_path: str) -> dict: - """处理文件消息""" - return { - "type": "file", - "data": {"file": f"file://{file_path}"}, - } - - def handle_ban_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: - """处理封禁命令 - - Args: - args (Dict[str, Any]): 参数字典 - group_info (GroupInfo): 群聊信息(对应目标群聊) - - Returns: - Tuple[CommandType, Dict[str, Any]] - """ - duration: int = int(args["duration"]) - user_id: int = int(args["qq_id"]) - group_id: int = int(group_info.group_id) - if duration < 0: - raise ValueError("封禁时间必须大于等于0") - if not user_id or not group_id: - raise ValueError("封禁命令缺少必要参数") - if duration > 2592000: - raise ValueError("封禁时间不能超过30天") - return ( - CommandType.GROUP_BAN.value, - { - "group_id": group_id, - "user_id": user_id, - "duration": duration, - }, - ) - - def handle_whole_ban_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: - """处理全体禁言命令 - - Args: - args (Dict[str, Any]): 参数字典 - group_info (GroupInfo): 群聊信息(对应目标群聊) - - Returns: - Tuple[CommandType, Dict[str, Any]] - """ - enable = args["enable"] - assert isinstance(enable, bool), "enable参数必须是布尔值" - group_id: int = int(group_info.group_id) - if group_id <= 0: - raise ValueError("群组ID无效") - return ( - CommandType.GROUP_WHOLE_BAN.value, - { - "group_id": group_id, - "enable": enable, - }, - ) - - def handle_kick_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: - """处理群成员踢出命令 - - Args: - args (Dict[str, Any]): 参数字典 - group_info (GroupInfo): 群聊信息(对应目标群聊) - - Returns: - Tuple[CommandType, Dict[str, Any]] - """ - user_id: int = int(args["qq_id"]) - group_id: int = int(group_info.group_id) - if group_id <= 0: - raise ValueError("群组ID无效") - if user_id <= 0: - raise ValueError("用户ID无效") - return ( - CommandType.GROUP_KICK.value, - { - "group_id": group_id, - "user_id": user_id, - "reject_add_request": False, # 不拒绝加群请求 - }, - ) - - def handle_poke_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: - """处理戳一戳命令 - - Args: - args (Dict[str, Any]): 参数字典 - group_info (GroupInfo): 群聊信息(对应目标群聊) - - Returns: - Tuple[CommandType, Dict[str, Any]] - """ - user_id: int = int(args["qq_id"]) - if group_info is None: - group_id = None - else: - group_id: int = int(group_info.group_id) - if group_id <= 0: - raise ValueError("群组ID无效") - if user_id <= 0: - raise ValueError("用户ID无效") - return ( - CommandType.SEND_POKE.value, - { - "group_id": group_id, - "user_id": user_id, - }, - ) - - def delete_msg_command(self, args: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]: - """处理撤回消息命令 - - Args: - args (Dict[str, Any]): 参数字典 - - Returns: - Tuple[CommandType, Dict[str, Any]] - """ - try: - message_id = int(args["message_id"]) - if message_id <= 0: - raise ValueError("消息ID无效") - except KeyError: - raise ValueError("缺少必需参数: message_id") from None - except (ValueError, TypeError) as e: - raise ValueError(f"消息ID无效: {args['message_id']} - {str(e)}") from None - - return ( - CommandType.DELETE_MSG.value, - { - "message_id": message_id, - }, - ) - - def handle_ai_voice_send_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: - """ - 处理AI语音发送命令的逻辑。 - 并返回 NapCat 兼容的 (action, params) 元组。 - """ - if not group_info or not group_info.group_id: - raise ValueError("AI语音发送命令必须在群聊上下文中使用") - if not args: - raise ValueError("AI语音发送命令缺少参数") - - group_id: int = int(group_info.group_id) - character_id = args.get("character") - text_content = args.get("text") - - if not character_id or not text_content: - raise ValueError(f"AI语音发送命令参数不完整: character='{character_id}', text='{text_content}'") - - return ( - CommandType.AI_VOICE_SEND.value, - { - "group_id": group_id, - "text": text_content, - "character": character_id, - }, - ) - - async def send_message_to_napcat(self, action: str, params: dict) -> dict: - request_uuid = str(uuid.uuid4()) - payload = json.dumps({"action": action, "params": params, "echo": request_uuid}) - await self.server_connection.send(payload) - try: - response = await get_response(request_uuid) - except TimeoutError: - logger.error("发送消息超时,未收到响应") - return {"status": "error", "message": "timeout"} - except Exception as e: - logger.error(f"发送消息失败: {e}") - return {"status": "error", "message": str(e)} - return response - - async def message_sent_back(self, message_base: MessageBase, qq_message_id: str) -> None: - # 修改 additional_config,添加 echo 字段 - if message_base.message_info.additional_config is None: - message_base.message_info.additional_config = {} - - message_base.message_info.additional_config["echo"] = True - - # 获取原始的 mmc_message_id - mmc_message_id = message_base.message_info.message_id - - # 修改 message_segment 为 notify 类型 - message_base.message_segment = Seg( - type="notify", data={"sub_type": "echo", "echo": mmc_message_id, "actual_id": qq_message_id} - ) - await message_send_instance.message_send(message_base) - logger.debug("已回送消息ID") - return - - -send_handler = SendHandler() diff --git a/src/send_handler/__init__.py b/src/send_handler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/send_handler/main_send_handler.py b/src/send_handler/main_send_handler.py new file mode 100644 index 0000000..8cce8a9 --- /dev/null +++ b/src/send_handler/main_send_handler.py @@ -0,0 +1,104 @@ +from typing import Any, Dict +from maim_message import ( + UserInfo, + GroupInfo, + Seg, + BaseMessageInfo, + MessageBase, +) +from src.logger import logger +from .send_command_handler import SendCommandHandleClass +from .send_message_handler import SendMessageHandleClass +from .nc_sending import nc_message_sender + + +class SendHandler: + def __init__(self): + pass + + async def handle_message(self, raw_message_base_dict: dict) -> None: + raw_message_base: MessageBase = MessageBase.from_dict(raw_message_base_dict) + message_segment: Seg = raw_message_base.message_segment + logger.info("接收到来自MaiBot的消息,处理中") + if message_segment.type == "command": + return await self.send_command(raw_message_base) + else: + return await self.send_normal_message(raw_message_base) + + async def send_command(self, raw_message_base: MessageBase) -> None: + """ + 处理命令类 + """ + logger.info("处理命令中") + message_info: BaseMessageInfo = raw_message_base.message_info + message_segment: Seg = raw_message_base.message_segment + group_info: GroupInfo = message_info.group_info + seg_data: Dict[str, Any] = message_segment.data + try: + command, args_dict = SendCommandHandleClass.handle_command(seg_data, group_info) + except Exception as e: + logger.error(f"处理命令时出错: {str(e)}") + return + + if not command or not args_dict: + logger.error("命令或参数缺失") + return None + + response = await nc_message_sender.send_message_to_napcat(command, args_dict) + if response.get("status") == "ok": + logger.info(f"命令 {seg_data.get('name')} 执行成功") + else: + logger.warning(f"命令 {seg_data.get('name')} 执行失败,napcat返回:{str(response)}") + + async def send_normal_message(self, raw_message_base: MessageBase) -> None: + """ + 处理普通消息发送 + """ + logger.info("处理普通信息中") + message_info: BaseMessageInfo = raw_message_base.message_info + message_segment: Seg = raw_message_base.message_segment + group_info: GroupInfo = message_info.group_info + user_info: UserInfo = message_info.user_info + target_id: int = None + action: str = None + id_name: str = None + processed_message: list = [] + try: + processed_message = SendMessageHandleClass.process_seg_recursive(message_segment) + except Exception as e: + logger.error(f"处理消息时发生错误: {e}") + return + + if not processed_message: + logger.critical("现在暂时不支持解析此回复!") + return None + + if group_info and user_info: + logger.debug("发送群聊消息") + target_id = group_info.group_id + action = "send_group_msg" + id_name = "group_id" + elif user_info: + logger.debug("发送私聊消息") + target_id = user_info.user_id + action = "send_private_msg" + id_name = "user_id" + else: + logger.error("无法识别的消息类型") + return + logger.info("尝试发送到napcat") + response = await nc_message_sender.send_message_to_napcat( + action, + { + id_name: target_id, + "message": processed_message, + }, + ) + if response.get("status") == "ok": + logger.info("消息发送成功") + qq_message_id = response.get("data", {}).get("message_id") + await nc_message_sender.message_sent_back(raw_message_base, qq_message_id) + else: + logger.warning(f"消息发送失败,napcat返回:{str(response)}") + +send_handler = SendHandler() \ No newline at end of file diff --git a/src/send_handler/nc_sending.py b/src/send_handler/nc_sending.py new file mode 100644 index 0000000..e9f45ca --- /dev/null +++ b/src/send_handler/nc_sending.py @@ -0,0 +1,49 @@ +import json +import uuid +import websockets as Server +from maim_message import MessageBase, Seg + +from src.response_pool import get_response +from src.logger import logger +from src.recv_handler.message_sending import message_send_instance + +class NCMessageSender: + def __init__(self): + self.server_connection: Server.ServerConnection = None + + async def set_server_connection(self, connection: Server.ServerConnection): + self.server_connection = connection + + async def send_message_to_napcat(self, action: str, params: dict) -> dict: + request_uuid = str(uuid.uuid4()) + payload = json.dumps({"action": action, "params": params, "echo": request_uuid}) + await self.server_connection.send(payload) + try: + response = await get_response(request_uuid) + except TimeoutError: + logger.error("发送消息超时,未收到响应") + return {"status": "error", "message": "timeout"} + except Exception as e: + logger.error(f"发送消息失败: {e}") + return {"status": "error", "message": str(e)} + return response + + async def message_sent_back(self, message_base: MessageBase, qq_message_id: str) -> None: + # 修改 additional_config,添加 echo 字段 + if message_base.message_info.additional_config is None: + message_base.message_info.additional_config = {} + + message_base.message_info.additional_config["echo"] = True + + # 获取原始的 mmc_message_id + mmc_message_id = message_base.message_info.message_id + + # 修改 message_segment 为 notify 类型 + message_base.message_segment = Seg( + type="notify", data={"sub_type": "echo", "echo": mmc_message_id, "actual_id": qq_message_id} + ) + await message_send_instance.message_send(message_base) + logger.debug("已回送消息ID") + return + +nc_message_sender = NCMessageSender() \ No newline at end of file diff --git a/src/send_handler/send_command_handler.py b/src/send_handler/send_command_handler.py new file mode 100644 index 0000000..dff9505 --- /dev/null +++ b/src/send_handler/send_command_handler.py @@ -0,0 +1,221 @@ +from maim_message import GroupInfo +from typing import Any, Dict, Tuple + +from src import CommandType + + +class SendCommandHandleClass: + @classmethod + def handle_command(cls, raw_command_data: Dict[str, Any], group_info: GroupInfo): + command_name: str = raw_command_data.get("name") + try: + match command_name: + case CommandType.GROUP_BAN.name: + return cls.handle_ban_command(raw_command_data.get("args", {}), group_info) + case CommandType.GROUP_WHOLE_BAN.name: + return cls.handle_whole_ban_command(raw_command_data.get("args", {}), group_info) + case CommandType.GROUP_KICK.name: + return cls.handle_kick_command(raw_command_data.get("args", {}), group_info) + case CommandType.SEND_POKE.name: + return cls.handle_poke_command(raw_command_data.get("args", {}), group_info) + case CommandType.DELETE_MSG.name: + return cls.delete_msg_command(raw_command_data.get("args", {})) + case CommandType.AI_VOICE_SEND.name: + return cls.handle_ai_voice_send_command(raw_command_data.get("args", {}), group_info) + case CommandType.MESSAGE_LIKE.name: + return cls.handle_message_like_command(raw_command_data.get("args", {})) + case _: + raise RuntimeError(f"未知的命令类型: {command_name}") + except Exception as e: + raise RuntimeError(f"处理命令时出错: {str(e)}") from e + + @staticmethod + def handle_ban_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: + """处理封禁命令 + + Args: + args (Dict[str, Any]): 参数字典 + group_info (GroupInfo): 群聊信息(对应目标群聊) + + Returns: + Tuple[CommandType, Dict[str, Any]] + """ + duration: int = int(args["duration"]) + user_id: int = int(args["qq_id"]) + group_id: int = int(group_info.group_id) + if duration < 0: + raise ValueError("封禁时间必须大于等于0") + if not user_id or not group_id: + raise ValueError("封禁命令缺少必要参数") + if duration > 2592000: + raise ValueError("封禁时间不能超过30天") + return ( + CommandType.GROUP_BAN.value, + { + "group_id": group_id, + "user_id": user_id, + "duration": duration, + }, + ) + + @staticmethod + def handle_whole_ban_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: + """处理全体禁言命令 + + Args: + args (Dict[str, Any]): 参数字典 + group_info (GroupInfo): 群聊信息(对应目标群聊) + + Returns: + Tuple[CommandType, Dict[str, Any]] + """ + enable = args["enable"] + assert isinstance(enable, bool), "enable参数必须是布尔值" + group_id: int = int(group_info.group_id) + if group_id <= 0: + raise ValueError("群组ID无效") + return ( + CommandType.GROUP_WHOLE_BAN.value, + { + "group_id": group_id, + "enable": enable, + }, + ) + + @staticmethod + def handle_kick_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: + """处理群成员踢出命令 + + Args: + args (Dict[str, Any]): 参数字典 + group_info (GroupInfo): 群聊信息(对应目标群聊) + + Returns: + Tuple[CommandType, Dict[str, Any]] + """ + user_id: int = int(args["qq_id"]) + group_id: int = int(group_info.group_id) + if group_id <= 0: + raise ValueError("群组ID无效") + if user_id <= 0: + raise ValueError("用户ID无效") + return ( + CommandType.GROUP_KICK.value, + { + "group_id": group_id, + "user_id": user_id, + "reject_add_request": False, # 不拒绝加群请求 + }, + ) + + @staticmethod + def handle_poke_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: + """处理戳一戳命令 + + Args: + args (Dict[str, Any]): 参数字典 + group_info (GroupInfo): 群聊信息(对应目标群聊) + + Returns: + Tuple[CommandType, Dict[str, Any]] + """ + user_id: int = int(args["qq_id"]) + if group_info is None: + group_id = None + else: + group_id: int = int(group_info.group_id) + if group_id <= 0: + raise ValueError("群组ID无效") + if user_id <= 0: + raise ValueError("用户ID无效") + return ( + CommandType.SEND_POKE.value, + { + "group_id": group_id, + "user_id": user_id, + }, + ) + + @staticmethod + def delete_msg_command(args: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]: + """处理撤回消息命令 + + Args: + args (Dict[str, Any]): 参数字典 + + Returns: + Tuple[CommandType, Dict[str, Any]] + """ + try: + message_id = int(args["message_id"]) + if message_id <= 0: + raise ValueError("消息ID无效") + except KeyError: + raise ValueError("缺少必需参数: message_id") from None + except (ValueError, TypeError) as e: + raise ValueError(f"消息ID无效: {args['message_id']} - {str(e)}") from None + + return ( + CommandType.DELETE_MSG.value, + { + "message_id": message_id, + }, + ) + + @staticmethod + def handle_ai_voice_send_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: + """ + 处理AI语音发送命令的逻辑。 + 并返回 NapCat 兼容的 (action, params) 元组。 + """ + if not group_info or not group_info.group_id: + raise ValueError("AI语音发送命令必须在群聊上下文中使用") + if not args: + raise ValueError("AI语音发送命令缺少参数") + + group_id: int = int(group_info.group_id) + character_id = args.get("character") + text_content = args.get("text") + + if not character_id or not text_content: + raise ValueError(f"AI语音发送命令参数不完整: character='{character_id}', text='{text_content}'") + + return ( + CommandType.AI_VOICE_SEND.value, + { + "group_id": group_id, + "text": text_content, + "character": character_id, + }, + ) + + @staticmethod + def handle_message_like_command(args: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]: + """ + 处理给消息贴表情的逻辑。 + """ + if not args: + raise ValueError("消息贴表情命令缺少参数") + + message_id = args.get("message_id") + emoji_id = args.get("emoji_id") + if not message_id: + raise ValueError("消息贴表情命令缺少必要参数: message_id") + if not emoji_id: + raise ValueError("消息贴表情命令缺少必要参数: emoji_id") + + message_id = int(message_id) + emoji_id = int(emoji_id) + if message_id <= 0: + raise ValueError("消息ID无效") + if emoji_id <= 0: + raise ValueError("表情ID无效") + + return ( + CommandType.MESSAGE_LIKE.value, + { + "message_id": message_id, + "emoji_id": emoji_id, + "set": True, + }, + ) diff --git a/src/send_handler/send_message_handler.py b/src/send_handler/send_message_handler.py new file mode 100644 index 0000000..308b688 --- /dev/null +++ b/src/send_handler/send_message_handler.py @@ -0,0 +1,188 @@ +from maim_message import Seg, MessageBase +from typing import List, Dict + +from src.logger import logger +from src.config import global_config +from src.utils import get_image_format, convert_image_to_gif + + +class SendMessageHandleClass: + @classmethod + def parse_seg_to_nc_format(cls, message_segment: Seg): + parsed_payload: List = cls.process_seg_recursive(message_segment) + return parsed_payload + + @classmethod + def process_seg_recursive(cls, seg_data: Seg, in_forward: bool = False) -> List: + payload: List = [] + if seg_data.type == "seglist": + if not seg_data.data: + return [] + for seg in seg_data.data: + payload = cls.process_message_by_type(seg, payload, in_forward) + else: + payload = cls.process_message_by_type(seg_data, payload, in_forward) + return payload + + @classmethod + def process_message_by_type(cls, seg: Seg, payload: List, in_forward: bool = False) -> List: + # sourcery skip: for-append-to-extend, reintroduce-else, swap-if-else-branches, use-named-expression + new_payload = payload + if seg.type == "reply": + target_id = seg.data + if target_id == "notice": + return payload + new_payload = cls.build_payload(payload, cls.handle_reply_message(target_id), True) + elif seg.type == "text": + text = seg.data + if not text: + return payload + new_payload = cls.build_payload(payload, cls.handle_text_message(text), False) + elif seg.type == "face": + face_id = seg.data + new_payload = cls.build_payload(payload, cls.handle_native_face_message(face_id), False) + elif seg.type == "image": + image = seg.data + new_payload = cls.build_payload(payload, cls.handle_image_message(image), False) + elif seg.type == "emoji": + emoji = seg.data + new_payload = cls.build_payload(payload, cls.handle_emoji_message(emoji), False) + elif seg.type == "voice": + voice = seg.data + new_payload = cls.build_payload(payload, cls.handle_voice_message(voice), False) + elif seg.type == "voiceurl": + voice_url = seg.data + new_payload = cls.build_payload(payload, cls.handle_voiceurl_message(voice_url), False) + elif seg.type == "music": + song_id = seg.data + new_payload = cls.build_payload(payload, cls.handle_music_message(song_id), False) + elif seg.type == "videourl": + video_url = seg.data + new_payload = cls.build_payload(payload, cls.handle_videourl_message(video_url), False) + elif seg.type == "file": + file_path = seg.data + new_payload = cls.build_payload(payload, cls.handle_file_message(file_path), False) + elif seg.type == "forward" and not in_forward: + forward_message_content: List[Dict] = seg.data + new_payload: List[Dict] = [ + cls.handle_forward_message(MessageBase.from_dict(item)) for item in forward_message_content + ] # 转发消息不能和其他消息一起发送 + return new_payload + + @classmethod + def handle_forward_message(cls, item: MessageBase) -> Dict: + # sourcery skip: remove-unnecessary-else + message_segment: Seg = item.message_segment + if message_segment.type == "id": + return {"type": "node", "data": {"id": message_segment.data}} + else: + user_info = item.message_info.user_info + content = cls.process_seg_recursive(message_segment.data, True) + return { + "type": "node", + "data": {"name": user_info.user_nickname or "QQ用户", "uin": user_info.user_id, "content": content}, + } + + @staticmethod + def build_payload(payload: List, addon: dict, is_reply: bool = False) -> List: + # sourcery skip: for-append-to-extend, merge-list-append, simplify-generator + if is_reply: + temp_list = [] + temp_list.append(addon) + for i in payload: + if i.get("type") == "reply": + logger.debug("检测到多个回复,使用最新的回复") + continue + temp_list.append(i) + return temp_list + else: + payload.append(addon) + return payload + + @staticmethod + def handle_reply_message(id: str) -> dict: + """处理回复消息""" + return {"type": "reply", "data": {"id": id}} + + @staticmethod + def handle_text_message(message: str) -> dict: + """处理文本消息""" + return {"type": "text", "data": {"text": message}} + + @staticmethod + def handle_native_face_message(face_id: int) -> dict: + # sourcery skip: remove-unnecessary-cast + """处理原生表情消息""" + return {"type": "face", "data": {"id": int(face_id)}} + + @staticmethod + def handle_image_message(encoded_image: str) -> dict: + """处理图片消息""" + return { + "type": "image", + "data": { + "file": f"base64://{encoded_image}", + "subtype": 0, + }, + } # base64 编码的图片 + + @staticmethod + def handle_emoji_message(encoded_emoji: str) -> dict: + """处理表情消息""" + encoded_image = encoded_emoji + image_format = get_image_format(encoded_emoji) + if image_format != "gif": + encoded_image = convert_image_to_gif(encoded_emoji) + return { + "type": "image", + "data": { + "file": f"base64://{encoded_image}", + "subtype": 1, + "summary": "[动画表情]", + }, + } + + @staticmethod + def handle_voice_message(encoded_voice: str) -> dict: + """处理语音消息""" + if not global_config.voice.use_tts: + logger.warning("未启用语音消息处理") + return {} + if not encoded_voice: + return {} + return { + "type": "record", + "data": {"file": f"base64://{encoded_voice}"}, + } + + @staticmethod + def handle_voiceurl_message(voice_url: str) -> dict: + """处理语音链接消息""" + return { + "type": "record", + "data": {"file": voice_url}, + } + + @staticmethod + def handle_music_message(song_id: str) -> dict: + """处理音乐消息""" + return { + "type": "music", + "data": {"type": "163", "id": song_id}, + } + + @staticmethod + def handle_videourl_message(video_url: str) -> dict: + """处理视频链接消息""" + return { + "type": "video", + "data": {"file": video_url}, + } + + @staticmethod + def handle_file_message(file_path: str) -> dict: + """处理文件消息""" + return { + "type": "file", + "data": {"file": f"file://{file_path}"}, + }