第一版发布

pull/2/head
UnCLAS-Prommer 2025-04-06 01:45:17 +08:00
parent 9171a6b3b6
commit 16a9eebb35
13 changed files with 914 additions and 0 deletions

5
.gitignore vendored 100644
View File

@ -0,0 +1,5 @@
**/__pycache__/
.vscode
test
.ruff_cache
config.toml

47
README.md 100644
View File

@ -0,0 +1,47 @@
# MaiBot 与 Napcat 的 Adapter
运行方式:独立/放在MaiBot本体作为插件
# 使用说明
首先napcat开**websocket客户端**设置URL为类似这样`ws://localhost:8095`
然后说明一下配置文件:
```
[Nickname] # 现在没用
nickname = ""
[Napcat_Server] # Napvat连接的ws服务设置
host = "localhost" # Napcat设定的url地址
port = 8095 # Napcat设定的ws端口
[MaiBot_Server] # 连接麦麦的ws服务设置
host = "localhost" # 麦麦在.env文件中设置的url地址
port = 8000 # 麦麦在.env文件中设置的ws端口
[Napcat]
heartbeat = 30 # 与Napcat设置的心跳相同按秒计
[Whitelist] # 白名单功能(未启用)(未实现)
group_list = []
private_list = []
enable_temp = false
```
你需要的就是把template_config.toml
# TO DO List
- [x] 读取自动心跳测试连接
- [x] 接受消息解析
- [x] 文本解析
- [x] 图片解析
- [x] 文本与消息混合解析
- [ ] 链接解析
- [ ] 戳一戳解析
- [ ] 语音解析(?)
- [x] 发送消息
- [x] 发送文本
- [x] 发送图片
- [x] 发送表情包
- [ ] 引用回复(完成但是没测试)
- [ ] 戳回去(?)
- [ ] 发送语音(?)

Binary file not shown.

After

Width:  |  Height:  |  Size: 157 KiB

73
main.py 100644
View File

@ -0,0 +1,73 @@
import asyncio
import sys
import json
import websockets.asyncio.server as Server
from src.logger import logger
from src.recv_handler import recv_handler
from src.send_handler import send_handler
from src.config import global_config
from src.mmc_com_layer import mmc_start_com, mmc_stop_com, router
from src.message_queue import recv_queue
async def message_recv(server_connection: Server.ServerConnection):
recv_handler.server_connection = server_connection
send_handler.server_connection = server_connection
# asyncio.create_task(send_handler.test_send())
async for raw_message in server_connection:
logger.debug(raw_message)
decoded_raw_message: dict = json.loads(raw_message)
post_type = decoded_raw_message.get("post_type")
if post_type == "meta_event":
await recv_handler.handle_meta_event(decoded_raw_message)
elif post_type == "message":
await recv_handler.handle_raw_message(decoded_raw_message)
elif post_type == "notice":
pass
elif post_type is None:
recv_queue.put(decoded_raw_message)
async def main():
recv_handler.maibot_router = router
_ = await asyncio.gather(mmc_server(), mmc_start_com())
async def mmc_server():
logger.info("正在启动adapter...")
async with Server.serve(
message_recv, global_config.server_host, global_config.server_port
) as server:
logger.info(
f"Adapter已启动监听地址: ws://{global_config.server_host}:{global_config.server_port}"
)
await server.serve_forever()
async def graceful_shutdown():
try:
logger.info("正在关闭adapter...")
await mmc_stop_com()
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
logger.error(f"Adapter关闭失败: {e}")
if __name__ == "__main__":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
logger.warning("收到中断信号,正在优雅关闭...")
loop.run_until_complete(graceful_shutdown())
except Exception as e:
logger.error(f"主程序异常: {str(e)}")
if loop and not loop.is_closed():
loop.run_until_complete(graceful_shutdown())
loop.close()
sys.exit(1)

58
src/__init__.py 100644
View File

