Merge remote-tracking branch 'remote/main'

- 添加配置文件监控和配置变更时自动重启WebSocket服务器功能
- 修复main.py中router引用问题

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
pull/78/head
tcmofashi 2026-01-16 07:27:01 +00:00
commit ce8f05361d
21 changed files with 2776 additions and 171 deletions

4
.gitignore vendored
View File

@ -39,6 +39,7 @@ share/python-wheels/
.installed.cfg .installed.cfg
*.egg *.egg
MANIFEST MANIFEST
dev/
# PyInstaller # PyInstaller
# Usually these files are written by a python script from a template # Usually these files are written by a python script from a template
@ -149,6 +150,7 @@ venv/
ENV/ ENV/
env.bak/ env.bak/
venv.bak/ venv.bak/
uv.lock
# Spyder project settings # Spyder project settings
.spyderproject .spyderproject
@ -276,3 +278,5 @@ test
data/NapcatAdapter.db data/NapcatAdapter.db
data/NapcatAdapter.db-shm data/NapcatAdapter.db-shm
data/NapcatAdapter.db-wal data/NapcatAdapter.db-wal
uv.lock

View File

@ -1,8 +1,28 @@
# Command Arguments # Command Arguments
```python ```python
Seg.type = "command" Seg.type = "command"
``` ```
## 群聊禁言
所有命令执行后都会通过自定义消息类型 `command_response` 返回响应,格式如下:
```python
{
"command_name": "命令名称",
"success": True/False, # 是否执行成功
"timestamp": 1234567890.123, # 时间戳
"data": {...}, # 返回数据(成功时)
"error": "错误信息" # 错误信息(失败时)
}
```
插件需要注册 `command_response` 自定义消息处理器来接收命令响应。
---
## 操作类命令
### 群聊禁言
```python ```python
Seg.data: Dict[str, Any] = { Seg.data: Dict[str, Any] = {
"name": "GROUP_BAN", "name": "GROUP_BAN",
@ -15,7 +35,8 @@ Seg.data: Dict[str, Any] = {
其中群聊ID将会通过Group_Info.group_id自动获取。 其中群聊ID将会通过Group_Info.group_id自动获取。
**当`duration`为 0 时相当于解除禁言。** **当`duration`为 0 时相当于解除禁言。**
## 群聊全体禁言
### 群聊全体禁言
```python ```python
Seg.data: Dict[str, Any] = { Seg.data: Dict[str, Any] = {
"name": "GROUP_WHOLE_BAN", "name": "GROUP_WHOLE_BAN",
@ -27,18 +48,36 @@ Seg.data: Dict[str, Any] = {
其中群聊ID将会通过Group_Info.group_id自动获取。 其中群聊ID将会通过Group_Info.group_id自动获取。
`enable`的参数需要为boolean类型True表示开启全体禁言False表示关闭全体禁言。 `enable`的参数需要为boolean类型True表示开启全体禁言False表示关闭全体禁言。
## 群聊踢人
### 群聊踢人
将指定成员从群聊中踢出,可选拉黑。
```python ```python
Seg.data: Dict[str, Any] = { Seg.data: Dict[str, Any] = {
"name": "GROUP_KICK", "name": "GROUP_KICK",
"args": { "args": {
"qq_id": "用户QQ号", "group_id": 123456789, # 可选,如果在群聊上下文中可从 group_info 自动获取
"user_id": 12345678, # 必需用户QQ号
"reject_add_request": False # 可选,是否群拉黑,默认 False
}, },
} }
``` ```
其中群聊ID将会通过Group_Info.group_id自动获取。
## 戳一戳 ### 批量踢出群成员
批量将多个成员从群聊中踢出,可选拉黑。
```python
Seg.data: Dict[str, Any] = {
"name": "GROUP_KICK_MEMBERS",
"args": {
"group_id": 123456789, # 可选,如果在群聊上下文中可从 group_info 自动获取
"user_id": [12345678, 87654321], # 必需用户QQ号数组
"reject_add_request": False # 可选,是否群拉黑,默认 False
},
}
```
### 戳一戳
```python ```python
Seg.data: Dict[str, Any] = { Seg.data: Dict[str, Any] = {
"name": "SEND_POKE", "name": "SEND_POKE",
@ -48,7 +87,7 @@ Seg.data: Dict[str, Any] = {
} }
``` ```
## 撤回消息 ### 撤回消息
```python ```python
Seg.data: Dict[str, Any] = { Seg.data: Dict[str, Any] = {
"name": "DELETE_MSG", "name": "DELETE_MSG",
@ -58,3 +97,380 @@ Seg.data: Dict[str, Any] = {
} }
``` ```
其中message_id是消息的实际qq_id于新版的mmc中可以从数据库获取如果工作正常的话 其中message_id是消息的实际qq_id于新版的mmc中可以从数据库获取如果工作正常的话
### 给消息贴表情
```python
Seg.data: Dict[str, Any] = {
"name": "MESSAGE_LIKE",
"args": {
"message_id": "消息ID",
"emoji_id": "表情ID"
}
}
```
### 设置群名
设置指定群的群名称。
```python
Seg.data: Dict[str, Any] = {
"name": "SET_GROUP_NAME",
"args": {
"group_id": 123456789, # 可选,如果在群聊上下文中可从 group_info 自动获取
"group_name": "新群名" # 必需,新的群名称
}
}
```
### 设置账号信息
设置Bot自己的QQ账号资料。
```python
Seg.data: Dict[str, Any] = {
"name": "SET_QQ_PROFILE",
"args": {
"nickname": "新昵称", # 必需,昵称
"personal_note": "个性签名", # 可选,个性签名
"sex": "male" # 可选,性别:"male" | "female" | "unknown"
}
}
```
**返回数据示例:**
```python
{
"result": 0, # 结果码0为成功
"errMsg": "" # 错误信息
}
```
---
## 查询类命令
### 获取登录号信息
获取Bot自身的账号信息。
```python
Seg.data: Dict[str, Any] = {
"name": "GET_LOGIN_INFO",
"args": {}
}
```
**返回数据示例:**
```python
{
"user_id": 12345678,
"nickname": "Bot昵称"
}
```
### 获取陌生人信息
```python
Seg.data: Dict[str, Any] = {
"name": "GET_STRANGER_INFO",
"args": {
"user_id": "用户QQ号"
}
}
```
**返回数据示例:**
```python
{
"user_id": 12345678,
"nickname": "用户昵称",
"sex": "male/female/unknown",
"age": 0
}
```
### 获取好友列表
获取Bot的好友列表。
```python
Seg.data: Dict[str, Any] = {
"name": "GET_FRIEND_LIST",
"args": {
"no_cache": False # 可选,是否不使用缓存,默认 False
}
}
```
**返回数据示例:**
```python
[
{
"user_id": 12345678,
"nickname": "好友昵称",
"remark": "备注名",
"sex": "male", # "male" | "female" | "unknown"
"age": 18,
"qid": "QID字符串",
"level": 64,
"login_days": 365,
"birthday_year": 2000,
"birthday_month": 1,
"birthday_day": 1,
"phone_num": "电话号码",
"email": "邮箱",
"category_id": 0, # 分组ID
"categoryName": "我的好友", # 分组名称
"categoryId": 0
},
...
]
```
### 获取群信息
获取指定群的详细信息。
```python
Seg.data: Dict[str, Any] = {
"name": "GET_GROUP_INFO",
"args": {
"group_id": 123456789 # 可选,如果在群聊上下文中可从 group_info 自动获取
}
}
```
**返回数据示例:**
```python
{
"group_id": "123456789", # 群号(字符串)
"group_name": "群名称",
"group_remark": "群备注",
"group_all_shut": 0, # 群全员禁言状态0=未禁言)
"member_count": 100, # 当前成员数量
"max_member_count": 500 # 最大成员数量
}
```
### 获取群详细信息
获取指定群的详细信息(与 GET_GROUP_INFO 类似,可能提供更实时的数据)。
```python
Seg.data: Dict[str, Any] = {
"name": "GET_GROUP_DETAIL_INFO",
"args": {
"group_id": 123456789 # 可选,如果在群聊上下文中可从 group_info 自动获取
}
}
```
**返回数据示例:**
```python
{
"group_id": 123456789, # 群号(数字)
"group_name": "群名称",
"group_remark": "群备注",
"group_all_shut": 0, # 群全员禁言状态0=未禁言)
"member_count": 100, # 当前成员数量
"max_member_count": 500 # 最大成员数量
}
```
### 获取群列表
获取Bot加入的所有群列表。
```python
Seg.data: Dict[str, Any] = {
"name": "GET_GROUP_LIST",
"args": {
"no_cache": False # 可选,是否不使用缓存,默认 False
}
}
```
**返回数据示例:**
```python
[
{
"group_id": "123456789", # 群号(字符串)
"group_name": "群名称",
"group_remark": "群备注",
"group_all_shut": 0, # 群全员禁言状态
"member_count": 100, # 当前成员数量
"max_member_count": 500 # 最大成员数量
},
...
]
```
### 获取群@全体成员剩余次数
查询指定群的@全体成员剩余使用次数。
```python
Seg.data: Dict[str, Any] = {
"name": "GET_GROUP_AT_ALL_REMAIN",
"args": {
"group_id": 123456789 # 可选,如果在群聊上下文中可从 group_info 自动获取
}
}
```
**返回数据示例:**
```python
{
"can_at_all": True, # 是否可以@全体成员
"remain_at_all_count_for_group": 10, # 群剩余@全体成员次数
"remain_at_all_count_for_uin": 5 # Bot剩余@全体成员次数
}
```
### 获取群成员信息
获取指定群成员的详细信息。
```python
Seg.data: Dict[str, Any] = {
"name": "GET_GROUP_MEMBER_INFO",
"args": {
"group_id": 123456789, # 可选,如果在群聊上下文中可从 group_info 自动获取
"user_id": 12345678, # 必需用户QQ号
"no_cache": False # 可选,是否不使用缓存,默认 False
}
}
```
**返回数据示例:**
```python
{
"group_id": 123456789,
"user_id": 12345678,
"nickname": "昵称",
"card": "群名片",
"sex": "male", # "male" | "female" | "unknown"
"age": 18,
"join_time": 1234567890, # 加群时间戳
"last_sent_time": 1234567890, # 最后发言时间戳
"level": 1, # 群等级
"qq_level": 64, # QQ等级
"role": "member", # "owner" | "admin" | "member"
"title": "专属头衔",
"area": "地区",
"unfriendly": False, # 是否不友好
"title_expire_time": 1234567890, # 头衔过期时间
"card_changeable": True, # 名片是否可修改
"shut_up_timestamp": 0, # 禁言时间戳
"is_robot": False, # 是否机器人
"qage": "10年" # Q龄
}
```
### 获取群成员列表
获取指定群的所有成员列表。
```python
Seg.data: Dict[str, Any] = {
"name": "GET_GROUP_MEMBER_LIST",
"args": {
"group_id": 123456789, # 可选,如果在群聊上下文中可从 group_info 自动获取
"no_cache": False # 可选,是否不使用缓存,默认 False
}
}
```
**返回数据示例:**
```python
[
{
"group_id": 123456789,
"user_id": 12345678,
"nickname": "昵称",
"card": "群名片",
"sex": "male", # "male" | "female" | "unknown"
"age": 18,
"join_time": 1234567890,
"last_sent_time": 1234567890,
"level": 1,
"qq_level": 64,
"role": "member", # "owner" | "admin" | "member"
"title": "专属头衔",
"area": "地区",
"unfriendly": False,
"title_expire_time": 1234567890,
"card_changeable": True,
"shut_up_timestamp": 0,
"is_robot": False,
"qage": "10年"
},
...
]
```
### 获取消息详情
获取指定消息的完整详情信息。
```python
Seg.data: Dict[str, Any] = {
"name": "GET_MSG",
"args": {
"message_id": 123456 # 必需消息ID
}
}
```
**返回数据示例:**
```python
{
"self_id": 12345678, # Bot自身ID
"user_id": 87654321, # 发送者ID
"time": 1234567890, # 时间戳
"message_id": 123456, # 消息ID
"message_seq": 123456, # 消息序列号
"real_id": 123456, # 真实消息ID
"real_seq": "123456", # 真实序列号(字符串)
"message_type": "group", # "private" | "group"
"sub_type": "normal", # 子类型
"message_format": "array", # 消息格式
"post_type": "message", # 事件类型
"group_id": 123456789, # 群号(群消息时存在)
"sender": {
"user_id": 87654321,
"nickname": "昵称",
"sex": "male", # "male" | "female" | "unknown"
"age": 18,
"card": "群名片", # 群消息时存在
"level": "1", # 群等级(字符串)
"role": "member" # "owner" | "admin" | "member"
},
"message": [...], # 消息段数组
"raw_message": "消息文本内容", # 原始消息文本
"font": 0 # 字体
}
```
### 获取合并转发消息
获取合并转发消息的所有子消息内容。
```python
Seg.data: Dict[str, Any] = {
"name": "GET_FORWARD_MSG",
"args": {
"message_id": "7123456789012345678" # 必需合并转发消息ID字符串
}
}
```
**返回数据示例:**
```python
{
"messages": [
{
"sender": {
"user_id": 87654321,
"nickname": "昵称",
"sex": "male",
"age": 18,
"card": "群名片",
"level": "1",
"role": "member"
},
"time": 1234567890,
"message": [...] # 消息段数组
},
...
]
}
```

215
main.py
View File

@ -10,13 +10,15 @@ from src.recv_handler.notice_handler import notice_handler
from src.recv_handler.message_sending import message_send_instance from src.recv_handler.message_sending import message_send_instance
from src.send_handler.nc_sending import nc_message_sender from src.send_handler.nc_sending import nc_message_sender
from src.config import global_config from src.config import global_config
from src.mmc_com_layer import mmc_start_com, mmc_stop_com from src.mmc_com_layer import mmc_start_com, mmc_stop_com, router
from src.response_pool import put_response, check_timeout_response from src.response_pool import put_response, check_timeout_response
message_queue = asyncio.Queue() message_queue = asyncio.Queue()
websocket_server = None # 保存WebSocket服务器实例以便关闭
async def message_recv(server_connection: Server.ServerConnection): async def message_recv(server_connection: Server.ServerConnection):
try:
await message_handler.set_server_connection(server_connection) await message_handler.set_server_connection(server_connection)
asyncio.create_task(notice_handler.set_server_connection(server_connection)) asyncio.create_task(notice_handler.set_server_connection(server_connection))
await nc_message_sender.set_server_connection(server_connection) await nc_message_sender.set_server_connection(server_connection)
@ -28,6 +30,10 @@ async def message_recv(server_connection: Server.ServerConnection):
await message_queue.put(decoded_raw_message) await message_queue.put(decoded_raw_message)
elif post_type is None: elif post_type is None:
await put_response(decoded_raw_message) await put_response(decoded_raw_message)
except asyncio.CancelledError:
logger.debug("message_recv 收到取消信号,正在关闭连接")
await server_connection.close()
raise
async def message_process(): async def message_process():
@ -47,7 +53,71 @@ async def message_process():
async def main(): async def main():
_ = await asyncio.gather(napcat_server(), mmc_start_com(), message_process(), check_timeout_response()) # 启动配置文件监控并注册napcat_server配置变更回调
from src.config import config_manager
# 保存napcat_server任务的引用用于重启
napcat_task = None
restart_event = asyncio.Event()
async def on_napcat_config_change(old_value, new_value):
"""当napcat_server配置变更时重启WebSocket服务器"""
nonlocal napcat_task
logger.warning(
f"NapCat配置已变更:\n"
f" 旧配置: {old_value.host}:{old_value.port}\n"
f" 新配置: {new_value.host}:{new_value.port}"
)
# 关闭当前WebSocket服务器
global websocket_server
if websocket_server:
try:
logger.info("正在关闭旧的WebSocket服务器...")
websocket_server.close()
await websocket_server.wait_closed()
logger.info("旧的WebSocket服务器已关闭")
except Exception as e:
logger.error(f"关闭旧WebSocket服务器失败: {e}")
# 取消旧任务
if napcat_task and not napcat_task.done():
napcat_task.cancel()
try:
await napcat_task
except asyncio.CancelledError:
pass
# 触发重启
restart_event.set()
config_manager.on_config_change("napcat_server", on_napcat_config_change)
# 启动文件监控
asyncio.create_task(config_manager.start_watch())
# WebSocket服务器重启循环
async def napcat_with_restart():
nonlocal napcat_task
while True:
restart_event.clear()
try:
await napcat_server()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"NapCat服务器异常: {e}")
break
# 等待重启信号
if not restart_event.is_set():
break
logger.info("正在重启WebSocket服务器...")
await asyncio.sleep(1) # 等待1秒后重启
_ = await asyncio.gather(napcat_with_restart(), mmc_start_com(), message_process(), check_timeout_response())
def check_napcat_server_token(conn, request): def check_napcat_server_token(conn, request):
token = global_config.napcat_server.token token = global_config.napcat_server.token
@ -63,26 +133,90 @@ def check_napcat_server_token(conn, request):
return None return None
async def napcat_server(): async def napcat_server():
logger.info("正在启动adapter...") global websocket_server
async with Server.serve(message_recv, global_config.napcat_server.host, global_config.napcat_server.port, max_size=2**26, process_request=check_napcat_server_token) as server: logger.info("正在启动 MaiBot-Napcat-Adapter...")
logger.info( logger.debug(f"日志等级: {global_config.debug.level}")
f"Adapter已启动监听地址: ws://{global_config.napcat_server.host}:{global_config.napcat_server.port}" logger.debug("日志文件: logs/adapter_*.log")
)
await server.serve_forever()
async def graceful_shutdown():
try: try:
async with Server.serve(
message_recv,
global_config.napcat_server.host,
global_config.napcat_server.port,
max_size=2**26,
process_request=check_napcat_server_token
) as server:
websocket_server = server
logger.success(
f"✅ Adapter 启动成功! 监听: ws://{global_config.napcat_server.host}:{global_config.napcat_server.port}"
)
try:
await server.serve_forever()
except asyncio.CancelledError:
logger.debug("napcat_server 收到取消信号")
raise
except OSError:
# 端口绑定失败时抛出异常让外层处理
raise
async def graceful_shutdown(silent: bool = False):
"""
优雅关闭adapter
Args:
silent: 静默模式,控制台不输出日志,但仍记录到文件
"""
global websocket_server
try:
if not silent:
logger.info("正在关闭adapter...") logger.info("正在关闭adapter...")
else:
logger.debug("正在清理资源...")
# 先关闭WebSocket服务器
if websocket_server:
try:
logger.debug("正在关闭WebSocket服务器")
websocket_server.close()
await websocket_server.wait_closed()
logger.debug("WebSocket服务器已关闭")
except Exception as e:
logger.debug(f"关闭WebSocket服务器时出现错误: {e}")
# 关闭MMC连接
try:
await asyncio.wait_for(mmc_stop_com(), timeout=3)
except asyncio.TimeoutError:
logger.debug("关闭MMC连接超时")
except Exception as e:
logger.debug(f"关闭MMC连接时出现错误: {e}")
# 取消所有任务
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
if tasks:
logger.debug(f"正在取消 {len(tasks)} 个任务")
for task in tasks: for task in tasks:
if not task.done(): if not task.done():
task.cancel() task.cancel()
await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), 15)
await mmc_stop_com() # 后置避免神秘exception # 等待任务完成,记录异常到日志文件
logger.info("Adapter已成功关闭") if tasks:
try:
results = await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=3)
# 记录任务取消的详细信息到日志文件
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.debug(f"任务 {i+1} 清理时产生异常: {type(result).__name__}: {result}")
except asyncio.TimeoutError:
logger.debug("任务清理超时")
except Exception as e: except Exception as e:
logger.error(f"Adapter关闭中出现错误: {e}") logger.debug(f"任务清理时出现错误: {e}")
if not silent:
logger.info("Adapter已成功关闭")
else:
logger.debug("资源清理完成")
except Exception as e:
logger.debug(f"graceful_shutdown异常: {e}", exc_info=True)
if __name__ == "__main__": if __name__ == "__main__":
@ -92,11 +226,58 @@ if __name__ == "__main__":
loop.run_until_complete(main()) loop.run_until_complete(main())
except KeyboardInterrupt: except KeyboardInterrupt:
logger.warning("收到中断信号,正在优雅关闭...") logger.warning("收到中断信号,正在优雅关闭...")
loop.run_until_complete(graceful_shutdown()) try:
loop.run_until_complete(graceful_shutdown(silent=False))
except Exception:
pass
except OSError as e:
# 处理端口占用等网络错误
if e.errno == 10048 or "address already in use" in str(e).lower():
logger.error(f"❌ 端口 {global_config.napcat_server.port} 已被占用,请检查:")
logger.error(" 1. 是否有其他 MaiBot-Napcat-Adapter 实例正在运行")
logger.error(" 2. 修改 config.toml 中的 port 配置")
logger.error(f" 3. 使用命令查看占用进程: netstat -ano | findstr {global_config.napcat_server.port}")
else:
logger.error(f"❌ 网络错误: {str(e)}")
logger.debug("完整错误信息:", exc_info=True)
# 端口占用时静默清理(控制台不输出,但记录到日志文件)
try:
loop.run_until_complete(graceful_shutdown(silent=True))
except Exception as e: except Exception as e:
logger.exception(f"主程序异常: {str(e)}") logger.debug(f"清理资源时出现错误: {e}", exc_info=True)
sys.exit(1)
except Exception as e:
logger.error(f"❌ 主程序异常: {str(e)}")
logger.debug("详细错误信息:", exc_info=True)
try:
loop.run_until_complete(graceful_shutdown(silent=True))
except Exception as e:
logger.debug(f"清理资源时出现错误: {e}", exc_info=True)
sys.exit(1) sys.exit(1)
finally:
# 清理事件循环
try:
# 取消所有剩余任务
pending = asyncio.all_tasks(loop)
if pending:
logger.debug(f"finally块清理 {len(pending)} 个剩余任务")
for task in pending:
task.cancel()
# 给任务一点时间完成取消
try:
results = loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
# 记录清理结果到日志文件
for i, result in enumerate(results):
if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
logger.debug(f"剩余任务 {i+1} 清理异常: {type(result).__name__}: {result}")
except Exception as e:
logger.debug(f"清理剩余任务时出现错误: {e}")
except Exception as e:
logger.debug(f"finally块清理出现错误: {e}")
finally: finally:
if loop and not loop.is_closed(): if loop and not loop.is_closed():
logger.debug("关闭事件循环")
loop.close() loop.close()
sys.exit(0) sys.exit(0)

View File

@ -2,6 +2,19 @@
name = "MaiBotNapcatAdapter" name = "MaiBotNapcatAdapter"
version = "0.5.5" version = "0.5.5"
description = "A MaiBot adapter for Napcat" description = "A MaiBot adapter for Napcat"
dependencies = [
"aiohttp>=3.13.2",
"asyncio>=4.0.0",
"loguru>=0.7.3",
"maim-message>=0.5.7",
"pillow>=12.0.0",
"requests>=2.32.5",
"rich>=14.2.0",
"sqlmodel>=0.0.27",
"tomlkit>=0.13.3",
"websockets>=15.0.1",
"watchdog>=3.0.0",
]
[tool.ruff] [tool.ruff]

View File

@ -8,3 +8,4 @@ pillow
tomlkit tomlkit
rich rich
sqlmodel sqlmodel
watchdog

View File

@ -7,13 +7,30 @@ from .logger import logger
class CommandType(Enum): class CommandType(Enum):
"""命令类型""" """命令类型"""
# 操作类命令
GROUP_BAN = "set_group_ban" # 禁言用户 GROUP_BAN = "set_group_ban" # 禁言用户
GROUP_WHOLE_BAN = "set_group_whole_ban" # 群全体禁言 GROUP_WHOLE_BAN = "set_group_whole_ban" # 群全体禁言
GROUP_KICK = "set_group_kick" # 踢出群聊 GROUP_KICK = "set_group_kick" # 踢出群聊
GROUP_KICK_MEMBERS = "set_group_kick_members" # 批量踢出群成员
SET_GROUP_NAME = "set_group_name" # 设置群名
SEND_POKE = "send_poke" # 戳一戳 SEND_POKE = "send_poke" # 戳一戳
DELETE_MSG = "delete_msg" # 撤回消息 DELETE_MSG = "delete_msg" # 撤回消息
AI_VOICE_SEND = "send_group_ai_record" # 发送群AI语音 AI_VOICE_SEND = "send_group_ai_record" # 发送群AI语音
MESSAGE_LIKE = "message_like" # 给消息贴表情 MESSAGE_LIKE = "message_like" # 给消息贴表情
SET_QQ_PROFILE = "set_qq_profile" # 设置账号信息
# 查询类命令
GET_LOGIN_INFO = "get_login_info" # 获取登录号信息
GET_STRANGER_INFO = "get_stranger_info" # 获取陌生人信息
GET_FRIEND_LIST = "get_friend_list" # 获取好友列表
GET_GROUP_INFO = "get_group_info" # 获取群信息
GET_GROUP_DETAIL_INFO = "get_group_detail_info" # 获取群详细信息
GET_GROUP_LIST = "get_group_list" # 获取群列表
GET_GROUP_AT_ALL_REMAIN = "get_group_at_all_remain" # 获取群@全体成员剩余次数
GET_GROUP_MEMBER_INFO = "get_group_member_info" # 获取群成员信息
GET_GROUP_MEMBER_LIST = "get_group_member_list" # 获取群成员列表
GET_MSG = "get_msg" # 获取消息
GET_FORWARD_MSG = "get_forward_msg" # 获取合并转发消息
def __str__(self) -> str: def __str__(self) -> str:
return self.value return self.value

View File

@ -1,5 +1,6 @@
from .config import global_config from .config import global_config, _config_manager as config_manager
__all__ = [ __all__ = [
"global_config", "global_config",
"config_manager",
] ]

View File

@ -14,6 +14,7 @@ from src.config.config_base import ConfigBase
from src.config.official_configs import ( from src.config.official_configs import (
ChatConfig, ChatConfig,
DebugConfig, DebugConfig,
ForwardConfig,
MaiBotServerConfig, MaiBotServerConfig,
NapcatServerConfig, NapcatServerConfig,
NicknameConfig, NicknameConfig,
@ -117,6 +118,7 @@ class Config(ConfigBase):
maibot_server: MaiBotServerConfig maibot_server: MaiBotServerConfig
chat: ChatConfig chat: ChatConfig
voice: VoiceConfig voice: VoiceConfig
forward: ForwardConfig
debug: DebugConfig debug: DebugConfig
@ -142,5 +144,15 @@ def load_config(config_path: str) -> Config:
update_config() update_config()
logger.info("正在品鉴配置文件...") logger.info("正在品鉴配置文件...")
global_config = load_config(config_path="config.toml")
# 创建配置管理器
from .config_manager import ConfigManager
_config_manager = ConfigManager()
_config_manager.load(config_path="config.toml")
# 向后兼容global_config 指向配置管理器
# 所有现有代码可以继续使用 global_config.chat.xxx 访问配置
global_config = _config_manager
logger.info("非常的新鲜,非常的美味!") logger.info("非常的新鲜,非常的美味!")

View File

@ -0,0 +1,281 @@
"""配置管理器 - 支持热重载"""
import asyncio
import os
from typing import Callable, Dict, List, Any, Optional
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileModifiedEvent
from ..logger import logger
from .config import Config, load_config
class ConfigManager:
"""配置管理器 - 混合模式(属性代理 + 选择性回调)
支持热重载配置文件使用watchdog实时监控文件变化
需要特殊处理的配置项可以注册回调函数
"""
def __init__(self) -> None:
self._config: Optional[Config] = None
self._config_path: str = "config.toml"
self._lock: asyncio.Lock = asyncio.Lock()
self._callbacks: Dict[str, List[Callable]] = {}
# Watchdog相关
self._observer: Optional[Observer] = None
self._event_handler: Optional[FileSystemEventHandler] = None
self._reload_debounce_task: Optional[asyncio.Task] = None
self._debounce_delay: float = 0.5 # 防抖延迟(秒)
self._loop: Optional[asyncio.AbstractEventLoop] = None # 事件循环引用
self._is_reloading: bool = False # 标记是否正在重载
self._last_reload_trigger: float = 0.0 # 最后一次触发重载的时间
def load(self, config_path: str = "config.toml") -> None:
"""加载配置文件
Args:
config_path: 配置文件路径
"""
self._config_path = os.path.abspath(config_path)
self._config = load_config(config_path)
logger.info(f"配置已加载: {config_path}")
async def reload(self, config_path: Optional[str] = None) -> bool:
"""重载配置文件(热重载)
Args:
config_path: 配置文件路径如果为None则使用初始路径
Returns:
bool: 是否重载成功
"""
if config_path is None:
config_path = self._config_path
async with self._lock:
old_config = self._config
try:
new_config = load_config(config_path)
if old_config is not None:
await self._notify_changes(old_config, new_config)
self._config = new_config
logger.info(f"配置重载成功: {config_path}")
return True
except Exception as e:
logger.error(f"配置重载失败: {e}", exc_info=True)
return False
def on_config_change(
self,
config_path: str,
callback: Callable[[Any, Any], Any]
) -> None:
"""为特定配置路径注册回调函数
Args:
config_path: 配置路径 'napcat_server', 'chat.ban_user_id', 'debug.level'
callback: 回调函数签名为 async def callback(old_value, new_value)
"""
if config_path not in self._callbacks:
self._callbacks[config_path] = []
self._callbacks[config_path].append(callback)
logger.debug(f"已注册配置变更回调: {config_path}")
async def _notify_changes(self, old_config: Config, new_config: Config) -> None:
"""通知配置变更
Args:
old_config: 旧配置对象
new_config: 新配置对象
"""
for config_path, callbacks in self._callbacks.items():
try:
old_value = self._get_value(old_config, config_path)
new_value = self._get_value(new_config, config_path)
if old_value != new_value:
logger.info(f"检测到配置变更: {config_path}")
for callback in callbacks:
try:
if asyncio.iscoroutinefunction(callback):
await callback(old_value, new_value)
else:
callback(old_value, new_value)
except Exception as e:
logger.error(
f"配置变更回调执行失败 [{config_path}]: {e}",
exc_info=True
)
except Exception as e:
logger.error(f"获取配置值失败 [{config_path}]: {e}")
def _get_value(self, config: Config, path: str) -> Any:
"""获取嵌套配置值
Args:
config: 配置对象
path: 配置路径支持点分隔的嵌套路径
Returns:
Any: 配置值
Raises:
AttributeError: 配置路径不存在
"""
parts = path.split('.')
value = config
for part in parts:
value = getattr(value, part)
return value
def __getattr__(self, name: str) -> Any:
"""动态代理配置属性访问
支持直接访问配置对象的属性
- config_manager.napcat_server
- config_manager.chat
- config_manager.debug
Args:
name: 属性名
Returns:
Any: 配置对象的对应属性值
Raises:
RuntimeError: 配置尚未加载
AttributeError: 属性不存在
"""
# 私有属性不代理
if name.startswith('_'):
raise AttributeError(
f"'{type(self).__name__}' object has no attribute '{name}'"
)
# 检查配置是否已加载
if self._config is None:
raise RuntimeError("配置尚未加载,请先调用 load() 方法")
# 尝试从 _config 获取属性
try:
return getattr(self._config, name)
except AttributeError as e:
raise AttributeError(
f"'{type(self).__name__}' object has no attribute '{name}'"
) from e
async def start_watch(self) -> None:
"""启动配置文件监控(需要在事件循环中调用)"""
if self._observer is not None:
logger.warning("配置文件监控已在运行")
return
# 保存当前事件循环引用
self._loop = asyncio.get_running_loop()
# 创建文件监控事件处理器
config_file_path = self._config_path
class ConfigFileHandler(FileSystemEventHandler):
def __init__(handler_self, manager: "ConfigManager"):
handler_self.manager = manager
handler_self.config_path = config_file_path
def on_modified(handler_self, event):
# 检查是否是目标配置文件修改事件
if isinstance(event, FileModifiedEvent) and os.path.abspath(event.src_path) == handler_self.config_path:
logger.debug(f"检测到配置文件变更: {event.src_path}")
# 使用防抖机制避免重复重载
# watchdog运行在独立线程需要使用run_coroutine_threadsafe
if handler_self.manager._loop:
asyncio.run_coroutine_threadsafe(
handler_self.manager._debounced_reload(),
handler_self.manager._loop
)
self._event_handler = ConfigFileHandler(self)
# 创建Observer并监控配置文件所在目录
self._observer = Observer()
watch_dir = os.path.dirname(self._config_path) or "."
self._observer.schedule(self._event_handler, watch_dir, recursive=False)
self._observer.start()
logger.info(f"已启动配置文件实时监控: {self._config_path}")
async def stop_watch(self) -> None:
"""停止配置文件监控"""
if self._observer is None:
return
logger.debug("正在停止配置文件监控")
# 取消防抖任务
if self._reload_debounce_task:
self._reload_debounce_task.cancel()
try:
await self._reload_debounce_task
except asyncio.CancelledError:
pass
# 停止observer
self._observer.stop()
self._observer.join(timeout=2)
self._observer = None
self._event_handler = None
logger.info("配置文件监控已停止")
async def _debounced_reload(self) -> None:
"""防抖重载:避免短时间内多次文件修改事件导致重复重载"""
import time
# 记录当前触发时间
trigger_time = time.time()
self._last_reload_trigger = trigger_time
# 等待防抖延迟
await asyncio.sleep(self._debounce_delay)
# 检查是否有更新的触发
if self._last_reload_trigger > trigger_time:
# 有更新的触发,放弃本次重载
logger.debug("放弃过时的重载请求")
return
# 检查是否已有重载在进行
if self._is_reloading:
logger.debug("重载已在进行中,跳过")
return
# 执行重载
self._is_reloading = True
try:
modified_time = datetime.fromtimestamp(
os.path.getmtime(self._config_path)
).strftime("%Y-%m-%d %H:%M:%S")
logger.info(
f"配置文件已更新 (修改时间: {modified_time}),正在重载..."
)
success = await self.reload()
if not success:
logger.error(
"配置文件重载失败!请检查配置文件格式是否正确。\n"
"当前仍使用旧配置运行,修复配置文件后将自动重试。"
)
finally:
self._is_reloading = False
def __repr__(self) -> str:
watching = self._observer is not None and self._observer.is_alive()
return f"<ConfigManager config_path={self._config_path} watching={watching}>"

View File

@ -86,6 +86,14 @@ class VoiceConfig(ConfigBase):
"""是否启用TTS功能""" """是否启用TTS功能"""
@dataclass
class ForwardConfig(ConfigBase):
"""转发消息相关配置"""
image_threshold: int = 3
"""图片数量阈值转发消息中图片数量超过此值时使用占位符代替base64发送避免麦麦VLM处理卡死"""
@dataclass @dataclass
class DebugConfig(ConfigBase): class DebugConfig(ConfigBase):
level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO" level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"

View File

@ -1,21 +1,106 @@
from loguru import logger from loguru import logger
from .config import global_config from .config import global_config
import sys import sys
from pathlib import Path
from datetime import datetime, timedelta
# 默认 logger # 日志目录配置
LOG_DIR = Path(__file__).parent.parent / "logs"
LOG_DIR.mkdir(exist_ok=True)
# 日志等级映射(用于显示单字母)
LEVEL_ABBR = {
"TRACE": "T",
"DEBUG": "D",
"INFO": "I",
"SUCCESS": "S",
"WARNING": "W",
"ERROR": "E",
"CRITICAL": "C"
}
def get_level_abbr(record):
"""获取日志等级的缩写"""
return LEVEL_ABBR.get(record["level"].name, record["level"].name[0])
def clean_old_logs(days: int = 30):
"""清理超过指定天数的日志文件"""
try:
cutoff_date = datetime.now() - timedelta(days=days)
for log_file in LOG_DIR.glob("*.log"):
try:
file_time = datetime.fromtimestamp(log_file.stat().st_mtime)
if file_time < cutoff_date:
log_file.unlink()
print(f"已清理过期日志: {log_file.name}")
except Exception as e:
print(f"清理日志文件 {log_file.name} 失败: {e}")
except Exception as e:
print(f"清理日志目录失败: {e}")
# 清理过期日志
clean_old_logs(30)
# 移除默认处理器
logger.remove() logger.remove()
# 自定义格式化函数
def format_log(record):
"""格式化日志记录"""
record["extra"]["level_abbr"] = get_level_abbr(record)
if "module_name" not in record["extra"]:
record["extra"]["module_name"] = "Adapter"
return True
# 控制台输出处理器 - 简洁格式
logger.add( logger.add(
sys.stderr, sys.stderr,
level=global_config.debug.level, level=global_config.debug.level,
format="<blue>{time:YYYY-MM-DD HH:mm:ss}</blue> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>", format="<blue>{time:MM-DD HH:mm:ss}</blue> | <level>[{extra[level_abbr]}]</level> | <cyan>{extra[module_name]}</cyan> | <level>{message}</level>",
filter=lambda record: "name" not in record["extra"] or record["extra"].get("name") != "maim_message", filter=lambda record: format_log(record) and record["extra"].get("module_name") != "maim_message",
) )
# maim_message 单独处理
logger.add( logger.add(
sys.stderr, sys.stderr,
level="INFO", level="INFO",
format="<red>{time:YYYY-MM-DD HH:mm:ss}</red> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>", format="<red>{time:MM-DD HH:mm:ss}</red> | <level>[{extra[level_abbr]}]</level> | <cyan>{extra[module_name]}</cyan> | <level>{message}</level>",
filter=lambda record: record["extra"].get("name") == "maim_message", filter=lambda record: format_log(record) and record["extra"].get("module_name") == "maim_message",
) )
# 创建样式不同的 logger
custom_logger = logger.bind(name="maim_message") # 文件输出处理器 - 详细格式,记录所有TRACE级别
logger = logger.bind(name="MaiBot-Napcat-Adapter") log_file = LOG_DIR / f"adapter_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logger.add(
log_file,
level="TRACE",
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | [{level}] | {extra[module_name]} | {name}:{function}:{line} - {message}",
rotation="100 MB", # 单个日志文件最大100MB
retention="30 days", # 保留30天
encoding="utf-8",
enqueue=True, # 异步写入,避免阻塞
filter=format_log, # 确保extra字段存在
)
def get_logger(module_name: str = "Adapter"):
"""
获取自定义模块名的logger
Args:
module_name: 模块名称,用于日志输出中标识来源
Returns:
配置好的logger实例
Example:
>>> from src.logger import get_logger
>>> logger = get_logger("MyModule")
>>> logger.info("这是一条日志")
MM-DD HH:mm:ss | [I] | MyModule | 这是一条日志
"""
return logger.bind(module_name=module_name)
# 默认logger实例(用于向后兼容)
logger = logger.bind(module_name="Adapter")
# maim_message的logger
custom_logger = logger.bind(module_name="maim_message")

View File

@ -32,14 +32,38 @@ class NoticeType: # 通知事件
group_recall = "group_recall" # 群聊消息撤回 group_recall = "group_recall" # 群聊消息撤回
notify = "notify" notify = "notify"
group_ban = "group_ban" # 群禁言 group_ban = "group_ban" # 群禁言
group_msg_emoji_like = "group_msg_emoji_like" # 群消息表情回应
group_upload = "group_upload" # 群文件上传
group_increase = "group_increase" # 群成员增加
group_decrease = "group_decrease" # 群成员减少
group_admin = "group_admin" # 群管理员变动
essence = "essence" # 精华消息
class Notify: class Notify:
poke = "poke" # 戳一戳 poke = "poke" # 戳一戳
group_name = "group_name" # 群名称变更
class GroupBan: class GroupBan:
ban = "ban" # 禁言 ban = "ban" # 禁言
lift_ban = "lift_ban" # 解除禁言 lift_ban = "lift_ban" # 解除禁言
class GroupIncrease:
approve = "approve" # 管理员同意入群
invite = "invite" # 被邀请入群
class GroupDecrease:
leave = "leave" # 主动退群
kick = "kick" # 被踢出群
kick_me = "kick_me" # 机器人被踢
class GroupAdmin:
set = "set" # 设置管理员
unset = "unset" # 取消管理员
class Essence:
add = "add" # 添加精华消息
delete = "delete" # 移除精华消息
class RealMessageType: # 实际消息分类 class RealMessageType: # 实际消息分类
text = "text" # 纯文本 text = "text" # 纯文本
@ -56,6 +80,8 @@ class RealMessageType: # 实际消息分类
reply = "reply" # 回复消息 reply = "reply" # 回复消息
forward = "forward" # 转发消息 forward = "forward" # 转发消息
node = "node" # 转发消息节点 node = "node" # 转发消息节点
json = "json" # JSON卡片消息
file = "file" # 文件消息
class MessageSentType: class MessageSentType:

View File

@ -8,6 +8,7 @@ from src.utils import (
get_self_info, get_self_info,
get_message_detail, get_message_detail,
) )
import base64
from .qq_emoji_list import qq_face from .qq_emoji_list import qq_face
from .message_sending import message_send_instance from .message_sending import message_send_instance
from . import RealMessageType, MessageType, ACCEPT_FORMAT from . import RealMessageType, MessageType, ACCEPT_FORMAT
@ -300,7 +301,23 @@ class MessageHandler:
else: else:
logger.warning("record处理失败或不支持") logger.warning("record处理失败或不支持")
case RealMessageType.video: case RealMessageType.video:
logger.warning("不支持视频解析") ret_seg = await self.handle_video_message(sub_message)
if ret_seg:
seg_message.append(ret_seg)
else:
logger.warning("video处理失败")
case RealMessageType.json:
ret_segs = await self.handle_json_message(sub_message)
if ret_segs:
seg_message.extend(ret_segs)
else:
logger.warning("json处理失败")
case RealMessageType.file:
ret_seg = await self.handle_file_message(sub_message)
if ret_seg:
seg_message.append(ret_seg)
else:
logger.warning("file处理失败")
case RealMessageType.at: case RealMessageType.at:
ret_seg = await self.handle_at_message( ret_seg = await self.handle_at_message(
sub_message, sub_message,
@ -445,6 +462,308 @@ class MessageHandler:
return None return None
return Seg(type="voice", data=audio_base64) return Seg(type="voice", data=audio_base64)
async def handle_video_message(self, raw_message: dict) -> Seg | None:
"""
处理视频消息
Parameters:
raw_message: dict: 原始消息
Returns:
seg_data: Seg: 处理后的消息段video_card类型
"""
message_data: dict = raw_message.get("data")
file: str = message_data.get("file", "")
url: str = message_data.get("url", "")
file_size: str = message_data.get("file_size", "")
if not file:
logger.warning("视频消息缺少文件信息")
return None
# 返回结构化的视频卡片数据
return Seg(type="video_card", data={
"file": file,
"file_size": file_size,
"url": url
})
async def handle_json_message(self, raw_message: dict) -> List[Seg] | None:
"""
处理JSON卡片消息(小程序分享群公告等)
Parameters:
raw_message: dict: 原始消息
Returns:
seg_data: List[Seg]: 处理后的消息段列表可能包含文本和图片
"""
message_data: dict = raw_message.get("data")
json_data: str = message_data.get("data")
if not json_data:
logger.warning("JSON消息缺少数据")
return None
try:
# 尝试解析JSON获取详细信息
parsed_json = json.loads(json_data)
app = parsed_json.get("app", "")
meta = parsed_json.get("meta", {})
# 群公告由于图片URL是加密的因此无法读取
if app == "com.tencent.mannounce":
mannounce = meta.get("mannounce", {})
title = mannounce.get("title", "")
text = mannounce.get("text", "")
encode_flag = mannounce.get("encode", 0)
if encode_flag == 1:
try:
if title:
title = base64.b64decode(title).decode("utf-8", errors="ignore")
if text:
text = base64.b64decode(text).decode("utf-8", errors="ignore")
except Exception as e:
logger.warning(f"群公告Base64解码失败: {e}")
if title and text:
content = f"[{title}]:{text}"
elif title:
content = f"[{title}]"
elif text:
content = f"{text}"
else:
content = "[群公告]"
return [Seg(type="text", data=content)]
# 音乐卡片
if app in ("com.tencent.music.lua", "com.tencent.structmsg"):
music = meta.get("music", {})
if music:
title = music.get("title", "")
singer = music.get("desc", "") or music.get("singer", "")
jump_url = music.get("jumpUrl", "") or music.get("jump_url", "")
music_url = music.get("musicUrl", "") or music.get("music_url", "")
tag = music.get("tag", "")
preview = music.get("preview", "")
return [Seg(type="music_card", data={
"title": title,
"singer": singer,
"jump_url": jump_url,
"music_url": music_url,
"tag": tag,
"preview": preview
})]
# QQ小程序分享含预览图
if app == "com.tencent.miniapp_01":
detail = meta.get("detail_1", {})
if detail:
title = detail.get("title", "")
desc = detail.get("desc", "")
url = detail.get("url", "")
qqdocurl = detail.get("qqdocurl", "")
preview_url = detail.get("preview", "")
icon = detail.get("icon", "")
seg_list = [Seg(type="miniapp_card", data={
"title": title,
"desc": desc,
"url": url,
"source_url": qqdocurl,
"preview": preview_url,
"icon": icon
})]
# 下载预览图
if preview_url:
try:
image_base64 = await get_image_base64(preview_url)
seg_list.append(Seg(type="image", data=image_base64))
except Exception as e:
logger.error(f"QQ小程序预览图下载失败: {e}")
return seg_list
# 礼物消息
if app == "com.tencent.giftmall.giftark":
giftark = meta.get("giftark", {})
if giftark:
gift_name = giftark.get("title", "礼物")
desc = giftark.get("desc", "")
gift_text = f"[赠送礼物: {gift_name}]"
if desc:
gift_text += f"\n{desc}"
return [Seg(type="text", data=gift_text)]
# 推荐联系人
if app == "com.tencent.contact.lua":
contact_info = meta.get("contact", {})
name = contact_info.get("nickname", "未知联系人")
tag = contact_info.get("tag", "推荐联系人")
return [Seg(type="text", data=f"[{tag}] {name}")]
# 推荐群聊
if app == "com.tencent.troopsharecard":
contact_info = meta.get("contact", {})
name = contact_info.get("nickname", "未知群聊")
tag = contact_info.get("tag", "推荐群聊")
return [Seg(type="text", data=f"[{tag}] {name}")]
# 图文分享(如 哔哩哔哩HD、网页、群精华等
if app == "com.tencent.tuwen.lua":
news = meta.get("news", {})
title = news.get("title", "未知标题")
desc = (news.get("desc", "") or "").replace("[图片]", "").strip()
tag = news.get("tag", "图文分享")
preview_url = news.get("preview", "")
if tag and title and tag in title:
title = title.replace(tag, "", 1).strip(": -— ")
text_content = f"[{tag}] {title}:{desc}"
seg_list = [Seg(type="text", data=text_content)]
# 下载预览图
if preview_url:
try:
image_base64 = await get_image_base64(preview_url)
seg_list.append(Seg(type="image", data=image_base64))
except Exception as e:
logger.error(f"图文预览图下载失败: {e}")
return seg_list
# 群相册(含预览图)
if app == "com.tencent.feed.lua":
feed = meta.get("feed", {})
title = feed.get("title", "群相册")
tag = feed.get("tagName", "群相册")
desc = feed.get("forwardMessage", "")
cover_url = feed.get("cover", "")
if tag and title and tag in title:
title = title.replace(tag, "", 1).strip(": -— ")
text_content = f"[{tag}] {title}:{desc}"
seg_list = [Seg(type="text", data=text_content)]
# 下载封面图
if cover_url:
try:
image_base64 = await get_image_base64(cover_url)
seg_list.append(Seg(type="image", data=image_base64))
except Exception as e:
logger.error(f"群相册封面下载失败: {e}")
return seg_list
# QQ收藏分享含预览图
if app == "com.tencent.template.qqfavorite.share":
news = meta.get("news", {})
desc = news.get("desc", "").replace("[图片]", "").strip()
tag = news.get("tag", "QQ收藏")
preview_url = news.get("preview", "")
seg_list = [Seg(type="text", data=f"[{tag}] {desc}")]
# 下载预览图
if preview_url:
try:
image_base64 = await get_image_base64(preview_url)
seg_list.append(Seg(type="image", data=image_base64))
except Exception as e:
logger.error(f"QQ收藏预览图下载失败: {e}")
return seg_list
# QQ空间分享含预览图
if app == "com.tencent.miniapp.lua":
miniapp = meta.get("miniapp", {})
title = miniapp.get("title", "未知标题")
tag = miniapp.get("tag", "QQ空间")
preview_url = miniapp.get("preview", "")
seg_list = [Seg(type="text", data=f"[{tag}] {title}")]
# 下载预览图
if preview_url:
try:
image_base64 = await get_image_base64(preview_url)
seg_list.append(Seg(type="image", data=image_base64))
except Exception as e:
logger.error(f"QQ空间预览图下载失败: {e}")
return seg_list
# QQ频道分享含预览图
if app == "com.tencent.forum":
detail = meta.get("detail") if isinstance(meta, dict) else None
if detail:
feed = detail.get("feed", {})
poster = detail.get("poster", {})
channel_info = detail.get("channel_info", {})
guild_name = channel_info.get("guild_name", "")
nick = poster.get("nick", "QQ用户")
title = feed.get("title", {}).get("contents", [{}])[0].get("text_content", {}).get("text", "帖子")
face_content = ""
for item in feed.get("contents", {}).get("contents", []):
emoji = item.get("emoji_content")
if emoji:
eid = emoji.get("id")
if eid in qq_face:
face_content += qq_face.get(eid, "")
seg_list = [Seg(type="text", data=f"[频道帖子] [{guild_name}]{nick}:{title}{face_content}")]
# 下载帖子中的图片
pic_urls = [img.get("pic_url") for img in feed.get("images", []) if img.get("pic_url")]
for pic_url in pic_urls:
try:
image_base64 = await get_image_base64(pic_url)
seg_list.append(Seg(type="image", data=image_base64))
except Exception as e:
logger.error(f"QQ频道图片下载失败: {e}")
return seg_list
# QQ地图位置分享
if app == "com.tencent.map":
location = meta.get("Location.Search", {})
name = location.get("name", "未知地点")
address = location.get("address", "")
return [Seg(type="text", data=f"[位置] {address} · {name}")]
# QQ一起听歌
if app == "com.tencent.together":
invite = (meta or {}).get("invite", {})
title = invite.get("title") or "一起听歌"
summary = invite.get("summary") or ""
return [Seg(type="text", data=f"[{title}] {summary}")]
# 其他卡片消息使用prompt字段
prompt = parsed_json.get("prompt", "[卡片消息]")
return [Seg(type="text", data=prompt)]
except json.JSONDecodeError:
logger.warning("JSON消息解析失败")
return [Seg(type="text", data="[卡片消息]")]
except Exception as e:
logger.error(f"JSON消息处理异常: {e}")
return [Seg(type="text", data="[卡片消息]")]
async def handle_file_message(self, raw_message: dict) -> Seg | None:
"""
处理文件消息
Parameters:
raw_message: dict: 原始消息
Returns:
seg_data: Seg: 处理后的消息段
"""
message_data: dict = raw_message.get("data")
file_name: str = message_data.get("file")
file_size: str = message_data.get("file_size", "未知大小")
file_url: str = message_data.get("url")
if not file_name:
logger.warning("文件消息缺少文件名")
return None
file_text = f"[文件: {file_name}, 大小: {file_size}字节]"
if file_url:
file_text += f"\n文件链接: {file_url}"
return Seg(type="text", data=file_text)
async def handle_reply_message(self, raw_message: dict, additional_config: dict) -> Tuple[List[Seg] | None, dict]: async def handle_reply_message(self, raw_message: dict, additional_config: dict) -> Tuple[List[Seg] | None, dict]:
# sourcery skip: move-assign-in-block, use-named-expression # sourcery skip: move-assign-in-block, use-named-expression
""" """
@ -464,7 +783,7 @@ class MessageHandler:
return None, {} return None, {}
reply_message, _ = await self.handle_real_message(message_detail, in_reply=True) reply_message, _ = await self.handle_real_message(message_detail, in_reply=True)
if reply_message is None: if reply_message is None:
reply_message = "(获取发言内容失败)" reply_message = [Seg(type="text", data="(获取发言内容失败)")]
sender_info: dict = message_detail.get("sender") sender_info: dict = message_detail.get("sender")
sender_nickname: str = sender_info.get("nickname") sender_nickname: str = sender_info.get("nickname")
sender_id: str = sender_info.get("user_id") sender_id: str = sender_info.get("user_id")
@ -489,18 +808,28 @@ class MessageHandler:
image_count: int image_count: int
if not handled_message: if not handled_message:
return None return None
if image_count < 5 and image_count > 0:
# 处理图片数量小于5的情况此时解析图片为base64 # 添加转发消息的标题和结束标识
logger.trace("图片数量小于5开始解析图片为base64") forward_header = Seg(type="text", data="========== 转发消息开始 ==========\n")
return await self._recursive_parse_image_seg(handled_message, True) forward_footer = Seg(type="text", data="========== 转发消息结束 ==========")
# 图片阈值超过此数量使用占位符避免麦麦VLM处理卡死
image_threshold = global_config.forward.image_threshold
if image_count < image_threshold and image_count > 0:
# 处理图片数量小于阈值的情况此时解析图片为base64
logger.trace(f"图片数量({image_count})小于{image_threshold}开始解析图片为base64")
parsed_message = await self._recursive_parse_image_seg(handled_message, True)
return Seg(type="seglist", data=[forward_header, parsed_message, forward_footer])
elif image_count > 0: elif image_count > 0:
logger.trace("图片数量大于等于5开始解析图片为占位符") logger.trace(f"图片数量({image_count})大于等于{image_threshold},开始解析图片为占位符")
# 处理图片数量大于等于5的情况此时解析图片为占位符 # 处理图片数量大于等于阈值的情况,此时解析图片为占位符
return await self._recursive_parse_image_seg(handled_message, False) parsed_message = await self._recursive_parse_image_seg(handled_message, False)
return Seg(type="seglist", data=[forward_header, parsed_message, forward_footer])
else: else:
# 处理没有图片的情况,此时直接返回 # 处理没有图片的情况,此时直接返回
logger.trace("没有图片,直接返回") logger.trace("没有图片,直接返回")
return handled_message return Seg(type="seglist", data=[forward_header, handled_message, forward_footer])
async def _recursive_parse_image_seg(self, seg_data: Seg, to_image: bool) -> Seg: async def _recursive_parse_image_seg(self, seg_data: Seg, to_image: bool) -> Seg:
# sourcery skip: merge-else-if-into-elif # sourcery skip: merge-else-if-into-elif
@ -560,6 +889,8 @@ class MessageHandler:
image_count = 0 image_count = 0
if message_list is None: if message_list is None:
return None, 0 return None, 0
# 统一在最前加入【转发消息】标识(带层级缩进)
seg_list.append(Seg(type="text", data=("--" * layer) + "\n【转发消息】\n"))
for sub_message in message_list: for sub_message in message_list:
sub_message: dict sub_message: dict
sender_info: dict = sub_message.get("sender") sender_info: dict = sub_message.get("sender")
@ -572,12 +903,6 @@ class MessageHandler:
continue continue
message_of_sub_message = message_of_sub_message_list[0] message_of_sub_message = message_of_sub_message_list[0]
if message_of_sub_message.get("type") == RealMessageType.forward: if message_of_sub_message.get("type") == RealMessageType.forward:
if layer >= 3:
full_seg_data = Seg(
type="text",
data=("--" * layer) + f"{user_nickname}】:【转发消息】\n",
)
else:
sub_message_data = message_of_sub_message.get("data") sub_message_data = message_of_sub_message.get("data")
if not sub_message_data: if not sub_message_data:
continue continue
@ -634,6 +959,8 @@ class MessageHandler:
] ]
full_seg_data = Seg(type="seglist", data=data_list) full_seg_data = Seg(type="seglist", data=data_list)
seg_list.append(full_seg_data) seg_list.append(full_seg_data)
# 在结尾追加标识
seg_list.append(Seg(type="text", data=("--" * layer) + "【转发消息结束】"))
return Seg(type="seglist", data=seg_list), image_count return Seg(type="seglist", data=seg_list), image_count
async def _get_forward_message(self, raw_message: dict) -> Dict[str, Any] | None: async def _get_forward_message(self, raw_message: dict) -> Dict[str, Any] | None:

View File

@ -1,8 +1,16 @@
from typing import Dict from typing import Dict
import json
from src.logger import logger from src.logger import logger
from maim_message import MessageBase, Router from maim_message import MessageBase, Router
# 消息大小限制 (字节)
# WebSocket 服务端限制为 100MB这里设置 95MB 留一点余量
MAX_MESSAGE_SIZE_BYTES = 95 * 1024 * 1024 # 95MB
MAX_MESSAGE_SIZE_KB = MAX_MESSAGE_SIZE_BYTES / 1024
MAX_MESSAGE_SIZE_MB = MAX_MESSAGE_SIZE_KB / 1024
class MessageSending: class MessageSending:
""" """
负责把消息发送到麦麦 负责把消息发送到麦麦
@ -20,13 +28,40 @@ class MessageSending:
message_base: MessageBase: 消息基类包含发送目标和消息内容等信息 message_base: MessageBase: 消息基类包含发送目标和消息内容等信息
""" """
try: try:
# 计算消息大小用于调试
msg_dict = message_base.to_dict()
msg_json = json.dumps(msg_dict, ensure_ascii=False)
msg_size_bytes = len(msg_json.encode('utf-8'))
msg_size_kb = msg_size_bytes / 1024
msg_size_mb = msg_size_kb / 1024
logger.debug(f"发送消息大小: {msg_size_kb:.2f} KB")
# 检查消息是否超过大小限制
if msg_size_bytes > MAX_MESSAGE_SIZE_BYTES:
logger.error(
f"消息大小 ({msg_size_mb:.2f} MB) 超过限制 ({MAX_MESSAGE_SIZE_MB:.0f} MB)"
f"消息已被丢弃以避免连接断开"
)
logger.warning(
f"被丢弃的消息来源: platform={message_base.message_info.platform}, "
f"group_id={message_base.message_info.group_info.group_id if message_base.message_info.group_info else 'N/A'}, "
f"user_id={message_base.message_info.user_info.user_id if message_base.message_info.user_info else 'N/A'}"
)
return False
if msg_size_kb > 1024: # 超过 1MB 时警告
logger.warning(f"发送的消息较大 ({msg_size_mb:.2f} MB),可能导致传输延迟")
send_status = await self.maibot_router.send_message(message_base) send_status = await self.maibot_router.send_message(message_base)
if not send_status: if not send_status:
raise RuntimeError("可能是路由未正确配置或连接异常") raise RuntimeError("可能是路由未正确配置或连接异常")
logger.debug("消息发送成功")
return send_status return send_status
except Exception as e: except Exception as e:
logger.error(f"发送消息失败: {str(e)}") logger.error(f"发送消息失败: {str(e)}")
logger.error("请检查与MaiBot之间的连接") logger.error("请检查与MaiBot之间的连接")
return False
async def send_custom_message(self, custom_message: Dict, platform: str, message_type: str) -> bool: async def send_custom_message(self, custom_message: Dict, platform: str, message_type: str) -> bool:
""" """

View File

@ -25,13 +25,25 @@ class MetaEventHandler:
logger.success(f"Bot {self_id} 连接成功") logger.success(f"Bot {self_id} 连接成功")
asyncio.create_task(self.check_heartbeat(self_id)) asyncio.create_task(self.check_heartbeat(self_id))
elif event_type == MetaEventType.heartbeat: elif event_type == MetaEventType.heartbeat:
if message["status"].get("online") and message["status"].get("good"):
if not self._interval_checking:
asyncio.create_task(self.check_heartbeat())
self.last_heart_beat = time.time()
self.interval = message.get("interval") / 1000
else:
self_id = message.get("self_id") self_id = message.get("self_id")
status = message.get("status", {})
is_online = status.get("online", False)
is_good = status.get("good", False)
if is_online and is_good:
# 正常心跳
if not self._interval_checking:
asyncio.create_task(self.check_heartbeat(self_id))
self.last_heart_beat = time.time()
self.interval = message.get("interval", 30000) / 1000
else:
# Bot 离线或状态异常
if not is_online:
logger.error(f"🔴 Bot {self_id} 已下线 (online=false)")
logger.warning("Bot 可能被踢下线、网络断开或主动退出登录")
elif not is_good:
logger.warning(f"⚠️ Bot {self_id} 状态异常 (good=false)")
else:
logger.warning(f"Bot {self_id} Napcat 端异常!") logger.warning(f"Bot {self_id} Napcat 端异常!")
async def check_heartbeat(self, id: int) -> None: async def check_heartbeat(self, id: int) -> None:

View File

@ -10,6 +10,7 @@ from src.database import BanUser, db_manager, is_identical
from . import NoticeType, ACCEPT_FORMAT from . import NoticeType, ACCEPT_FORMAT
from .message_sending import message_send_instance from .message_sending import message_send_instance
from .message_handler import message_handler from .message_handler import message_handler
from .qq_emoji_list import qq_face
from maim_message import FormatInfo, UserInfo, GroupInfo, Seg, BaseMessageInfo, MessageBase from maim_message import FormatInfo, UserInfo, GroupInfo, Seg, BaseMessageInfo, MessageBase
from src.utils import ( from src.utils import (
@ -87,12 +88,13 @@ class NoticeHandler:
match notice_type: match notice_type:
case NoticeType.friend_recall: case NoticeType.friend_recall:
logger.info("好友撤回一条消息") logger.info("好友撤回一条消息")
logger.info(f"撤回消息ID{raw_message.get('message_id')}, 撤回时间:{raw_message.get('time')}") handled_message, user_info = await self.handle_friend_recall_notify(raw_message)
logger.warning("暂时不支持撤回消息处理")
case NoticeType.group_recall: case NoticeType.group_recall:
if not await message_handler.check_allow_to_chat(user_id, group_id, True, False):
return None
logger.info("群内用户撤回一条消息") logger.info("群内用户撤回一条消息")
logger.info(f"撤回消息ID{raw_message.get('message_id')}, 撤回时间:{raw_message.get('time')}") handled_message, user_info = await self.handle_group_recall_notify(raw_message, group_id, user_id)
logger.warning("暂时不支持撤回消息处理") system_notice = True
case NoticeType.notify: case NoticeType.notify:
sub_type = raw_message.get("sub_type") sub_type = raw_message.get("sub_type")
match sub_type: match sub_type:
@ -104,6 +106,12 @@ class NoticeHandler:
handled_message, user_info = await self.handle_poke_notify(raw_message, group_id, user_id) handled_message, user_info = await self.handle_poke_notify(raw_message, group_id, user_id)
else: else:
logger.warning("戳一戳消息被禁用,取消戳一戳处理") logger.warning("戳一戳消息被禁用,取消戳一戳处理")
case NoticeType.Notify.group_name:
if not await message_handler.check_allow_to_chat(user_id, group_id, True, False):
return None
logger.info("处理群名称变更")
handled_message, user_info = await self.handle_group_name_notify(raw_message, group_id, user_id)
system_notice = True
case _: case _:
logger.warning(f"不支持的notify类型: {notice_type}.{sub_type}") logger.warning(f"不支持的notify类型: {notice_type}.{sub_type}")
case NoticeType.group_ban: case NoticeType.group_ban:
@ -123,6 +131,45 @@ class NoticeHandler:
system_notice = True system_notice = True
case _: case _:
logger.warning(f"不支持的group_ban类型: {notice_type}.{sub_type}") logger.warning(f"不支持的group_ban类型: {notice_type}.{sub_type}")
case NoticeType.group_msg_emoji_like:
if not await message_handler.check_allow_to_chat(user_id, group_id, True, False):
return None
logger.info("处理群消息表情回应")
handled_message, user_info = await self.handle_emoji_like_notify(raw_message, group_id, user_id)
case NoticeType.group_upload:
if not await message_handler.check_allow_to_chat(user_id, group_id, True, False):
return None
logger.info("处理群文件上传")
handled_message, user_info = await self.handle_group_upload_notify(raw_message, group_id, user_id)
system_notice = True
case NoticeType.group_increase:
if not await message_handler.check_allow_to_chat(user_id, group_id, True, False):
return None
sub_type = raw_message.get("sub_type")
logger.info(f"处理群成员增加: {sub_type}")
handled_message, user_info = await self.handle_group_increase_notify(raw_message, group_id, user_id)
system_notice = True
case NoticeType.group_decrease:
if not await message_handler.check_allow_to_chat(user_id, group_id, True, False):
return None
sub_type = raw_message.get("sub_type")
logger.info(f"处理群成员减少: {sub_type}")
handled_message, user_info = await self.handle_group_decrease_notify(raw_message, group_id, user_id)
system_notice = True
case NoticeType.group_admin:
if not await message_handler.check_allow_to_chat(user_id, group_id, True, False):
return None
sub_type = raw_message.get("sub_type")
logger.info(f"处理群管理员变动: {sub_type}")
handled_message, user_info = await self.handle_group_admin_notify(raw_message, group_id, user_id)
system_notice = True
case NoticeType.essence:
if not await message_handler.check_allow_to_chat(user_id, group_id, True, False):
return None
sub_type = raw_message.get("sub_type")
logger.info(f"处理精华消息: {sub_type}")
handled_message, user_info = await self.handle_essence_notify(raw_message, group_id)
system_notice = True
case _: case _:
logger.warning(f"不支持的notice类型: {notice_type}") logger.warning(f"不支持的notice类型: {notice_type}")
return None return None
@ -240,6 +287,150 @@ class NoticeHandler:
) )
return seg_data, user_info return seg_data, user_info
async def handle_friend_recall_notify(self, raw_message: dict) -> Tuple[Seg | None, UserInfo | None]:
"""处理好友消息撤回"""
user_id = raw_message.get("user_id")
message_id = raw_message.get("message_id")
if not user_id:
logger.error("用户ID不能为空无法处理好友撤回通知")
return None, None
# 获取好友信息
user_qq_info: dict = await get_stranger_info(self.server_connection, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
else:
user_name = "QQ用户"
logger.warning("无法获取撤回消息好友的昵称")
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=None,
)
seg_data = Seg(
type="notify",
data={
"sub_type": "friend_recall",
"message_id": message_id,
},
)
return seg_data, user_info
async def handle_group_recall_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""处理群消息撤回"""
if not group_id:
logger.error("群ID不能为空无法处理群撤回通知")
return None, None
message_id = raw_message.get("message_id")
operator_id = raw_message.get("operator_id")
# 获取撤回操作者信息
operator_nickname: str = None
operator_cardname: str = None
member_info: dict = await get_member_info(self.server_connection, group_id, operator_id)
if member_info:
operator_nickname = member_info.get("nickname")
operator_cardname = member_info.get("card")
else:
logger.warning("无法获取撤回操作者的昵称")
operator_nickname = "QQ用户"
operator_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=operator_id,
user_nickname=operator_nickname,
user_cardname=operator_cardname,
)
# 获取被撤回消息发送者信息(如果不是自己撤回的话)
recalled_user_info: UserInfo | None = None
if user_id != operator_id:
user_member_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_member_info:
user_nickname = user_member_info.get("nickname")
user_cardname = user_member_info.get("card")
else:
user_nickname = "QQ用户"
user_cardname = None
logger.warning("无法获取被撤回消息发送者的昵称")
recalled_user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_nickname,
user_cardname=user_cardname,
)
seg_data = Seg(
type="notify",
data={
"sub_type": "group_recall",
"message_id": message_id,
"recalled_user_info": recalled_user_info.to_dict() if recalled_user_info else None,
},
)
return seg_data, operator_info
async def handle_emoji_like_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""处理群消息表情回应"""
if not group_id:
logger.error("群ID不能为空无法处理表情回应通知")
return None, None
# 获取用户信息
user_qq_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
user_cardname = user_qq_info.get("card")
else:
user_name = "QQ用户"
user_cardname = "QQ用户"
logger.warning("无法获取表情回应用户的昵称")
# 解析表情列表
likes = raw_message.get("likes", [])
message_id = raw_message.get("message_id")
# 构建表情文本,直接使用 qq_face 映射
emoji_texts = []
for like in likes:
emoji_id = str(like.get("emoji_id", ""))
count = like.get("count", 1)
# 使用 qq_face 字典获取表情描述
emoji = qq_face.get(emoji_id, f"[表情:未知{emoji_id}]")
if count > 1:
emoji_texts.append(f"{emoji}x{count}")
else:
emoji_texts.append(emoji)
emoji_str = "".join(emoji_texts) if emoji_texts else "未知表情"
display_name = user_cardname if user_cardname and user_cardname != "QQ用户" else user_name
# 构建消息文本
message_text = f"{display_name} 对消息(ID:{message_id})表达了 {emoji_str}"
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=user_cardname,
)
seg_data = Seg(type="text", data=message_text)
return seg_data, user_info
async def handle_ban_notify(self, raw_message: dict, group_id: int) -> Tuple[Seg, UserInfo] | Tuple[None, None]: async def handle_ban_notify(self, raw_message: dict, group_id: int) -> Tuple[Seg, UserInfo] | Tuple[None, None]:
if not group_id: if not group_id:
logger.error("群ID不能为空无法处理禁言通知") logger.error("群ID不能为空无法处理禁言通知")
@ -512,5 +703,298 @@ class NoticeHandler:
await unsuccessful_notice_queue.put(to_be_send) await unsuccessful_notice_queue.put(to_be_send)
await asyncio.sleep(1) await asyncio.sleep(1)
async def handle_group_upload_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""
处理群文件上传通知
"""
file_info: dict = raw_message.get("file", {})
file_name = file_info.get("name", "未知文件")
file_size = file_info.get("size", 0)
file_id = file_info.get("id", "")
user_qq_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
user_cardname = user_qq_info.get("card")
else:
logger.warning("无法获取上传者信息")
user_name = "QQ用户"
user_cardname = None
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=user_cardname,
)
# 格式化文件大小
if file_size < 1024:
size_str = f"{file_size}B"
elif file_size < 1024 * 1024:
size_str = f"{file_size / 1024:.2f}KB"
else:
size_str = f"{file_size / (1024 * 1024):.2f}MB"
notify_seg = Seg(
type="notify",
data={
"sub_type": "group_upload",
"file_name": file_name,
"file_size": size_str,
"file_id": file_id,
},
)
return notify_seg, user_info
async def handle_group_increase_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""
处理群成员增加通知
"""
sub_type = raw_message.get("sub_type")
operator_id = raw_message.get("operator_id")
# 获取新成员信息
user_qq_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
user_cardname = user_qq_info.get("card")
else:
logger.warning("无法获取新成员信息")
user_name = "QQ用户"
user_cardname = None
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=user_cardname,
)
# 获取操作者信息
operator_name = "未知"
if operator_id:
operator_info: dict = await get_member_info(self.server_connection, group_id, operator_id)
if operator_info:
operator_name = operator_info.get("card") or operator_info.get("nickname", "未知")
if sub_type == NoticeType.GroupIncrease.invite:
action_text = f"{operator_name} 邀请"
elif sub_type == NoticeType.GroupIncrease.approve:
action_text = f"{operator_name} 同意"
else:
action_text = "加入"
notify_seg = Seg(
type="notify",
data={
"sub_type": "group_increase",
"action": action_text,
"increase_type": sub_type,
"operator_id": operator_id,
},
)
return notify_seg, user_info
async def handle_group_decrease_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""
处理群成员减少通知
"""
sub_type = raw_message.get("sub_type")
operator_id = raw_message.get("operator_id")
# 获取离开成员信息
user_qq_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
user_cardname = user_qq_info.get("card")
else:
logger.warning("无法获取离开成员信息")
user_name = "QQ用户"
user_cardname = None
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=user_cardname,
)
# 获取操作者信息
operator_name = "未知"
if operator_id and operator_id != 0:
operator_info: dict = await get_member_info(self.server_connection, group_id, operator_id)
if operator_info:
operator_name = operator_info.get("card") or operator_info.get("nickname", "未知")
if sub_type == NoticeType.GroupDecrease.leave:
action_text = "主动退群"
elif sub_type == NoticeType.GroupDecrease.kick:
action_text = f"{operator_name} 踢出"
elif sub_type == NoticeType.GroupDecrease.kick_me:
action_text = "机器人被踢出"
else:
action_text = "离开群聊"
notify_seg = Seg(
type="notify",
data={
"sub_type": "group_decrease",
"action": action_text,
"decrease_type": sub_type,
"operator_id": operator_id,
},
)
return notify_seg, user_info
async def handle_group_admin_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""
处理群管理员变动通知
"""
sub_type = raw_message.get("sub_type")
# 获取目标用户信息
user_qq_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
user_cardname = user_qq_info.get("card")
else:
logger.warning("无法获取目标用户信息")
user_name = "QQ用户"
user_cardname = None
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=user_cardname,
)
if sub_type == NoticeType.GroupAdmin.set:
action_text = "被设置为管理员"
elif sub_type == NoticeType.GroupAdmin.unset:
action_text = "被取消管理员"
else:
action_text = "管理员变动"
notify_seg = Seg(
type="notify",
data={
"sub_type": "group_admin",
"action": action_text,
"admin_type": sub_type,
},
)
return notify_seg, user_info
async def handle_essence_notify(
self, raw_message: dict, group_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""
处理精华消息通知
"""
sub_type = raw_message.get("sub_type")
sender_id = raw_message.get("sender_id")
operator_id = raw_message.get("operator_id")
message_id = raw_message.get("message_id")
# 获取操作者信息(设置精华的人)
operator_info: dict = await get_member_info(self.server_connection, group_id, operator_id)
if operator_info:
operator_name = operator_info.get("nickname")
operator_cardname = operator_info.get("card")
else:
logger.warning("无法获取操作者信息")
operator_name = "QQ用户"
operator_cardname = None
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=operator_id,
user_nickname=operator_name,
user_cardname=operator_cardname,
)
# 获取消息发送者信息
sender_name = "未知用户"
if sender_id:
sender_info: dict = await get_member_info(self.server_connection, group_id, sender_id)
if sender_info:
sender_name = sender_info.get("card") or sender_info.get("nickname", "未知用户")
if sub_type == NoticeType.Essence.add:
action_text = f"{sender_name} 的消息设为精华"
elif sub_type == NoticeType.Essence.delete:
action_text = f"移除了 {sender_name} 的精华消息"
else:
action_text = "精华消息变动"
notify_seg = Seg(
type="notify",
data={
"sub_type": "essence",
"action": action_text,
"essence_type": sub_type,
"sender_id": sender_id,
"message_id": message_id,
},
)
return notify_seg, user_info
async def handle_group_name_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""
处理群名称变更通知
"""
new_name = raw_message.get("name_new")
if not new_name:
logger.warning("群名称变更通知缺少新名称")
return None, None
# 获取操作者信息
user_info_dict: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_info_dict:
user_name = user_info_dict.get("nickname")
user_cardname = user_info_dict.get("card")
else:
logger.warning("无法获取修改群名称的用户信息")
user_name = "QQ用户"
user_cardname = None
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=user_cardname,
)
action_text = f"修改群名称为: {new_name}"
notify_seg = Seg(
type="notify",
data={
"sub_type": "group_name",
"action": action_text,
"new_name": new_name,
},
)
return notify_seg, user_info
notice_handler = NoticeHandler() notice_handler = NoticeHandler()

View File

@ -31,7 +31,7 @@ qq_face: dict = {
"30": "[表情:奋斗]", "30": "[表情:奋斗]",
"31": "[表情:咒骂]", "31": "[表情:咒骂]",
"32": "[表情:疑问]", "32": "[表情:疑问]",
"33": "[表情: 嘘]", "33": "[表情:嘘]",
"34": "[表情:晕]", "34": "[表情:晕]",
"35": "[表情:折磨]", "35": "[表情:折磨]",
"36": "[表情:衰]", "36": "[表情:衰]",
@ -117,7 +117,7 @@ qq_face: dict = {
"268": "[表情:问号脸]", "268": "[表情:问号脸]",
"269": "[表情:暗中观察]", "269": "[表情:暗中观察]",
"270": "[表情emm]", "270": "[表情emm]",
"271": "[表情:吃 瓜]", "271": "[表情:吃瓜]",
"272": "[表情:呵呵哒]", "272": "[表情:呵呵哒]",
"273": "[表情:我酸了]", "273": "[表情:我酸了]",
"277": "[表情:汪汪]", "277": "[表情:汪汪]",
@ -146,7 +146,7 @@ qq_face: dict = {
"314": "[表情:仔细分析]", "314": "[表情:仔细分析]",
"317": "[表情:菜汪]", "317": "[表情:菜汪]",
"318": "[表情:崇拜]", "318": "[表情:崇拜]",
"319": "[表情: 比心]", "319": "[表情:比心]",
"320": "[表情:庆祝]", "320": "[表情:庆祝]",
"323": "[表情:嫌弃]", "323": "[表情:嫌弃]",
"324": "[表情:吃糖]", "324": "[表情:吃糖]",
@ -175,13 +175,65 @@ qq_face: dict = {
"355": "[表情:耶]", "355": "[表情:耶]",
"356": "[表情666]", "356": "[表情666]",
"357": "[表情:裂开]", "357": "[表情:裂开]",
"392": "[表情:龙年 快乐]", "392": "[表情:龙年快乐]",
"393": "[表情:新年中龙]", "393": "[表情:新年中龙]",
"394": "[表情:新年大龙]", "394": "[表情:新年大龙]",
"395": "[表情:略略略]", "395": "[表情:略略略]",
"128522": "[表情:嘿嘿]",
"128524": "[表情:羞涩]",
"128538": "[表情:亲亲]",
"128531": "[表情:汗]",
"128560": "[表情:紧张]",
"128541": "[表情:吐舌]",
"128513": "[表情:呲牙]",
"128540": "[表情:淘气]",
"9786": "[表情:可爱]",
"128532": "[表情:失落]",
"128516": "[表情:高兴]",
"128527": "[表情:哼哼]",
"128530": "[表情:不屑]",
"128563": "[表情:瞪眼]",
"128536": "[表情:飞吻]",
"128557": "[表情:大哭]",
"128514": "[表情:激动]",
"128170": "[表情:肌肉]",
"128074": "[表情:拳头]",
"128077": "[表情:厉害]",
"128079": "[表情:鼓掌]",
"128076": "[表情:好的]",
"127836": "[表情:拉面]",
"127847": "[表情:刨冰]",
"127838": "[表情:面包]",
"127866": "[表情:啤酒]",
"127867": "[表情:干杯]",
"9749": "[表情:咖啡]",
"127822": "[表情:苹果]",
"127827": "[表情:草莓]",
"127817": "[表情:西瓜]",
"127801": "[表情:玫瑰]",
"127881": "[表情:庆祝]",
"128157": "[表情:礼物]",
"10024": "[表情:闪光]",
"128168": "[表情:吹气]",
"128166": "[表情:水]",
"128293": "[表情:火]",
"128164": "[表情:睡觉]",
"128235": "[表情:邮箱]",
"128103": "[表情:女孩]",
"128102": "[表情:男孩]",
"128053": "[表情:猴]",
"128046": "[表情:牛]",
"128027": "[表情:虫]",
"128051": "[表情:鲸鱼]",
"9728": "[表情:晴天]",
"10068": "[表情:问号]",
"128147": "[表情:爱心]",
"10060": "[表情:错误]",
"128089": "[表情:内衣]",
"128104": "[表情:爸爸]",
"😊": "[表情:嘿嘿]", "😊": "[表情:嘿嘿]",
"😌": "[表情:羞涩]", "😌": "[表情:羞涩]",
"😚": "[ 表情:亲亲]", "😚": "[表情:亲亲]",
"😓": "[表情:汗]", "😓": "[表情:汗]",
"😰": "[表情:紧张]", "😰": "[表情:紧张]",
"😝": "[表情:吐舌]", "😝": "[表情:吐舌]",
@ -200,7 +252,7 @@ qq_face: dict = {
"😂": "[表情:激动]", "😂": "[表情:激动]",
"💪": "[表情:肌肉]", "💪": "[表情:肌肉]",
"👊": "[表情:拳头]", "👊": "[表情:拳头]",
"👍": "[表情 :厉害]", "👍": "[表情:厉害]",
"👏": "[表情:鼓掌]", "👏": "[表情:鼓掌]",
"👎": "[表情:鄙视]", "👎": "[表情:鄙视]",
"🙏": "[表情:合十]", "🙏": "[表情:合十]",
@ -245,6 +297,6 @@ qq_face: dict = {
"": "[表情:晴天]", "": "[表情:晴天]",
"": "[表情:问号]", "": "[表情:问号]",
"🔫": "[表情:手枪]", "🔫": "[表情:手枪]",
"💓": "[表情:爱 心]", "💓": "[表情:爱心]",
"🏪": "[表情:便利店]", "🏪": "[表情:便利店]",
} }

View File

@ -1,4 +1,5 @@
from typing import Any, Dict from typing import Any, Dict, Optional
import time
from maim_message import ( from maim_message import (
UserInfo, UserInfo,
GroupInfo, GroupInfo,
@ -10,6 +11,7 @@ from src.logger import logger
from .send_command_handler import SendCommandHandleClass from .send_command_handler import SendCommandHandleClass
from .send_message_handler import SendMessageHandleClass from .send_message_handler import SendMessageHandleClass
from .nc_sending import nc_message_sender from .nc_sending import nc_message_sender
from src.recv_handler.message_sending import message_send_instance
class SendHandler: class SendHandler:
@ -34,21 +36,89 @@ class SendHandler:
message_segment: Seg = raw_message_base.message_segment message_segment: Seg = raw_message_base.message_segment
group_info: GroupInfo = message_info.group_info group_info: GroupInfo = message_info.group_info
seg_data: Dict[str, Any] = message_segment.data seg_data: Dict[str, Any] = message_segment.data
command_name = seg_data.get('name', 'UNKNOWN')
try: try:
command, args_dict = SendCommandHandleClass.handle_command(seg_data, group_info) command, args_dict = SendCommandHandleClass.handle_command(seg_data, group_info)
except Exception as e: except Exception as e:
logger.error(f"处理命令时出错: {str(e)}") logger.error(f"处理命令时出错: {str(e)}")
# 发送错误响应给麦麦
await self._send_command_response(
platform=message_info.platform,
command_name=command_name,
success=False,
error=str(e)
)
return return
if not command or not args_dict: if not command or not args_dict:
logger.error("命令或参数缺失") logger.error("命令或参数缺失")
await self._send_command_response(
platform=message_info.platform,
command_name=command_name,
success=False,
error="命令或参数缺失"
)
return None return None
response = await nc_message_sender.send_message_to_napcat(command, args_dict) response = await nc_message_sender.send_message_to_napcat(command, args_dict)
# 根据响应状态发送结果给麦麦
if response.get("status") == "ok": if response.get("status") == "ok":
logger.info(f"命令 {seg_data.get('name')} 执行成功") logger.info(f"命令 {command_name} 执行成功")
await self._send_command_response(
platform=message_info.platform,
command_name=command_name,
success=True,
data=response.get("data")
)
else: else:
logger.warning(f"命令 {seg_data.get('name')} 执行失败napcat返回{str(response)}") logger.warning(f"命令 {command_name} 执行失败napcat返回{str(response)}")
await self._send_command_response(
platform=message_info.platform,
command_name=command_name,
success=False,
error=str(response),
data=response.get("data") # 有些错误响应也可能包含部分数据
)
async def _send_command_response(
self,
platform: str,
command_name: str,
success: bool,
data: Optional[Dict] = None,
error: Optional[str] = None
) -> None:
"""发送命令响应回麦麦
Args:
platform: 平台标识
command_name: 命令名称
success: 是否执行成功
data: 返回数据成功时
error: 错误信息失败时
"""
response_data = {
"command_name": command_name,
"success": success,
"timestamp": time.time()
}
if data is not None:
response_data["data"] = data
if error:
response_data["error"] = error
try:
await message_send_instance.send_custom_message(
custom_message=response_data,
platform=platform,
message_type="command_response"
)
logger.debug(f"已发送命令响应: {command_name}, success={success}")
except Exception as e:
logger.error(f"发送命令响应失败: {e}")
async def send_normal_message(self, raw_message_base: MessageBase) -> None: async def send_normal_message(self, raw_message_base: MessageBase) -> None:
""" """

View File

@ -1,44 +1,83 @@
from maim_message import GroupInfo from maim_message import GroupInfo
from typing import Any, Dict, Tuple from typing import Any, Dict, Tuple, Callable, Optional
from src import CommandType from src import CommandType
# 全局命令处理器注册表(在类外部定义以避免循环引用)
_command_handlers: Dict[str, Dict[str, Any]] = {}
def register_command(command_type: CommandType, require_group: bool = True):
"""装饰器:注册命令处理器
Args:
command_type: 命令类型
require_group: 是否需要群聊信息默认为True
Returns:
装饰器函数
"""
def decorator(func: Callable) -> Callable:
_command_handlers[command_type.name] = {
"handler": func,
"require_group": require_group,
}
return func
return decorator
class SendCommandHandleClass: class SendCommandHandleClass:
@classmethod @classmethod
def handle_command(cls, raw_command_data: Dict[str, Any], group_info: GroupInfo): def handle_command(cls, raw_command_data: Dict[str, Any], group_info: Optional[GroupInfo]):
"""统一命令处理入口
Args:
raw_command_data: 原始命令数据
group_info: 群聊信息可选
Returns:
Tuple[str, Dict[str, Any]]: (action, params) 用于发送给NapCat
Raises:
RuntimeError: 命令类型未知或处理失败
"""
command_name: str = raw_command_data.get("name") command_name: str = raw_command_data.get("name")
try:
match command_name: if command_name not in _command_handlers:
case CommandType.GROUP_BAN.name:
return cls.handle_ban_command(raw_command_data.get("args", {}), group_info)
case CommandType.GROUP_WHOLE_BAN.name:
return cls.handle_whole_ban_command(raw_command_data.get("args", {}), group_info)
case CommandType.GROUP_KICK.name:
return cls.handle_kick_command(raw_command_data.get("args", {}), group_info)
case CommandType.SEND_POKE.name:
return cls.handle_poke_command(raw_command_data.get("args", {}), group_info)
case CommandType.DELETE_MSG.name:
return cls.delete_msg_command(raw_command_data.get("args", {}))
case CommandType.AI_VOICE_SEND.name:
return cls.handle_ai_voice_send_command(raw_command_data.get("args", {}), group_info)
case CommandType.MESSAGE_LIKE.name:
return cls.handle_message_like_command(raw_command_data.get("args", {}))
case _:
raise RuntimeError(f"未知的命令类型: {command_name}") raise RuntimeError(f"未知的命令类型: {command_name}")
try:
handler_info = _command_handlers[command_name]
handler = handler_info["handler"]
require_group = handler_info["require_group"]
# 检查群聊信息要求
if require_group and not group_info:
raise ValueError(f"命令 {command_name} 需要在群聊上下文中使用")
# 调用处理器
args = raw_command_data.get("args", {})
return handler(args, group_info)
except Exception as e: except Exception as e:
raise RuntimeError(f"处理命令时出错: {str(e)}") from e raise RuntimeError(f"处理命令 {command_name} 时出错: {str(e)}") from e
# ============ 命令处理器(使用装饰器注册)============
@staticmethod @staticmethod
def handle_ban_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.GROUP_BAN, require_group=True)
def handle_ban_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理封禁命令 """处理封禁命令
Args: Args:
args (Dict[str, Any]): 参数字典 args: 参数字典 {"qq_id": int, "duration": int}
group_info (GroupInfo): 群聊信息对应目标群聊 group_info: 群聊信息对应目标群聊
Returns: Returns:
Tuple[CommandType, Dict[str, Any]] Tuple[str, Dict[str, Any]]: (action, params)
""" """
duration: int = int(args["duration"]) duration: int = int(args["duration"])
user_id: int = int(args["qq_id"]) user_id: int = int(args["qq_id"])
@ -59,15 +98,16 @@ class SendCommandHandleClass:
) )
@staticmethod @staticmethod
def handle_whole_ban_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.GROUP_WHOLE_BAN, require_group=True)
def handle_whole_ban_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理全体禁言命令 """处理全体禁言命令
Args: Args:
args (Dict[str, Any]): 参数字典 args: 参数字典 {"enable": bool}
group_info (GroupInfo): 群聊信息对应目标群聊 group_info: 群聊信息对应目标群聊
Returns: Returns:
Tuple[CommandType, Dict[str, Any]] Tuple[str, Dict[str, Any]]: (action, params)
""" """
enable = args["enable"] enable = args["enable"]
assert isinstance(enable, bool), "enable参数必须是布尔值" assert isinstance(enable, bool), "enable参数必须是布尔值"
@ -83,41 +123,122 @@ class SendCommandHandleClass:
) )
@staticmethod @staticmethod
def handle_kick_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.GROUP_KICK, require_group=False)
def handle_kick_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理群成员踢出命令 """处理群成员踢出命令
Args: Args:
args (Dict[str, Any]): 参数字典 args: 参数字典 {"group_id": int, "user_id": int, "reject_add_request": bool (可选)}
group_info (GroupInfo): 群聊信息对应目标群聊 group_info: 群聊信息可选可自动获取 group_id
Returns: Returns:
Tuple[CommandType, Dict[str, Any]] Tuple[str, Dict[str, Any]]: (action, params)
""" """
user_id: int = int(args["qq_id"]) if not args:
group_id: int = int(group_info.group_id) raise ValueError("群踢人命令缺少参数")
# 优先从 args 获取 group_id否则从 group_info 获取
group_id = args.get("group_id")
if not group_id and group_info:
group_id = int(group_info.group_id)
user_id = args.get("user_id")
if not group_id:
raise ValueError("群踢人命令缺少必要参数: group_id")
if not user_id:
raise ValueError("群踢人命令缺少必要参数: user_id")
group_id = int(group_id)
user_id = int(user_id)
if group_id <= 0: if group_id <= 0:
raise ValueError("群组ID无效") raise ValueError("群组ID无效")
if user_id <= 0: if user_id <= 0:
raise ValueError("用户ID无效") raise ValueError("用户ID无效")
# reject_add_request 是可选参数,默认 False
reject_add_request = args.get("reject_add_request", False)
return ( return (
CommandType.GROUP_KICK.value, CommandType.GROUP_KICK.value,
{ {
"group_id": group_id, "group_id": group_id,
"user_id": user_id, "user_id": user_id,
"reject_add_request": False, # 不拒绝加群请求 "reject_add_request": bool(reject_add_request),
}, },
) )
@staticmethod @staticmethod
def handle_poke_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.GROUP_KICK_MEMBERS, require_group=False)
def handle_kick_members_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理批量踢出群成员命令
Args:
args: 参数字典 {"group_id": int, "user_id": List[int], "reject_add_request": bool (可选)}
group_info: 群聊信息可选可自动获取 group_id
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
if not args:
raise ValueError("批量踢人命令缺少参数")
# 优先从 args 获取 group_id否则从 group_info 获取
group_id = args.get("group_id")
if not group_id and group_info:
group_id = int(group_info.group_id)
user_id = args.get("user_id")
if not group_id:
raise ValueError("批量踢人命令缺少必要参数: group_id")
if not user_id:
raise ValueError("批量踢人命令缺少必要参数: user_id")
# 验证 user_id 是数组
if not isinstance(user_id, list):
raise ValueError("user_id 必须是数组类型")
if len(user_id) == 0:
raise ValueError("user_id 数组不能为空")
# 转换并验证每个 user_id
user_id_list = []
for uid in user_id:
try:
uid_int = int(uid)
if uid_int <= 0:
raise ValueError(f"用户ID无效: {uid}")
user_id_list.append(uid_int)
except (ValueError, TypeError) as e:
raise ValueError(f"用户ID格式错误: {uid} - {str(e)}") from None
group_id = int(group_id)
if group_id <= 0:
raise ValueError("群组ID无效")
# reject_add_request 是可选参数,默认 False
reject_add_request = args.get("reject_add_request", False)
return (
CommandType.GROUP_KICK_MEMBERS.value,
{
"group_id": group_id,
"user_id": user_id_list,
"reject_add_request": bool(reject_add_request),
},
)
@staticmethod
@register_command(CommandType.SEND_POKE, require_group=False)
def handle_poke_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理戳一戳命令 """处理戳一戳命令
Args: Args:
args (Dict[str, Any]): 参数字典 args: 参数字典 {"qq_id": int}
group_info (GroupInfo): 群聊信息对应目标群聊 group_info: 群聊信息可选私聊时为None
Returns: Returns:
Tuple[CommandType, Dict[str, Any]] Tuple[str, Dict[str, Any]]: (action, params)
""" """
user_id: int = int(args["qq_id"]) user_id: int = int(args["qq_id"])
if group_info is None: if group_info is None:
@ -137,14 +258,55 @@ class SendCommandHandleClass:
) )
@staticmethod @staticmethod
def delete_msg_command(args: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.SET_GROUP_NAME, require_group=False)
def handle_set_group_name_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""设置群名
Args:
args: 参数字典 {"group_id": int, "group_name": str}
group_info: 群聊信息可选可自动获取 group_id
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
if not args:
raise ValueError("设置群名命令缺少参数")
# 优先从 args 获取 group_id否则从 group_info 获取
group_id = args.get("group_id")
if not group_id and group_info:
group_id = int(group_info.group_id)
group_name = args.get("group_name")
if not group_id:
raise ValueError("设置群名命令缺少必要参数: group_id")
if not group_name:
raise ValueError("设置群名命令缺少必要参数: group_name")
group_id = int(group_id)
if group_id <= 0:
raise ValueError("群组ID无效")
return (
CommandType.SET_GROUP_NAME.value,
{
"group_id": group_id,
"group_name": str(group_name),
},
)
@staticmethod
@register_command(CommandType.DELETE_MSG, require_group=False)
def delete_msg_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理撤回消息命令 """处理撤回消息命令
Args: Args:
args (Dict[str, Any]): 参数字典 args: 参数字典 {"message_id": int}
group_info: 群聊信息不使用
Returns: Returns:
Tuple[CommandType, Dict[str, Any]] Tuple[str, Dict[str, Any]]: (action, params)
""" """
try: try:
message_id = int(args["message_id"]) message_id = int(args["message_id"])
@ -155,18 +317,52 @@ class SendCommandHandleClass:
except (ValueError, TypeError) as e: except (ValueError, TypeError) as e:
raise ValueError(f"消息ID无效: {args['message_id']} - {str(e)}") from None raise ValueError(f"消息ID无效: {args['message_id']} - {str(e)}") from None
return ( return (CommandType.DELETE_MSG.value, {"message_id": message_id})
CommandType.DELETE_MSG.value,
{
"message_id": message_id,
},
)
@staticmethod @staticmethod
def handle_ai_voice_send_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.SET_QQ_PROFILE, require_group=False)
def handle_set_qq_profile_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""设置账号信息
Args:
args: 参数字典 {"nickname": str, "personal_note": str (可选), "sex": str (可选)}
group_info: 群聊信息不使用
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
""" """
处理AI语音发送命令的逻辑 if not args:
并返回 NapCat 兼容的 (action, params) 元组 raise ValueError("设置账号信息命令缺少参数")
nickname = args.get("nickname")
if not nickname:
raise ValueError("设置账号信息命令缺少必要参数: nickname")
params = {"nickname": str(nickname)}
# 可选参数
if "personal_note" in args:
params["personal_note"] = str(args["personal_note"])
if "sex" in args:
sex = str(args["sex"]).lower()
if sex not in ["male", "female", "unknown"]:
raise ValueError(f"性别参数无效: {sex},必须为 male/female/unknown 之一")
params["sex"] = sex
return (CommandType.SET_QQ_PROFILE.value, params)
@staticmethod
@register_command(CommandType.AI_VOICE_SEND, require_group=True)
def handle_ai_voice_send_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理AI语音发送命令
Args:
args: 参数字典 {"character": str, "text": str}
group_info: 群聊信息
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
""" """
if not group_info or not group_info.group_id: if not group_info or not group_info.group_id:
raise ValueError("AI语音发送命令必须在群聊上下文中使用") raise ValueError("AI语音发送命令必须在群聊上下文中使用")
@ -190,9 +386,16 @@ class SendCommandHandleClass:
) )
@staticmethod @staticmethod
def handle_message_like_command(args: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.MESSAGE_LIKE, require_group=False)
""" def handle_message_like_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
处理给消息贴表情的逻辑 """处理给消息贴表情命令
Args:
args: 参数字典 {"message_id": int, "emoji_id": int}
group_info: 群聊信息不使用
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
""" """
if not args: if not args:
raise ValueError("消息贴表情命令缺少参数") raise ValueError("消息贴表情命令缺少参数")
@ -219,3 +422,298 @@ class SendCommandHandleClass:
"set": True, "set": True,
}, },
) )
# ============ 查询类命令处理器 ============
@staticmethod
@register_command(CommandType.GET_LOGIN_INFO, require_group=False)
def handle_get_login_info_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""获取登录号信息Bot自身信息
Args:
args: 参数字典无需参数
group_info: 群聊信息不使用
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
return (CommandType.GET_LOGIN_INFO.value, {})
@staticmethod
@register_command(CommandType.GET_STRANGER_INFO, require_group=False)
def handle_get_stranger_info_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""获取陌生人信息
Args:
args: 参数字典 {"user_id": int}
group_info: 群聊信息不使用
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
if not args:
raise ValueError("获取陌生人信息命令缺少参数")
user_id = args.get("user_id")
if not user_id:
raise ValueError("获取陌生人信息命令缺少必要参数: user_id")
user_id = int(user_id)
if user_id <= 0:
raise ValueError("用户ID无效")
return (
CommandType.GET_STRANGER_INFO.value,
{"user_id": user_id},
)
@staticmethod
@register_command(CommandType.GET_FRIEND_LIST, require_group=False)
def handle_get_friend_list_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""获取好友列表
Args:
args: 参数字典 {"no_cache": bool} (可选默认 false)
group_info: 群聊信息不使用
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
# no_cache 参数是可选的,默认为 false
no_cache = args.get("no_cache", False) if args else False
return (CommandType.GET_FRIEND_LIST.value, {"no_cache": bool(no_cache)})
@staticmethod
@register_command(CommandType.GET_GROUP_INFO, require_group=False)
def handle_get_group_info_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""获取群信息
Args:
args: 参数字典 {"group_id": int} 或从 group_info 自动获取
group_info: 群聊信息可选
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
# 优先从 args 获取,否则从 group_info 获取
group_id = args.get("group_id") if args else None
if not group_id and group_info:
group_id = int(group_info.group_id)
if not group_id:
raise ValueError("获取群信息命令缺少必要参数: group_id")
group_id = int(group_id)
if group_id <= 0:
raise ValueError("群组ID无效")
return (
CommandType.GET_GROUP_INFO.value,
{"group_id": group_id},
)
@staticmethod
@register_command(CommandType.GET_GROUP_DETAIL_INFO, require_group=False)
def handle_get_group_detail_info_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""获取群详细信息
Args:
args: 参数字典 {"group_id": int} 或从 group_info 自动获取
group_info: 群聊信息可选
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
# 优先从 args 获取,否则从 group_info 获取
group_id = args.get("group_id") if args else None
if not group_id and group_info:
group_id = int(group_info.group_id)
if not group_id:
raise ValueError("获取群详细信息命令缺少必要参数: group_id")
group_id = int(group_id)
if group_id <= 0:
raise ValueError("群组ID无效")
return (
CommandType.GET_GROUP_DETAIL_INFO.value,
{"group_id": group_id},
)
@staticmethod
@register_command(CommandType.GET_GROUP_LIST, require_group=False)
def handle_get_group_list_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""获取群列表
Args:
args: 参数字典 {"no_cache": bool} (可选默认 false)
group_info: 群聊信息不使用
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
# no_cache 参数是可选的,默认为 false
no_cache = args.get("no_cache", False) if args else False
return (CommandType.GET_GROUP_LIST.value, {"no_cache": bool(no_cache)})
@staticmethod
@register_command(CommandType.GET_GROUP_AT_ALL_REMAIN, require_group=False)
def handle_get_group_at_all_remain_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""获取群@全体成员剩余次数
Args:
args: 参数字典 {"group_id": int} 或从 group_info 自动获取
group_info: 群聊信息可选
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
# 优先从 args 获取,否则从 group_info 获取
group_id = args.get("group_id") if args else None
if not group_id and group_info:
group_id = int(group_info.group_id)
if not group_id:
raise ValueError("获取群@全体成员剩余次数命令缺少必要参数: group_id")
group_id = int(group_id)
if group_id <= 0:
raise ValueError("群组ID无效")
return (
CommandType.GET_GROUP_AT_ALL_REMAIN.value,
{"group_id": group_id},
)
@staticmethod
@register_command(CommandType.GET_GROUP_MEMBER_INFO, require_group=False)
def handle_get_group_member_info_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""获取群成员信息
Args:
args: 参数字典 {"group_id": int, "user_id": int, "no_cache": bool} group_id group_info 自动获取
group_info: 群聊信息可选
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
if not args:
raise ValueError("获取群成员信息命令缺少参数")
# 优先从 args 获取,否则从 group_info 获取
group_id = args.get("group_id")
if not group_id and group_info:
group_id = int(group_info.group_id)
user_id = args.get("user_id")
no_cache = args.get("no_cache", False)
if not group_id:
raise ValueError("获取群成员信息命令缺少必要参数: group_id")
if not user_id:
raise ValueError("获取群成员信息命令缺少必要参数: user_id")
group_id = int(group_id)
user_id = int(user_id)
if group_id <= 0:
raise ValueError("群组ID无效")
if user_id <= 0:
raise ValueError("用户ID无效")
return (
CommandType.GET_GROUP_MEMBER_INFO.value,
{
"group_id": group_id,
"user_id": user_id,
"no_cache": bool(no_cache),
},
)
@staticmethod
@register_command(CommandType.GET_GROUP_MEMBER_LIST, require_group=False)
def handle_get_group_member_list_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""获取群成员列表
Args:
args: 参数字典 {"group_id": int, "no_cache": bool} group_id group_info 自动获取
group_info: 群聊信息可选
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
# 优先从 args 获取,否则从 group_info 获取
group_id = args.get("group_id") if args else None
if not group_id and group_info:
group_id = int(group_info.group_id)
no_cache = args.get("no_cache", False) if args else False
if not group_id:
raise ValueError("获取群成员列表命令缺少必要参数: group_id")
group_id = int(group_id)
if group_id <= 0:
raise ValueError("群组ID无效")
return (
CommandType.GET_GROUP_MEMBER_LIST.value,
{
"group_id": group_id,
"no_cache": bool(no_cache),
},
)
@staticmethod
@register_command(CommandType.GET_MSG, require_group=False)
def handle_get_msg_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""获取消息详情
Args:
args: 参数字典 {"message_id": int}
group_info: 群聊信息不使用
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
if not args:
raise ValueError("获取消息命令缺少参数")
message_id = args.get("message_id")
if not message_id:
raise ValueError("获取消息命令缺少必要参数: message_id")
message_id = int(message_id)
if message_id <= 0:
raise ValueError("消息ID无效")
return (
CommandType.GET_MSG.value,
{"message_id": message_id},
)
@staticmethod
@register_command(CommandType.GET_FORWARD_MSG, require_group=False)
def handle_get_forward_msg_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""获取合并转发消息
Args:
args: 参数字典 {"message_id": str}
group_info: 群聊信息不使用
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
"""
if not args:
raise ValueError("获取合并转发消息命令缺少参数")
message_id = args.get("message_id")
if not message_id:
raise ValueError("获取合并转发消息命令缺少必要参数: message_id")
return (
CommandType.GET_FORWARD_MSG.value,
{"message_id": str(message_id)},
)

View File

@ -54,8 +54,8 @@ class SendMessageHandleClass:
voice_url = seg.data voice_url = seg.data
new_payload = cls.build_payload(payload, cls.handle_voiceurl_message(voice_url), False) new_payload = cls.build_payload(payload, cls.handle_voiceurl_message(voice_url), False)
elif seg.type == "music": elif seg.type == "music":
song_id = seg.data music_data = seg.data
new_payload = cls.build_payload(payload, cls.handle_music_message(song_id), False) new_payload = cls.build_payload(payload, cls.handle_music_message(music_data), False)
elif seg.type == "videourl": elif seg.type == "videourl":
video_url = seg.data video_url = seg.data
new_payload = cls.build_payload(payload, cls.handle_videourl_message(video_url), False) new_payload = cls.build_payload(payload, cls.handle_videourl_message(video_url), False)
@ -170,13 +170,43 @@ class SendMessageHandleClass:
} }
@staticmethod @staticmethod
def handle_music_message(song_id: str) -> dict: def handle_music_message(music_data) -> dict:
"""处理音乐消息""" """
处理音乐消息
music_data 可以是
1. 字符串默认为网易云音乐ID
2. 字典{"type": "163"/"qq", "id": "歌曲ID"}
"""
# 兼容旧格式直接传入歌曲ID字符串
if isinstance(music_data, str):
return { return {
"type": "music", "type": "music",
"data": {"type": "163", "id": song_id}, "data": {"type": "163", "id": music_data},
} }
# 新格式字典包含平台和ID
if isinstance(music_data, dict):
platform = music_data.get("type", "163") # 默认网易云
song_id = music_data.get("id", "")
# 验证平台类型
if platform not in ["163", "qq"]:
logger.warning(f"不支持的音乐平台: {platform}使用默认平台163")
platform = "163"
# 确保ID是字符串
if not isinstance(song_id, str):
song_id = str(song_id)
return {
"type": "music",
"data": {"type": platform, "id": song_id},
}
# 其他情况返回空
logger.error(f"不支持的音乐数据格式: {type(music_data)}")
return {}
@staticmethod @staticmethod
def handle_videourl_message(video_url: str) -> dict: def handle_videourl_message(video_url: str) -> dict:
"""处理视频链接消息""" """处理视频链接消息"""
@ -186,13 +216,62 @@ class SendMessageHandleClass:
} }
@staticmethod @staticmethod
def handle_file_message(file_path: str) -> dict: def handle_file_message(file_data) -> dict:
"""处理文件消息""" """处理文件消息
Args:
file_data: 可以是字符串文件路径或字典完整文件信息
- 字符串简单的文件路径
- 字典包含 file, name, path, thumb, url 等字段
Returns:
NapCat 格式的文件消息段
"""
# 如果是简单的字符串路径(兼容旧版本)
if isinstance(file_data, str):
return { return {
"type": "file", "type": "file",
"data": {"file": f"file://{file_path}"}, "data": {"file": f"file://{file_data}"},
} }
# 如果是完整的字典数据
if isinstance(file_data, dict):
data = {}
# file 字段是必需的
if "file" in file_data:
file_value = file_data["file"]
# 如果是本地路径且没有协议前缀,添加 file:// 前缀
if not any(file_value.startswith(prefix) for prefix in ["file://", "http://", "https://", "base64://"]):
data["file"] = f"file://{file_value}"
else:
data["file"] = file_value
else:
# 没有 file 字段,尝试使用 path 或 url
if "path" in file_data:
data["file"] = f"file://{file_data['path']}"
elif "url" in file_data:
data["file"] = file_data["url"]
else:
logger.warning("文件消息缺少必要的 file/path/url 字段")
return None
# 添加可选字段
if "name" in file_data:
data["name"] = file_data["name"]
if "thumb" in file_data:
data["thumb"] = file_data["thumb"]
if "url" in file_data and "file" not in file_data:
data["file"] = file_data["url"]
return {
"type": "file",
"data": data,
}
logger.warning(f"不支持的文件数据类型: {type(file_data)}")
return None
@staticmethod @staticmethod
def handle_imageurl_message(image_url: str) -> dict: def handle_imageurl_message(image_url: str) -> dict:
"""处理图片链接消息""" """处理图片链接消息"""

