diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a8b468a --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +**/__pycache__/ +.vscode +test +.ruff_cache +config.toml \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..b192bad --- /dev/null +++ b/README.md @@ -0,0 +1,47 @@ +# MaiBot 与 Napcat 的 Adapter +运行方式:独立/放在MaiBot本体作为插件 + +# 使用说明 +首先,napcat开**websocket客户端**,设置URL为类似这样:`ws://localhost:8095` + +然后说明一下配置文件: + +``` +[Nickname] # 现在没用 +nickname = "" + +[Napcat_Server] # Napvat连接的ws服务设置 +host = "localhost" # Napcat设定的url地址 +port = 8095 # Napcat设定的ws端口 + +[MaiBot_Server] # 连接麦麦的ws服务设置 +host = "localhost" # 麦麦在.env文件中设置的url地址 +port = 8000 # 麦麦在.env文件中设置的ws端口 + +[Napcat] +heartbeat = 30 # 与Napcat设置的心跳相同(按秒计) + +[Whitelist] # 白名单功能(未启用)(未实现) +group_list = [] +private_list = [] +enable_temp = false +``` + +你需要的就是把template_config.toml + +# TO DO List +- [x] 读取自动心跳测试连接 +- [x] 接受消息解析 + - [x] 文本解析 + - [x] 图片解析 + - [x] 文本与消息混合解析 + - [ ] 链接解析 + - [ ] 戳一戳解析 + - [ ] 语音解析(?) +- [x] 发送消息 + - [x] 发送文本 + - [x] 发送图片 + - [x] 发送表情包 + - [ ] 引用回复(完成但是没测试) + - [ ] 戳回去(?) + - [ ] 发送语音(?) \ No newline at end of file diff --git a/docs/36bd6c6d15c0fa7ece5b856d0e51ebe5.jpg b/docs/36bd6c6d15c0fa7ece5b856d0e51ebe5.jpg new file mode 100644 index 0000000..a597338 Binary files /dev/null and b/docs/36bd6c6d15c0fa7ece5b856d0e51ebe5.jpg differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..26fff8f --- /dev/null +++ b/main.py @@ -0,0 +1,73 @@ +import asyncio +import sys +import json +import websockets.asyncio.server as Server +from src.logger import logger +from src.recv_handler import recv_handler +from src.send_handler import send_handler +from src.config import global_config +from src.mmc_com_layer import mmc_start_com, mmc_stop_com, router +from src.message_queue import recv_queue + + +async def message_recv(server_connection: Server.ServerConnection): + recv_handler.server_connection = server_connection + send_handler.server_connection = server_connection + # asyncio.create_task(send_handler.test_send()) + async for raw_message in server_connection: + logger.debug(raw_message) + decoded_raw_message: dict = json.loads(raw_message) + post_type = decoded_raw_message.get("post_type") + if post_type == "meta_event": + await recv_handler.handle_meta_event(decoded_raw_message) + elif post_type == "message": + await recv_handler.handle_raw_message(decoded_raw_message) + elif post_type == "notice": + pass + elif post_type is None: + recv_queue.put(decoded_raw_message) + + +async def main(): + recv_handler.maibot_router = router + _ = await asyncio.gather(mmc_server(), mmc_start_com()) + + +async def mmc_server(): + logger.info("正在启动adapter...") + async with Server.serve( + message_recv, global_config.server_host, global_config.server_port + ) as server: + logger.info( + f"Adapter已启动,监听地址: ws://{global_config.server_host}:{global_config.server_port}" + ) + await server.serve_forever() + + +async def graceful_shutdown(): + try: + logger.info("正在关闭adapter...") + await mmc_stop_com() + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + + except Exception as e: + logger.error(f"Adapter关闭失败: {e}") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + logger.warning("收到中断信号,正在优雅关闭...") + loop.run_until_complete(graceful_shutdown()) + except Exception as e: + logger.error(f"主程序异常: {str(e)}") + if loop and not loop.is_closed(): + loop.run_until_complete(graceful_shutdown()) + loop.close() + sys.exit(1) diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..47beb43 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,58 @@ +class MetaEventType(): + lifecycle = "lifecycle" # 生命周期 + + class Lifecycle: + connect = "connect" # 生命周期 - WebSocket 连接成功 + + heartbeat = "heartbeat" # 心跳 + + +class MessageType: # 接受消息大类 + private = "private" # 私聊消息 + + class Private: + friend = "friend" # 私聊消息 - 好友 + group = "group" # 私聊消息 - 群临时 + group_self = "group_self" # 私聊消息 - 群中自身发送 + other = "other" # 私聊消息 - 其他 + + group = "group" # 群聊消息 + + class Group: + normal = "normal" # 群聊消息 - 普通 + anonymous = "anonymous" # 群聊消息 - 匿名消息 + notice = "notice" # 群聊消息 - 系统提示 + + +class NoticeType: # 通知事件 + friend_recall = "friend_recall" # 私聊消息撤回 + group_recall = "group_recall" # 群聊消息撤回 + + class Notify: + poke = "poke" # 戳一戳 + + +class RealMessageType: # 实际消息分类 + text = "text" # 纯文本 + face = "face" # qq表情 + image = "image" # 图片 + record = "record" # 语音 + video = "video" # 视频 + at = "at" # @某人 + rps = "rps" # 猜拳魔法表情 + dice = "dice" # 骰子 + shake = "shake" # 私聊窗口抖动(只收) + poke = "poke" # 群聊戳一戳 + share = "share" # 链接分享(json形式) + reply = "reply" # 回复消息 + forward = "forward" # 转发消息 + node = "node" # 转发消息节点 + +class MessageSentType: + private = "private" + class Private: + friend = "friend" + group = "group" + group = "group" + class Group: + normal = "normal" \ No newline at end of file diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..970143e --- /dev/null +++ b/src/config.py @@ -0,0 +1,63 @@ +import os +import sys +import tomli +import shutil +from .logger import logger +from typing import Optional + + +class Config: + platform: str = "qq" + nickname: Optional[str] = None + server_host: str = "localhost" + server_port: int = 8095 + napcat_heartbeat_interval: int = 30 + + def __init__(self): + self._get_config_path() + + def _get_config_path(self): + current_file_path = os.path.abspath(__file__) + src_path = os.path.dirname(current_file_path) + self.root_path = os.path.join(src_path, "..") + self.config_path = os.path.join(self.root_path, "config.toml") + + def load_config(self): + include_configs = [ + "Nickname", + "Napcat_Server", + "MaiBot_Server", + "Napcat", + ] + if os.path.exists(self.config_path): + with open(self.config_path, "rb") as f: + try: + raw_config = tomli.load(f) + except tomli.TOMLDecodeError as e: + logger.critical( + f"配置文件bot_config.toml填写有误,请检查第{e.lineno}行第{e.colno}处:{e.msg}" + ) + sys.exit(1) + for key in include_configs: + if key not in raw_config: + logger.error(f"配置文件中缺少必需的字段: '{key}'") + sys.exit(1) + self.nickname = raw_config["Nickname"].get("nickname") + self.server_host = raw_config["Napcat_Server"].get("host", "localhost") + self.server_port = raw_config["Napcat_Server"].get("port", 8095) + self.napcat_heartbeat_interval = raw_config["Napcat"].get("interval", 30) + self.mai_host = raw_config["MaiBot_Server"].get("host", "localhost") + self.mai_port = raw_config["MaiBot_Server"].get("port", 8000) + else: + logger.error("配置文件不存在!") + logger.info("正在创建配置文件...") + shutil.copy( + os.path.join(self.root_path, "template", "template_config.toml"), + os.path.join(self.root_path, "config.toml"), + ) + logger.info("配置文件创建成功,请修改配置文件后重启程序。") + sys.exit(1) + + +global_config = Config() +global_config.load_config() diff --git a/src/logger.py b/src/logger.py new file mode 100644 index 0000000..854f5a2 --- /dev/null +++ b/src/logger.py @@ -0,0 +1,16 @@ +from loguru import logger +import builtins + + +def handle_output(message: str): + if "连接失败" in message: + logger.error(message) + elif "收到无效的" in message: + logger.warning(message) + elif "检测到平台" in message: + logger.warning(message) + else: + logger.info(message) + + +builtins.print = handle_output diff --git a/src/message_queue.py b/src/message_queue.py new file mode 100644 index 0000000..cca41e1 --- /dev/null +++ b/src/message_queue.py @@ -0,0 +1,9 @@ +import queue +import asyncio + +recv_queue = queue.Queue() + +async def get_response(): + while recv_queue.empty(): + await asyncio.sleep(0.5) + return recv_queue.get() diff --git a/src/mmc_com_layer.py b/src/mmc_com_layer.py new file mode 100644 index 0000000..b04b0f9 --- /dev/null +++ b/src/mmc_com_layer.py @@ -0,0 +1,24 @@ +from maim_message import Router, RouteConfig, TargetConfig +from .config import global_config +from .logger import logger +from .send_handler import send_handler + +route_config = RouteConfig( + route_config={ + "qq": TargetConfig( + url=f"ws://{global_config.mai_host}:{global_config.mai_port}/ws", + token=None, + ) + } +) +router = Router(route_config) + + +async def mmc_start_com(): + logger.info("正在连接MaiBot") + router.register_class_handler(send_handler.handle_seg) + await router.run() + + +async def mmc_stop_com(): + await router.stop() diff --git a/src/recv_handler.py b/src/recv_handler.py new file mode 100644 index 0000000..0ecc057 --- /dev/null +++ b/src/recv_handler.py @@ -0,0 +1,354 @@ +from .logger import logger +from .config import global_config +import time +import asyncio +import json +import websockets.asyncio.server as Server +from typing import List + +from . import MetaEventType, RealMessageType, MessageType +from maim_message import ( + UserInfo, + GroupInfo, + Seg, + BaseMessageInfo, + MessageBase, + TemplateInfo, + FormatInfo, + Router, +) + +from .utils import get_group_info, get_member_info, get_image_base64, get_self_info + + +class RecvHandler: + maibot_router: Router = None + + def __init__(self): + self.server_connection: Server.ServerConnection = None + self.interval = global_config.napcat_heartbeat_interval + + async def handle_meta_event(self, message: dict) -> None: + event_type = message.get("meta_event_type") + if event_type == MetaEventType.lifecycle: + sub_type = message.get("sub_type") + if sub_type == MetaEventType.Lifecycle.connect: + self_id = message.get("self_id") + self.last_heart_beat = time.time() + logger.info(f"Bot {self_id} 连接成功") + asyncio.create_task(self.check_heartbeat(self_id)) + elif event_type == MetaEventType.heartbeat: + if message["status"].get("online") and message["status"].get("good"): + self.last_heart_beat = time.time() + self.interval = message.get("interval") / 1000 + else: + self_id = message.get("self_id") + logger.warning(f"Bot {self_id} Napcat 端异常!") + + async def check_heartbeat(self, id: int) -> None: + while True: + now_time = time.time() + if now_time - self.last_heart_beat > self.interval + 3: + logger.warning(f"Bot {id} 连接已断开") + break + else: + logger.debug("心跳正常") + await asyncio.sleep(self.interval) + + async def handle_raw_message(self, raw_message: dict) -> None: + """ + 从Napcat接受的原始消息处理 + + 参数: + raw_message: dict: 原始消息 + 返回值: + None + """ + message_type: str = raw_message.get("message_type") + message_id: int = raw_message.get("message_id") + message_time: int = raw_message.get("time") + + template_info: TemplateInfo = None # 模板信息,暂时为空,等待启用 + format_info: FormatInfo = None # 格式化信息,暂时为空,等待启用 + + if message_type == MessageType.private: + sub_type = raw_message.get("sub_type") + if sub_type == MessageType.Private.friend: + sender_info: dict = raw_message.get("sender") + + # 发送者用户信息 + user_info: UserInfo = UserInfo( + platform=global_config.platform, + user_id=sender_info.get("user_id"), + user_nickname=sender_info.get("nickname"), + user_cardname=sender_info.get("card"), + ) + + # 不存在群信息 + group_info: GroupInfo = None + elif sub_type == MessageType.Private.group: + """ + 本部分暂时不做支持,先放着 + """ + logger.warning("群临时消息类型不支持") + return None + + sender_info: dict = raw_message.get("sender") + + # 由于临时会话中,Napcat默认不发送成员昵称,所以需要单独获取 + fetched_member_info: dict = await get_member_info( + self.server_connection, + raw_message.get("group_id"), + sender_info.get("user_id"), + ) + nickname: str = None + if fetched_member_info: + nickname = fetched_member_info.get("nickname") + + # 发送者用户信息 + user_info: UserInfo = UserInfo( + platform=global_config.platform, + user_id=sender_info.get("user_id"), + user_nickname=nickname, + user_cardname=None, + ) + + # -------------------这里需要群信息吗?------------------- + + # 获取群聊相关信息,在此单独处理group_name,因为默认发送的消息中没有 + fetched_group_info: dict = get_group_info( + self.server_connection, raw_message.get("group_id") + ) + group_name = "" + if fetched_group_info.get("group_name"): + group_name = fetched_group_info.get("group_name") + + group_info: GroupInfo = GroupInfo( + platform=global_config.platform, + group_id=raw_message.get("group_id"), + group_name=group_name, + ) + + else: + logger.warning("私聊消息类型不支持") + return None + elif message_type == MessageType.group: + sub_type = raw_message.get("sub_type") + if sub_type == MessageType.Group.normal: + sender_info: dict = raw_message.get("sender") + + # 发送者用户信息 + user_info: UserInfo = UserInfo( + platform=global_config.platform, + user_id=sender_info.get("user_id"), + user_nickname=sender_info.get("nickname"), + user_cardname=sender_info.get("card"), + ) + + # 获取群聊相关信息,在此单独处理group_name,因为默认发送的消息中没有 + fetched_group_info = await get_group_info( + self.server_connection, raw_message.get("group_id") + ) + group_name: str = None + if fetched_group_info: + group_name = fetched_group_info.get("group_name") + + group_info: GroupInfo = GroupInfo( + platform=global_config.platform, + group_id=raw_message.get("group_id"), + group_name=group_name, + ) + + else: + logger.warning("群聊消息类型不支持") + return None + + # 消息信息 + message_info: BaseMessageInfo = BaseMessageInfo( + platform=global_config.platform, + message_id=message_id, + time=message_time, + user_info=user_info, + group_info=group_info, + template_info=template_info, + format_info=format_info, + ) + + # 处理实际信息 + if not raw_message.get("message"): + logger.warning("消息内容为空") + return None + + # 获取Seg列表 + seg_message: List[Seg] = await self.handle_real_message(raw_message) + if not seg_message: + logger.warning("消息内容为空") + return None + submit_seg: Seg = Seg( + type="seglist", + data=seg_message, + ) + # MessageBase创建 + message_base: MessageBase = MessageBase( + message_info=message_info, + message_segment=submit_seg, + raw_message=raw_message.get("raw_message"), + ) + # 不启用发送消息 + await self.message_process(message_base) + + logger.debug("我处理!") + + async def handle_real_message(self, raw_message: dict) -> List[Seg]: + """ + 处理实际消息 + + 参数: + real_message: dict: 实际消息 + 返回值: + seg_message: list[Seg]: 处理后的消息段列表 + """ + real_message: list = raw_message.get("message") + if len(real_message) == 0: + return None + seg_message: List[Seg] = [] + for sub_message in real_message: + sub_message: dict + sub_message_type = sub_message.get("type") + match sub_message_type: + case RealMessageType.text: + ret_seg = await self.handle_text_message(sub_message) + seg_message.append(ret_seg) + case RealMessageType.face: + pass + case RealMessageType.image: + ret_seg = await self.handle_image_message(sub_message) + if ret_seg: + seg_message.append(ret_seg) + case RealMessageType.record: + logger.warning("不支持语音解析") + pass + case RealMessageType.video: + logger.warning("不支持视频解析") + pass + case RealMessageType.at: + ret_seg = await self.handle_at_message( + sub_message, + raw_message.get("self_id"), + raw_message.get("group_id"), + ) + if ret_seg: + seg_message.append(ret_seg) + case RealMessageType.rps: + logger.warning("暂时不支持猜拳魔法表情解析") + pass + case RealMessageType.dice: + logger.warning("暂时不支持筛子表情解析") + pass + case RealMessageType.shake: + # 预计等价于戳一戳 + logger.warning("暂时不支持窗口抖动解析") + pass + case RealMessageType.poke: + logger.warning("暂时不支持戳一戳解析") + pass + case RealMessageType.share: + logger.warning("链接分享?啊?你搞我啊?") + pass + case RealMessageType.reply: + logger.warning("暂时不支持回复解析") + pass + case RealMessageType.forward: + forward_message_id = sub_message.get("data").get("id") + payload = json.dumps( + { + "action": "get_forward_msg", + "params": {"message_id": forward_message_id}, + } + ) + await self.server_connection.send(payload) + response = await self.server_connection.recv() + logger.critical(response) + logger.critical(json.loads(response)) + case RealMessageType.node: + logger.warning("不支持转发消息节点解析") + pass + return seg_message + + async def handle_text_message(self, raw_message: dict) -> Seg: + """ + 处理纯文本信息 + + 参数: + raw_message: dict: 原始消息 + 返回值: + seg_data: Seg: 处理后的消息段 + """ + message_data: dict = raw_message.get("data") + plain_text: str = message_data.get("text") + seg_data = Seg(type=RealMessageType.text, data=plain_text) + return seg_data + + async def handle_face_message(self) -> None: + """ + 处理表情消息 + + 支持未完成 + """ + pass + + async def handle_image_message(self, raw_message: dict) -> Seg: + """ + 处理图片消息与表情包消息 + + 参数: + raw_message: dict: 原始消息 + 返回值: + seg_data: Seg: 处理后的消息段 + """ + message_data: dict = raw_message.get("data") + image_base64 = await get_image_base64(message_data.get("url")) + image_sub_type = message_data.get("sub_type") + if not image_base64: + return None + if image_sub_type == 0: + """这部分认为是图片""" + seg_data = Seg(type="image", data=image_base64) + return seg_data + else: + """这部分认为是表情包""" + seg_data = Seg(type="emoji", data=image_base64) + return seg_data + + async def handle_at_message( + self, raw_message: dict, self_id: int, group_id: int + ) -> Seg: + """ + 处理at消息 + """ + message_data: dict = raw_message.get("data") + if message_data: + qq_id = message_data.get("qq") + if str(self_id) == str(qq_id): + self_info: dict = get_self_info() + if self_info: + return Seg(type="text", data=f"@{self_info.get('nickname')} ") + else: + return None + else: + member_info: dict = get_member_info( + self.server_connection, group_id=group_id, user_id=self_id + ) + if member_info: + return Seg(type="text", data=f"@{member_info.get('nickname')} ") + else: + return None + + async def handle_poke_message(self) -> None: + pass + + async def message_process(self, message_base: MessageBase) -> None: + await self.maibot_router.send_message(message_base) + + +recv_handler = RecvHandler() diff --git a/src/send_handler.py b/src/send_handler.py new file mode 100644 index 0000000..208dabb --- /dev/null +++ b/src/send_handler.py @@ -0,0 +1,155 @@ +import json +import websockets.asyncio.server as Server + +# from .config import global_config +# 白名单机制不启用 +from .message_queue import get_response +from .logger import logger + +from maim_message import ( + UserInfo, + GroupInfo, + Seg, + BaseMessageInfo, + MessageBase, +) + + +class SendHandler: + def __init__(self): + self.server_connection: Server.ServerConnection = None + + async def handle_seg(self, raw_message_base_str: str) -> None: + logger.critical(raw_message_base_str) + raw_message_base: MessageBase = MessageBase.from_dict(raw_message_base_str) + message_info: BaseMessageInfo = raw_message_base.message_info + message_segment: Seg = raw_message_base.message_segment + group_info: GroupInfo = message_info.group_info + user_info: UserInfo = message_info.user_info + + if group_info and user_info: + # 处理群聊消息 + # return + processed_message: list = await self.handle_seg_recursive(message_segment) + if processed_message: + response = await self.send_message_to_napcat( + "send_group_msg", + { + "group_id": group_info.group_id, + "message": processed_message, + }, + ) + if response.get("status") == "ok": + logger.info("消息发送成功") + else: + logger.warning(f"消息发送失败,napcat返回:{str(response)}") + else: + logger.critical("现在暂时不支持解析此回复!") + return None + elif user_info: + # 处理私聊消息 + logger.critical("私聊消息暂时无效") + return None + # processed_message = await self.handle_seg_recursive(message_segment) + else: + logger.error("无法识别的消息类型") + return + + def get_level(self, seg_data: Seg) -> int: + if seg_data.type == "seglist": + return 1 + max(self.get_level(seg) for seg in seg_data.data) + else: + return 1 + + async def handle_seg_recursive(self, seg_data: Seg) -> list: + payload: list = [] + if seg_data.type == "seglist": + level = self.get_level(seg_data) # 给以后可能的多层嵌套做准备,此处不使用 + for seg in seg_data.data: + payload = self.process_message_by_type(seg, payload) + else: + payload = self.process_message_by_type(seg_data, payload) + return payload + + def process_message_by_type(self, seg: Seg, payload: list) -> list: + new_payload = payload + if seg.type == "reply": + target_id = seg.data + new_payload = self.build_payload( + payload, self.handle_reply_message(target_id), True + ) + elif seg.type == "text": + text = seg.data + new_payload = self.build_payload( + payload, self.handle_text_message(text), False + ) + elif seg.type == "face": + pass + elif seg.type == "image": + image = seg.data + new_payload = self.build_payload( + payload, self.handle_image_message(image), False + ) + elif seg.type == "emoji": + emoji = seg.data + new_payload = self.build_payload( + payload, self.handle_emoji_message(emoji), False + ) + return new_payload + + def build_payload(self, payload: list, addon: dict, is_reply: bool = False) -> list: + """构建发送的消息体""" + if is_reply: + temp_list = [] + temp_list.append(addon) + for i in payload: + temp_list.append(i) + return temp_list + else: + payload.append(addon) + return payload + + def handle_reply_message(self, id: str) -> dict: + """处理回复消息""" + return {"type": "reply", "data": {"id": id}} + + def handle_text_message(self, message: str) -> dict: + """处理文本消息""" + ret = {"type": "text", "data": {"text": message}} + return ret + + def handle_image_message(self, encoded_image: str) -> dict: + """处理图片消息""" + return { + "type": "image", + "data": {"file": f"base64://{encoded_image}", "subtype": 0}, + } # base64 编码的图片 + + def handle_emoji_message(self, encoded_emoji: str) -> dict: + """处理表情消息""" + return { + "type": "image", + "data": {"file": f"base64://{encoded_emoji}", "subtype": 1}, + } + + async def test_send(self): + response: dict = await self.send_message_to_napcat( + "send_group_msg", + { + "group_id": 1038831234, + "message": [{"type": "text", "data": {"text": "test"}}], + }, + ) + if response.get("status") == "ok": + logger.info("消息test发送成功") + else: + logger.warning(f"消息发送失败,napcat返回:{str(response)}") + + async def send_message_to_napcat(self, action: str, params: dict) -> None: + payload = json.dumps({"action": action, "params": params}) + await self.server_connection.send(payload) + response = await get_response() + return response + + +send_handler = SendHandler() diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..86f4314 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,92 @@ +import websockets.asyncio.server as Server +import json +import base64 +from .logger import logger + +import requests +import ssl +from requests.adapters import HTTPAdapter + +from PIL import Image +import io + +class SSLAdapter(HTTPAdapter): + def init_poolmanager(self, *args, **kwargs): + """ + tls1.3 不再支持RSA KEY exchange,py3.10 增加TLS的默认安全设置。可能导致握手失败。 + 使用 `ssl_context.set_ciphers('DEFAULT')` DEFAULT 老的加密设置。 + """ + ssl_context = ssl.create_default_context() + ssl_context.set_ciphers("DEFAULT") + ssl_context.check_hostname = False # 避免在请求时 verify=False 设置时报错, 如果设置需要校验证书可去掉该行。 + ssl_context.minimum_version = ( + ssl.TLSVersion.TLSv1_2 + ) # 最小版本设置成1.2 可去掉低版本的警告 + ssl_context.maximum_version = ssl.TLSVersion.TLSv1_2 # 最大版本设置成1.2 + kwargs["ssl_context"] = ssl_context + return super().init_poolmanager(*args, **kwargs) + + +async def get_group_info(websocket: Server.ServerConnection, group_id: int) -> dict: + """ + 获取群相关信息 + + 返回值需要处理可能为空的情况 + """ + payload = json.dumps({"action": "get_group_info", "params": {"group_id": group_id}}) + await websocket.send(payload) + socket_response = await websocket.recv() + logger.debug(socket_response) + return json.loads(socket_response).get("data") + + +async def get_member_info( + websocket: Server.ServerConnection, group_id: int, user_id: int +) -> dict: + """ + 获取群成员信息 + + 返回值需要处理可能为空的情况 + """ + payload = json.dumps( + { + "action": "get_group_member_info", + "params": {"group_id": group_id, "user_id": user_id, "no_cache": True}, + } + ) + await websocket.send(payload) + socket_response = await websocket.recv() + logger.debug(socket_response) + return json.loads(socket_response).get("data") + + +async def get_image_base64(url: str) -> str: + """获取图片/表情包的Base64""" + try: + sess = requests.session() + sess.mount("https://", SSLAdapter()) # 将上面定义的SSLAdapter 应用起来 + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" + } + response = sess.get(url, headers=headers, timeout=10, verify=True) + response.raise_for_status() + image_bytes = response.content + return base64.b64encode(image_bytes).decode("utf-8") + except Exception as e: + logger.error(f"图片下载失败: {str(e)}") + raise + + +async def get_self_info(websocket: Server.ServerConnection) -> str: + """ + 获取自身信息 + """ + payload = json.dumps({"action": "get_login_info", "params": {}}) + await websocket.send(payload) + response = await websocket.recv() + logger.debug(response) + return json.loads(response).get("data") + +async def get_image_format(raw_data: str) -> str: + image_bytes = base64.b64decode(raw_data) + return Image.open(io.BytesIO(image_bytes)).format.lower() \ No newline at end of file diff --git a/template/template_config.toml b/template/template_config.toml new file mode 100644 index 0000000..3a90d69 --- /dev/null +++ b/template/template_config.toml @@ -0,0 +1,18 @@ +[Nickname] # 现在没用 +nickname = "" + +[Napcat_Server] # Napvat连接的ws服务设置 +host = "localhost" # Napcat设定的url地址 +port = 8095 # Napcat设定的ws端口 + +[MaiBot_Server] # 连接麦麦的ws服务设置 +host = "localhost" # 麦麦在.env文件中设置的url地址 +port = 8000 # 麦麦在.env文件中设置的ws端口 + +[Napcat] +heartbeat = 30 # 与Napcat设置的心跳相同(按秒计) + +[Whitelist] # 白名单功能(未启用)(未实现) +group_list = [] +private_list = [] +enable_temp = false