@ -0,0 +1,58 @@
class MetaEventType():
lifecycle = "lifecycle" # 生命周期
class Lifecycle:
connect = "connect" # 生命周期 - WebSocket 连接成功
heartbeat = "heartbeat" # 心跳
class MessageType: # 接受消息大类
private = "private" # 私聊消息
class Private:
friend = "friend" # 私聊消息 - 好友
group = "group" # 私聊消息 - 群临时
group_self = "group_self" # 私聊消息 - 群中自身发送
other = "other" # 私聊消息 - 其他
group = "group" # 群聊消息
class Group:
normal = "normal" # 群聊消息 - 普通
anonymous = "anonymous" # 群聊消息 - 匿名消息
notice = "notice" # 群聊消息 - 系统提示
class NoticeType: # 通知事件
friend_recall = "friend_recall" # 私聊消息撤回
group_recall = "group_recall" # 群聊消息撤回
class Notify:
poke = "poke" # 戳一戳
class RealMessageType: # 实际消息分类
text = "text" # 纯文本
face = "face" # qq表情
image = "image" # 图片
record = "record" # 语音
video = "video" # 视频
at = "at" # @某人
rps = "rps" # 猜拳魔法表情
dice = "dice" # 骰子
shake = "shake" # 私聊窗口抖动(只收)
poke = "poke" # 群聊戳一戳
share = "share" # 链接分享json形式
reply = "reply" # 回复消息
forward = "forward" # 转发消息
node = "node" # 转发消息节点
class MessageSentType:
private = "private"
class Private:
friend = "friend"
group = "group"
group = "group"
class Group:
normal = "normal"

63
src/config.py 100644
View File

@ -0,0 +1,63 @@
import os
import sys
import tomli
import shutil
from .logger import logger
from typing import Optional
class Config:
platform: str = "qq"
nickname: Optional[str] = None
server_host: str = "localhost"
server_port: int = 8095
napcat_heartbeat_interval: int = 30
def __init__(self):
self._get_config_path()
def _get_config_path(self):
current_file_path = os.path.abspath(__file__)
src_path = os.path.dirname(current_file_path)
self.root_path = os.path.join(src_path, "..")
self.config_path = os.path.join(self.root_path, "config.toml")
def load_config(self):
include_configs = [
"Nickname",
"Napcat_Server",
"MaiBot_Server",
"Napcat",
]
if os.path.exists(self.config_path):
with open(self.config_path, "rb") as f:
try:
raw_config = tomli.load(f)
except tomli.TOMLDecodeError as e:
logger.critical(
f"配置文件bot_config.toml填写有误请检查第{e.lineno}行第{e.colno}处:{e.msg}"
)
sys.exit(1)
for key in include_configs:
if key not in raw_config:
logger.error(f"配置文件中缺少必需的字段: '{key}'")
sys.exit(1)
self.nickname = raw_config["Nickname"].get("nickname")
self.server_host = raw_config["Napcat_Server"].get("host", "localhost")
self.server_port = raw_config["Napcat_Server"].get("port", 8095)
self.napcat_heartbeat_interval = raw_config["Napcat"].get("interval", 30)
self.mai_host = raw_config["MaiBot_Server"].get("host", "localhost")
self.mai_port = raw_config["MaiBot_Server"].get("port", 8000)
else:
logger.error("配置文件不存在!")
logger.info("正在创建配置文件...")
shutil.copy(
os.path.join(self.root_path, "template", "template_config.toml"),
os.path.join(self.root_path, "config.toml"),
)
logger.info("配置文件创建成功,请修改配置文件后重启程序。")
sys.exit(1)
global_config = Config()
global_config.load_config()

16
src/logger.py 100644
View File

@ -0,0 +1,16 @@
from loguru import logger
import builtins
def handle_output(message: str):
if "连接失败" in message:
logger.error(message)
elif "收到无效的" in message:
logger.warning(message)
elif "检测到平台" in message:
logger.warning(message)
else:
logger.info(message)
builtins.print = handle_output

View File

@ -0,0 +1,9 @@
import queue
import asyncio
recv_queue = queue.Queue()
async def get_response():
while recv_queue.empty():
await asyncio.sleep(0.5)
return recv_queue.get()

View File

@ -0,0 +1,24 @@
from maim_message import Router, RouteConfig, TargetConfig
from .config import global_config
from .logger import logger
from .send_handler import send_handler
route_config = RouteConfig(
route_config={
"qq": TargetConfig(
url=f"ws://{global_config.mai_host}:{global_config.mai_port}/ws",
token=None,
)
}
)
router = Router(route_config)
async def mmc_start_com():
logger.info("正在连接MaiBot")
router.register_class_handler(send_handler.handle_seg)
await router.run()
async def mmc_stop_com():
await router.stop()

354
src/recv_handler.py 100644
View File