View File

@ -1,5 +1,5 @@
[inner] [inner]
version = "0.1.2" # 版本号 version = "0.1.3" # 版本号
# 请勿修改版本号,除非你知道自己在做什么 # 请勿修改版本号,除非你知道自己在做什么
[nickname] # 现在没用 [nickname] # 现在没用
@ -15,8 +15,8 @@ heartbeat_interval = 30 # 与Napcat设置的心跳相同按秒计
host = "localhost" # 麦麦在.env文件中设置的主机地址即HOST字段 host = "localhost" # 麦麦在.env文件中设置的主机地址即HOST字段
port = 8000 # 麦麦在.env文件中设置的端口即PORT字段 port = 8000 # 麦麦在.env文件中设置的端口即PORT字段
enable_api_server = false # 是否启用API-Server模式连接 enable_api_server = false # 是否启用API-Server模式连接
base_url = "" # API-Server连接地址 (ws://ip:port/path)仅在enable_api_server为true时使用 base_url = "ws://127.0.0.1:18095/ws" # API-Server连接地址 (ws://ip:port/path)仅在enable_api_server为true时使用
api_key = "" # API Key (仅在enable_api_server为true时使用) api_key = "maibot" # API Key (仅在enable_api_server为true时使用)
[chat] # 黑白名单功能 [chat] # 黑白名单功能
group_list_type = "whitelist" # 群组名单类型可选为whitelist, blacklist group_list_type = "whitelist" # 群组名单类型可选为whitelist, blacklist
@ -34,5 +34,8 @@ enable_poke = true # 是否启用戳一戳功能
[voice] # 发送语音设置 [voice] # 发送语音设置
use_tts = false # 是否使用tts语音请确保你配置了tts并有对应的adapter use_tts = false # 是否使用tts语音请确保你配置了tts并有对应的adapter
[forward] # 转发消息处理设置
image_threshold = 3 # 图片数量阈值:转发消息中图片数量超过此值时使用占位符(避免麦麦VLM处理卡死)
[debug] [debug]
level = "INFO" # 日志等级DEBUG, INFO, WARNING, ERROR, CRITICAL level = "INFO" # 日志等级DEBUG, INFO, WARNING, ERROR, CRITICAL