feat: command

pull/31/head
UnCLAS-Prommer 2025-05-25 15:32:34 +08:00
parent 9a83be8cf0
commit 4a5bf0c50e
6 changed files with 216 additions and 41 deletions

View File

@ -72,6 +72,10 @@ sequenceDiagram
- [ ] 戳回去(?)
- [x] 发送语音
- [x] 使用echo与uuid保证消息顺序
- [x] 执行部分管理员功能
- [x] 禁言别人
- [x] 全体禁言
- [x] 群踢人功能
# 特别鸣谢
特别感谢[@Maple127667](https://github.com/Maple127667)对本项目代码思路的支持

37
command_args.md 100644
View File

@ -0,0 +1,37 @@
# Command Arguments
```python
Seg.type = "command"
```
## 群聊禁言
```python
Seg.data: Dict[str, Any] = {
"name": "GROUP_BAN"
"args": {
"qq_id": "用户QQ号",
"duration": "禁言时长(秒)"
},
}
```
其中群聊ID将会通过Group_Info.group_id自动获取。
## 群聊全体禁言
```python
Seg.data: Dict[str, Any] = {
"name": "GROUP_WHOLE_BAN"
"args": {
"enable": "是否开启全体禁言True/False"
},
}
```
其中群聊ID将会通过Group_Info.group_id自动获取。
`enable`的参数需要为boolean类型True表示开启全体禁言False表示关闭全体禁言。
## 群聊踢人
```python
Seg.data: Dict[str, Any] = {
"name": "GROUP_KICK"
"args": {
"qq_id": "用户QQ号",
},
}
```
其中群聊ID将会通过Group_Info.group_id自动获取。

View File

@ -1,6 +1,6 @@
[project]
name = "MaiBotNapcatAdapter"
version = "0.2.3"
version = "0.2.4"
description = "A MaiBot adapter for Napcat"
[tool.ruff]

View File

@ -1,3 +1,6 @@
from enum import Enum
class MetaEventType:
lifecycle = "lifecycle" # 生命周期
@ -61,3 +64,14 @@ class MessageSentType:
class Group:
normal = "normal"
class CommandType(Enum):
"""命令类型"""
GROUP_BAN = "set_group_ban" # 禁言用户
GROUP_WHOLE_BAN = "set_group_whole_ban" # 群全体禁言
GROUP_KICK = "set_group_kick" # 踢出群聊
def __str__(self) -> str:
return self.value

View File

@ -16,7 +16,7 @@ router = Router(route_config)
async def mmc_start_com():
logger.info("正在连接MaiBot")
router.register_class_handler(send_handler.handle_seg)
router.register_class_handler(send_handler.handle_message)
await router.run()

View File

@ -1,13 +1,6 @@
import json
import websockets as Server
import uuid
from .config import global_config
# 白名单机制不启用
from .message_queue import get_response
from .logger import logger
from maim_message import (
UserInfo,
GroupInfo,
@ -15,7 +8,12 @@ from maim_message import (
BaseMessageInfo,
MessageBase,
)
from typing import Dict, Any, Tuple
from . import CommandType
from .config import global_config
from .message_queue import get_response
from .logger import logger
from .utils import get_image_format, convert_image_to_gif
@ -23,8 +21,20 @@ class SendHandler:
def __init__(self):
self.server_connection: Server.ServerConnection = None
async def handle_seg(self, raw_message_base_dict: dict) -> None:
async def handle_message(self, raw_message_base_dict: dict) -> None:
raw_message_base: MessageBase = MessageBase.from_dict(raw_message_base_dict)
message_segment: Seg = raw_message_base.message_segment
logger.info("接收到来自MaiBot的消息处理中")
if message_segment.type == "command":
return await self.send_command(raw_message_base)
else:
return await self.send_normal_message(raw_message_base)
async def send_normal_message(self, raw_message_base: MessageBase) -> None:
"""
处理普通消息发送
"""
logger.info("处理普通信息中")
message_info: BaseMessageInfo = raw_message_base.message_info
message_segment: Seg = raw_message_base.message_segment
group_info: GroupInfo = message_info.group_info
@ -33,43 +43,77 @@ class SendHandler:
action: str = None
id_name: str = None
processed_message: list = []
logger.info("接收到来自MaiBot的消息处理中")
try:
processed_message = await self.handle_seg_recursive(message_segment)
except Exception as e:
logger.error(f"处理消息时发生错误: {e}")
return
if processed_message:
if group_info and user_info:
logger.debug("发送群聊消息")
target_id = group_info.group_id
action = "send_group_msg"
id_name = "group_id"
elif user_info:
logger.debug("发送私聊消息")
target_id = user_info.user_id
action = "send_private_msg"
id_name = "user_id"
else:
logger.error("无法识别的消息类型")
return
logger.info("尝试发送到napcat")
response = await self.send_message_to_napcat(
action,
{
id_name: target_id,
"message": processed_message,
},
)
if response.get("status") == "ok":
logger.info("消息发送成功")
else:
logger.warning(f"消息发送失败napcat返回{str(response)}")
else:
if not processed_message:
logger.critical("现在暂时不支持解析此回复!")
return None
if group_info and user_info:
logger.debug("发送群聊消息")
target_id = group_info.group_id
action = "send_group_msg"
id_name = "group_id"
elif user_info:
logger.debug("发送私聊消息")
target_id = user_info.user_id
action = "send_private_msg"
id_name = "user_id"
else:
logger.error("无法识别的消息类型")
return
logger.info("尝试发送到napcat")
response = await self.send_message_to_napcat(
action,
{
id_name: target_id,
"message": processed_message,
},
)
if response.get("status") == "ok":
logger.info("消息发送成功")
else:
logger.warning(f"消息发送失败napcat返回{str(response)}")
async def send_command(self, raw_message_base: MessageBase) -> None:
"""
处理命令类
"""
logger.info("处理命令中")
message_info: BaseMessageInfo = raw_message_base.message_info
message_segment: Seg = raw_message_base.message_segment
group_info: GroupInfo = message_info.group_info
seg_data: Dict[str, Any] = message_segment.data
command_name: str = seg_data.get("name")
try:
match command_name:
case CommandType.GROUP_BAN.name:
command, args_dict = self.handle_ban_command(seg_data.get("args"), group_info)
case CommandType.GROUP_WHOLE_BAN.name:
command, args_dict = self.handle_whole_ban_command(seg_data.get("args"), group_info)
case CommandType.GROUP_KICK.name:
command, args_dict = self.handle_kick_command(seg_data.get("args"), group_info)
case _:
logger.error(f"未知命令: {command_name}")
return
except Exception as e:
logger.error(f"处理命令时发生错误: {e}")
return None
if not command or not args_dict:
logger.error("命令或参数缺失")
return None
response = await self.send_message_to_napcat(command, args_dict)
if response.get("status") == "ok":
logger.info(f"命令 {command_name} 执行成功")
else:
logger.warning(f"命令 {command_name} 执行失败napcat返回{str(response)}")
def get_level(self, seg_data: Seg) -> int:
if seg_data.type == "seglist":
return 1 + max(self.get_level(seg) for seg in seg_data.data)
@ -94,15 +138,15 @@ class SendHandler:
if seg.type == "reply":
target_id = seg.data
if target_id == "notice":
return []
return payload
new_payload = self.build_payload(payload, self.handle_reply_message(target_id), True)
elif seg.type == "text":
text = seg.data
if not text:
return []
return payload
new_payload = self.build_payload(payload, self.handle_text_message(text), False)
elif seg.type == "face":
pass
logger.warning("MaiBot 发送了qq原生表情暂时不支持")
elif seg.type == "image":
image = seg.data
new_payload = self.build_payload(payload, self.handle_image_message(image), False)
@ -175,6 +219,82 @@ class SendHandler:
"data": {"file": f"base64://{encoded_voice}"},
}
def handle_ban_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]:
"""处理封禁命令
Args:
args (Dict[str, Any]): 参数字典
group_info (GroupInfo): 群聊信息对应目标群聊
Returns:
Tuple[CommandType, Dict[str, Any]]
"""
duration: int = int(args["duration"])
user_id: int = int(args["qq_id"])
group_id: int = int(group_info.group_id)
if duration <= 0:
raise ValueError("封禁时间必须大于0")
if not user_id or not group_id:
raise ValueError("封禁命令缺少必要参数")
if duration > 2592000:
raise ValueError("封禁时间不能超过30天")
return (
CommandType.GROUP_BAN.value,
{
"group_id": group_id,
"user_id": user_id,
"duration": duration,
},
)
def handle_whole_ban_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]:
"""处理全体禁言命令
Args:
args (Dict[str, Any]): 参数字典
group_info (GroupInfo): 群聊信息对应目标群聊
Returns:
Tuple[CommandType, Dict[str, Any]]
"""
enable = args["enable"]
assert isinstance(enable, bool), "enable参数必须是布尔值"
group_id: int = int(group_info.group_id)
if group_id <= 0:
raise ValueError("群组ID无效")
return (
CommandType.GROUP_WHOLE_BAN.value,
{
"group_id": group_id,
"enable": enable,
},
)
def handle_kick_command(self, args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]:
"""处理群成员踢出命令
Args:
args (Dict[str, Any]): 参数字典
group_info (GroupInfo): 群聊信息对应目标群聊
Returns:
Tuple[CommandType, Dict[str, Any]]
"""
user_id: int = int(args["qq_id"])
group_id: int = int(group_info.group_id)
if group_id <= 0:
raise ValueError("群组ID无效")
if user_id <= 0:
raise ValueError("用户ID无效")
return (
CommandType.GROUP_KICK.value,
{
"group_id": group_id,
"user_id": user_id,
"reject_add_request": False, # 不拒绝加群请求
},
)
async def send_message_to_napcat(self, action: str, params: dict) -> dict:
request_uuid = str(uuid.uuid4())
payload = json.dumps({"action": action, "params": params, "echo": request_uuid})