@ -0,0 +1,354 @@
from .logger import logger
from .config import global_config
import time
import asyncio
import json
import websockets.asyncio.server as Server
from typing import List
from . import MetaEventType, RealMessageType, MessageType
from maim_message import (
UserInfo,
GroupInfo,
Seg,
BaseMessageInfo,
MessageBase,
TemplateInfo,
FormatInfo,
Router,
)
from .utils import get_group_info, get_member_info, get_image_base64, get_self_info
class RecvHandler:
maibot_router: Router = None
def __init__(self):
self.server_connection: Server.ServerConnection = None
self.interval = global_config.napcat_heartbeat_interval
async def handle_meta_event(self, message: dict) -> None:
event_type = message.get("meta_event_type")
if event_type == MetaEventType.lifecycle:
sub_type = message.get("sub_type")
if sub_type == MetaEventType.Lifecycle.connect:
self_id = message.get("self_id")
self.last_heart_beat = time.time()
logger.info(f"Bot {self_id} 连接成功")
asyncio.create_task(self.check_heartbeat(self_id))
elif event_type == MetaEventType.heartbeat:
if message["status"].get("online") and message["status"].get("good"):
self.last_heart_beat = time.time()
self.interval = message.get("interval") / 1000
else:
self_id = message.get("self_id")
logger.warning(f"Bot {self_id} Napcat 端异常!")
async def check_heartbeat(self, id: int) -> None:
while True:
now_time = time.time()
if now_time - self.last_heart_beat > self.interval + 3:
logger.warning(f"Bot {id} 连接已断开")
break
else:
logger.debug("心跳正常")
await asyncio.sleep(self.interval)
async def handle_raw_message(self, raw_message: dict) -> None:
"""
从Napcat接受的原始消息处理
参数:
raw_message: dict: 原始消息
返回值:
None
"""
message_type: str = raw_message.get("message_type")
message_id: int = raw_message.get("message_id")
message_time: int = raw_message.get("time")
template_info: TemplateInfo = None # 模板信息,暂时为空,等待启用
format_info: FormatInfo = None # 格式化信息,暂时为空,等待启用
if message_type == MessageType.private:
sub_type = raw_message.get("sub_type")
if sub_type == MessageType.Private.friend:
sender_info: dict = raw_message.get("sender")
# 发送者用户信息
user_info: UserInfo = UserInfo(
platform=global_config.platform,
user_id=sender_info.get("user_id"),
user_nickname=sender_info.get("nickname"),
user_cardname=sender_info.get("card"),
)
# 不存在群信息
group_info: GroupInfo = None
elif sub_type == MessageType.Private.group:
"""
本部分暂时不做支持先放着
"""
logger.warning("群临时消息类型不支持")
return None
sender_info: dict = raw_message.get("sender")
# 由于临时会话中Napcat默认不发送成员昵称所以需要单独获取
fetched_member_info: dict = await get_member_info(
self.server_connection,
raw_message.get("group_id"),
sender_info.get("user_id"),
)
nickname: str = None
if fetched_member_info:
nickname = fetched_member_info.get("nickname")
# 发送者用户信息
user_info: UserInfo = UserInfo(
platform=global_config.platform,
user_id=sender_info.get("user_id"),
user_nickname=nickname,
user_cardname=None,
)
# -------------------这里需要群信息吗?-------------------
# 获取群聊相关信息在此单独处理group_name因为默认发送的消息中没有
fetched_group_info: dict = get_group_info(
self.server_connection, raw_message.get("group_id")
)
group_name = ""
if fetched_group_info.get("group_name"):
group_name = fetched_group_info.get("group_name")
group_info: GroupInfo = GroupInfo(
platform=global_config.platform,
group_id=raw_message.get("group_id"),
group_name=group_name,
)
else:
logger.warning("私聊消息类型不支持")
return None
elif message_type == MessageType.group:
sub_type = raw_message.get("sub_type")
if sub_type == MessageType.Group.normal:
sender_info: dict = raw_message.get("sender")
# 发送者用户信息
user_info: UserInfo = UserInfo(
platform=global_config.platform,
user_id=sender_info.get("user_id"),
user_nickname=sender_info.get("nickname"),
user_cardname=sender_info.get("card"),
)
# 获取群聊相关信息在此单独处理group_name因为默认发送的消息中没有
fetched_group_info = await get_group_info(
self.server_connection, raw_message.get("group_id")
)
group_name: str = None
if fetched_group_info:
group_name = fetched_group_info.get("group_name")
group_info: GroupInfo = GroupInfo(
platform=global_config.platform,
group_id=raw_message.get("group_id"),
group_name=group_name,
)
else:
logger.warning("群聊消息类型不支持")
return None
# 消息信息
message_info: BaseMessageInfo = BaseMessageInfo(
platform=global_config.platform,
message_id=message_id,
time=message_time,
user_info=user_info,
group_info=group_info,
template_info=template_info,
format_info=format_info,
)
# 处理实际信息
if not raw_message.get("message"):
logger.warning("消息内容为空")
return None
# 获取Seg列表
seg_message: List[Seg] = await self.handle_real_message(raw_message)
if not seg_message:
logger.warning("消息内容为空")
return None
submit_seg: Seg = Seg(
type="seglist",
data=seg_message,
)
# MessageBase创建
message_base: MessageBase = MessageBase(
message_info=message_info,
message_segment=submit_seg,
raw_message=raw_message.get("raw_message"),
)
# 不启用发送消息
await self.message_process(message_base)
logger.debug("我处理!")
async def handle_real_message(self, raw_message: dict) -> List[Seg]:
"""
处理实际消息
参数:
real_message: dict: 实际消息
返回值:
seg_message: list[Seg]: 处理后的消息段列表
"""
real_message: list = raw_message.get("message")
if len(real_message) == 0:
return None
seg_message: List[Seg] = []
for sub_message in real_message:
sub_message: dict
sub_message_type = sub_message.get("type")
match sub_message_type:
case RealMessageType.text:
ret_seg = await self.handle_text_message(sub_message)
seg_message.append(ret_seg)
case RealMessageType.face:
pass
case RealMessageType.image:
ret_seg = await self.handle_image_message(sub_message)
if ret_seg:
seg_message.append(ret_seg)
case RealMessageType.record:
logger.warning("不支持语音解析")
pass
case RealMessageType.video:
logger.warning("不支持视频解析")
pass
case RealMessageType.at:
ret_seg = await self.handle_at_message(
sub_message,
raw_message.get("self_id"),
raw_message.get("group_id"),
)
if ret_seg:
seg_message.append(ret_seg)
case RealMessageType.rps:
logger.warning("暂时不支持猜拳魔法表情解析")
pass
case RealMessageType.dice:
logger.warning("暂时不支持筛子表情解析")
pass
case RealMessageType.shake:
# 预计等价于戳一戳
logger.warning("暂时不支持窗口抖动解析")
pass
case RealMessageType.poke:
logger.warning("暂时不支持戳一戳解析")
pass
case RealMessageType.share:
logger.warning("链接分享?啊?你搞我啊?")
pass
case RealMessageType.reply:
logger.warning("暂时不支持回复解析")
pass
case RealMessageType.forward:
forward_message_id = sub_message.get("data").get("id")
payload = json.dumps(
{
"action": "get_forward_msg",
"params": {"message_id": forward_message_id},
}
)
await self.server_connection.send(payload)
response = await self.server_connection.recv()
logger.critical(response)
logger.critical(json.loads(response))
case RealMessageType.node:
logger.warning("不支持转发消息节点解析")
pass
return seg_message
async def handle_text_message(self, raw_message: dict) -> Seg:
"""
处理纯文本信息
参数:
raw_message: dict: 原始消息
返回值:
seg_data: Seg: 处理后的消息段
"""
message_data: dict = raw_message.get("data")
plain_text: str = message_data.get("text")
seg_data = Seg(type=RealMessageType.text, data=plain_text)
return seg_data
async def handle_face_message(self) -> None:
"""
处理表情消息
支持未完成
"""
pass
async def handle_image_message(self, raw_message: dict) -> Seg:
"""
处理图片消息与表情包消息
参数:
raw_message: dict: 原始消息
返回值:
seg_data: Seg: 处理后的消息段
"""
message_data: dict = raw_message.get("data")
image_base64 = await get_image_base64(message_data.get("url"))
image_sub_type = message_data.get("sub_type")
if not image_base64:
return None
if image_sub_type == 0:
"""这部分认为是图片"""
seg_data = Seg(type="image", data=image_base64)
return seg_data
else:
"""这部分认为是表情包"""
seg_data = Seg(type="emoji", data=image_base64)
return seg_data
async def handle_at_message(
self, raw_message: dict, self_id: int, group_id: int
) -> Seg:
"""
处理at消息
"""
message_data: dict = raw_message.get("data")
if message_data:
qq_id = message_data.get("qq")
if str(self_id) == str(qq_id):
self_info: dict = get_self_info()
if self_info:
return Seg(type="text", data=f"@{self_info.get('nickname')} ")
else:
return None
else:
member_info: dict = get_member_info(
self.server_connection, group_id=group_id, user_id=self_id
)
if member_info:
return Seg(type="text", data=f"@{member_info.get('nickname')} ")
else:
return None
async def handle_poke_message(self) -> None:
pass
async def message_process(self, message_base: MessageBase) -> None:
await self.maibot_router.send_message(message_base)
recv_handler = RecvHandler()

155
src/send_handler.py 100644
View File

@ -0,0 +1,155 @@
import json
import websockets.asyncio.server as Server
# from .config import global_config
# 白名单机制不启用
from .message_queue import get_response
from .logger import logger
from maim_message import (
UserInfo,
GroupInfo,
Seg,
BaseMessageInfo,
MessageBase,
)
class SendHandler:
def __init__(self):
self.server_connection: Server.ServerConnection = None
async def handle_seg(self, raw_message_base_str: str) -> None:
logger.critical(raw_message_base_str)
raw_message_base: MessageBase = MessageBase.from_dict(raw_message_base_str)
message_info: BaseMessageInfo = raw_message_base.message_info
message_segment: Seg = raw_message_base.message_segment
group_info: GroupInfo = message_info.group_info
user_info: UserInfo = message_info.user_info
if group_info and user_info:
# 处理群聊消息
# return
processed_message: list = await self.handle_seg_recursive(message_segment)
if processed_message:
response = await self.send_message_to_napcat(
"send_group_msg",
{
"group_id": group_info.group_id,
"message": processed_message,
},
)
if response.get("status") == "ok":
logger.info("消息发送成功")
else:
logger.warning(f"消息发送失败napcat返回{str(response)}")
else:
logger.critical("现在暂时不支持解析此回复!")
return None
elif user_info:
# 处理私聊消息
logger.critical("私聊消息暂时无效")
return None
# processed_message = await self.handle_seg_recursive(message_segment)
else:
logger.error("无法识别的消息类型")
return
def get_level(self, seg_data: Seg) -> int:
if seg_data.type == "seglist":
return 1 + max(self.get_level(seg) for seg in seg_data.data)
else:
return 1
async def handle_seg_recursive(self, seg_data: Seg) -> list:
payload: list = []
if seg_data.type == "seglist":
level = self.get_level(seg_data) # 给以后可能的多层嵌套做准备,此处不使用
for seg in seg_data.data:
payload = self.process_message_by_type(seg, payload)
else:
payload = self.process_message_by_type(seg_data, payload)
return payload
def process_message_by_type(self, seg: Seg, payload: list) -> list:
new_payload = payload
if seg.type == "reply":
target_id = seg.data
new_payload = self.build_payload(
payload, self.handle_reply_message(target_id), True
)
elif seg.type == "text":
text = seg.data
new_payload = self.build_payload(
payload, self.handle_text_message(text), False
)
elif seg.type == "face":
pass
elif seg.type == "image":
image = seg.data
new_payload = self.build_payload(
payload, self.handle_image_message(image), False
)
elif seg.type == "emoji":
emoji = seg.data
new_payload = self.build_payload(
payload, self.handle_emoji_message(emoji), False
)
return new_payload
def build_payload(self, payload: list, addon: dict, is_reply: bool = False) -> list:
"""构建发送的消息体"""
if is_reply:
temp_list = []
temp_list.append(addon)
for i in payload:
temp_list.append(i)
return temp_list
else:
payload.append(addon)
return payload
def handle_reply_message(self, id: str) -> dict:
"""处理回复消息"""
return {"type": "reply", "data": {"id": id}}
def handle_text_message(self, message: str) -> dict:
"""处理文本消息"""
ret = {"type": "text", "data": {"text": message}}
return ret
def handle_image_message(self, encoded_image: str) -> dict:
"""处理图片消息"""
return {
"type": "image",
"data": {"file": f"base64://{encoded_image}", "subtype": 0},
} # base64 编码的图片
def handle_emoji_message(self, encoded_emoji: str) -> dict:
"""处理表情消息"""
return {
"type": "image",
"data": {"file": f"base64://{encoded_emoji}", "subtype": 1},
}
async def test_send(self):
response: dict = await self.send_message_to_napcat(
"send_group_msg",
{
"group_id": 1038831234,
"message": [{"type": "text", "data": {"text": "test"}}],
},
)
if response.get("status") == "ok":
logger.info("消息test发送成功")
else:
logger.warning(f"消息发送失败napcat返回{str(response)}")
async def send_message_to_napcat(self, action: str, params: dict) -> None:
payload = json.dumps({"action": action, "params": params})
await self.server_connection.send(payload)
response = await get_response()
return response
send_handler = SendHandler()

92
src/utils.py 100644
View File

@ -0,0 +1,92 @@
import websockets.asyncio.server as Server
import json
import base64
from .logger import logger
import requests
import ssl
from requests.adapters import HTTPAdapter
from PIL import Image
import io
class SSLAdapter(HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
"""
tls1.3 不再支持RSA KEY exchangepy3.10 增加TLS的默认安全设置可能导致握手失败
使用 `ssl_context.set_ciphers('DEFAULT')` DEFAULT 老的加密设置
"""
ssl_context = ssl.create_default_context()
ssl_context.set_ciphers("DEFAULT")
ssl_context.check_hostname = False # 避免在请求时 verify=False 设置时报错, 如果设置需要校验证书可去掉该行。
ssl_context.minimum_version = (
ssl.TLSVersion.TLSv1_2
) # 最小版本设置成1.2 可去掉低版本的警告
ssl_context.maximum_version = ssl.TLSVersion.TLSv1_2 # 最大版本设置成1.2
kwargs["ssl_context"] = ssl_context
return super().init_poolmanager(*args, **kwargs)
async def get_group_info(websocket: Server.ServerConnection, group_id: int) -> dict:
"""
获取群相关信息
返回值需要处理可能为空的情况
"""
payload = json.dumps({"action": "get_group_info", "params": {"group_id": group_id}})
await websocket.send(payload)
socket_response = await websocket.recv()
logger.debug(socket_response)
return json.loads(socket_response).get("data")
async def get_member_info(
websocket: Server.ServerConnection, group_id: int, user_id: int
) -> dict:
"""
获取群成员信息
返回值需要处理可能为空的情况
"""
payload = json.dumps(
{
"action": "get_group_member_info",
"params": {"group_id": group_id, "user_id": user_id, "no_cache": True},
}
)
await websocket.send(payload)
socket_response = await websocket.recv()
logger.debug(socket_response)
return json.loads(socket_response).get("data")
async def get_image_base64(url: str) -> str:
"""获取图片/表情包的Base64"""
try:
sess = requests.session()
sess.mount("https://", SSLAdapter()) # 将上面定义的SSLAdapter 应用起来
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
}
response = sess.get(url, headers=headers, timeout=10, verify=True)
response.raise_for_status()
image_bytes = response.content
return base64.b64encode(image_bytes).decode("utf-8")
except Exception as e:
logger.error(f"图片下载失败: {str(e)}")
raise
async def get_self_info(websocket: Server.ServerConnection) -> str:
"""
获取自身信息
"""
payload = json.dumps({"action": "get_login_info", "params": {}})
await websocket.send(payload)
response = await websocket.recv()
logger.debug(response)
return json.loads(response).get("data")
async def get_image_format(raw_data: str) -> str:
image_bytes = base64.b64decode(raw_data)
return Image.open(io.BytesIO(image_bytes)).format.lower()

View File

@ -0,0 +1,18 @@
[Nickname] # 现在没用
nickname = ""
[Napcat_Server] # Napvat连接的ws服务设置
host = "localhost" # Napcat设定的url地址
port = 8095 # Napcat设定的ws端口
[MaiBot_Server] # 连接麦麦的ws服务设置
host = "localhost" # 麦麦在.env文件中设置的url地址
port = 8000 # 麦麦在.env文件中设置的ws端口
[Napcat]
heartbeat = 30 # 与Napcat设置的心跳相同按秒计
[Whitelist] # 白名单功能(未启用)(未实现)
group_list = []
private_list = []
enable_temp = false