增加对群消息表情回应、文件上传、群成员增减及管理员变动的处理,优化日志记录,清理过期日志,更新适配器启动信息

pull/69/head
墨梓柒 2025-12-11 15:10:21 +08:00
parent 0d7733734c
commit 96b6487ccc
No known key found for this signature in database
GPG Key ID: 4A65B9DBA35F7635
7 changed files with 939 additions and 34 deletions

1
.gitignore vendored
View File

@ -39,6 +39,7 @@ share/python-wheels/
.installed.cfg .installed.cfg
*.egg *.egg
MANIFEST MANIFEST
dev/
# PyInstaller # PyInstaller
# Usually these files are written by a python script from a template # Usually these files are written by a python script from a template

115
main.py
View File

@ -64,26 +64,73 @@ def check_napcat_server_token(conn, request):
return None return None
async def napcat_server(): async def napcat_server():
logger.info("正在启动adapter...") logger.info("正在启动 MaiBot-Napcat-Adapter...")
async with Server.serve(message_recv, global_config.napcat_server.host, global_config.napcat_server.port, max_size=2**26, process_request=check_napcat_server_token) as server: logger.debug(f"日志等级: {global_config.debug.level}")
logger.info( logger.debug("日志文件: logs/adapter_*.log")
f"Adapter已启动监听地址: ws://{global_config.napcat_server.host}:{global_config.napcat_server.port}" try:
async with Server.serve(
message_recv,
global_config.napcat_server.host,
global_config.napcat_server.port,
max_size=2**26,
process_request=check_napcat_server_token
) as server:
logger.success(
f"✅ Adapter 启动成功! 监听: ws://{global_config.napcat_server.host}:{global_config.napcat_server.port}"
) )
await server.serve_forever() await server.serve_forever()
except OSError:
# 端口绑定失败时抛出异常让外层处理
raise
async def graceful_shutdown(): async def graceful_shutdown(silent: bool = False):
"""
优雅关闭adapter
Args:
silent: 静默模式,控制台不输出日志,但仍记录到文件
"""
try: try:
if not silent:
logger.info("正在关闭adapter...") logger.info("正在关闭adapter...")
else:
logger.debug("正在清理资源...")
# 先关闭MMC连接
try:
await asyncio.wait_for(mmc_stop_com(), timeout=3)
except asyncio.TimeoutError:
logger.debug("关闭MMC连接超时")
except Exception as e:
logger.debug(f"关闭MMC连接时出现错误: {e}")
# 取消所有任务
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
if tasks:
logger.debug(f"正在取消 {len(tasks)} 个任务")
for task in tasks: for task in tasks:
if not task.done(): if not task.done():
task.cancel() task.cancel()
await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), 15)
await mmc_stop_com() # 后置避免神秘exception # 等待任务完成,记录异常到日志文件
logger.info("Adapter已成功关闭") if tasks:
try:
results = await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=3)
# 记录任务取消的详细信息到日志文件
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.debug(f"任务 {i+1} 清理时产生异常: {type(result).__name__}: {result}")
except asyncio.TimeoutError:
logger.debug("任务清理超时")
except Exception as e: except Exception as e:
logger.error(f"Adapter关闭中出现错误: {e}") logger.debug(f"任务清理时出现错误: {e}")
if not silent:
logger.info("Adapter已成功关闭")
else:
logger.debug("资源清理完成")
except Exception as e:
logger.debug(f"graceful_shutdown异常: {e}", exc_info=True)
if __name__ == "__main__": if __name__ == "__main__":
@ -93,11 +140,57 @@ if __name__ == "__main__":
loop.run_until_complete(main()) loop.run_until_complete(main())
except KeyboardInterrupt: except KeyboardInterrupt:
logger.warning("收到中断信号,正在优雅关闭...") logger.warning("收到中断信号,正在优雅关闭...")
loop.run_until_complete(graceful_shutdown()) try:
loop.run_until_complete(graceful_shutdown(silent=False))
except Exception:
pass
except OSError as e:
# 处理端口占用等网络错误
if e.errno == 10048 or "address already in use" in str(e).lower():
logger.error(f"❌ 端口 {global_config.napcat_server.port} 已被占用,请检查:")
logger.error(" 1. 是否有其他 MaiBot-Napcat-Adapter 实例正在运行")
logger.error(" 2. 修改 config.toml 中的 port 配置")
logger.error(f" 3. 使用命令查看占用进程: netstat -ano | findstr {global_config.napcat_server.port}")
logger.debug("完整错误信息:", exc_info=True)
else:
logger.error(f"❌ 网络错误: {str(e)}")
logger.debug("完整错误信息:", exc_info=True)
# 端口占用时静默清理(控制台不输出,但记录到日志文件)
try:
loop.run_until_complete(graceful_shutdown(silent=True))
except Exception as e: except Exception as e:
logger.exception(f"主程序异常: {str(e)}") logger.debug(f"清理资源时出现错误: {e}", exc_info=True)
sys.exit(1)
except Exception as e:
logger.error(f"❌ 主程序异常: {str(e)}")
logger.debug("详细错误信息:", exc_info=True)
try:
loop.run_until_complete(graceful_shutdown(silent=True))
except Exception as e:
logger.debug(f"清理资源时出现错误: {e}", exc_info=True)
sys.exit(1) sys.exit(1)
finally:
# 清理事件循环
try:
# 取消所有剩余任务
pending = asyncio.all_tasks(loop)
if pending:
logger.debug(f"finally块清理 {len(pending)} 个剩余任务")
for task in pending:
task.cancel()
# 给任务一点时间完成取消
try:
results = loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
# 记录清理结果到日志文件
for i, result in enumerate(results):
if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
logger.debug(f"剩余任务 {i+1} 清理异常: {type(result).__name__}: {result}")
except Exception as e:
logger.debug(f"清理剩余任务时出现错误: {e}")
except Exception as e:
logger.debug(f"finally块清理出现错误: {e}")
finally: finally:
if loop and not loop.is_closed(): if loop and not loop.is_closed():
logger.debug("关闭事件循环")
loop.close() loop.close()
sys.exit(0) sys.exit(0)

View File

@ -1,21 +1,106 @@
from loguru import logger from loguru import logger
from .config import global_config from .config import global_config
import sys import sys
from pathlib import Path
from datetime import datetime, timedelta
# 默认 logger # 日志目录配置
LOG_DIR = Path(__file__).parent.parent / "logs"
LOG_DIR.mkdir(exist_ok=True)
# 日志等级映射(用于显示单字母)
LEVEL_ABBR = {
"TRACE": "T",
"DEBUG": "D",
"INFO": "I",
"SUCCESS": "S",
"WARNING": "W",
"ERROR": "E",
"CRITICAL": "C"
}
def get_level_abbr(record):
"""获取日志等级的缩写"""
return LEVEL_ABBR.get(record["level"].name, record["level"].name[0])
def clean_old_logs(days: int = 30):
"""清理超过指定天数的日志文件"""
try:
cutoff_date = datetime.now() - timedelta(days=days)
for log_file in LOG_DIR.glob("*.log"):
try:
file_time = datetime.fromtimestamp(log_file.stat().st_mtime)
if file_time < cutoff_date:
log_file.unlink()
print(f"已清理过期日志: {log_file.name}")
except Exception as e:
print(f"清理日志文件 {log_file.name} 失败: {e}")
except Exception as e:
print(f"清理日志目录失败: {e}")
# 清理过期日志
clean_old_logs(30)
# 移除默认处理器
logger.remove() logger.remove()
# 自定义格式化函数
def format_log(record):
"""格式化日志记录"""
record["extra"]["level_abbr"] = get_level_abbr(record)
if "module_name" not in record["extra"]:
record["extra"]["module_name"] = "Adapter"
return True
# 控制台输出处理器 - 简洁格式
logger.add( logger.add(
sys.stderr, sys.stderr,
level=global_config.debug.level, level=global_config.debug.level,
format="<blue>{time:YYYY-MM-DD HH:mm:ss}</blue> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>", format="<blue>{time:MM-DD HH:mm:ss}</blue> | <level>[{extra[level_abbr]}]</level> | <cyan>{extra[module_name]}</cyan> | <level>{message}</level>",
filter=lambda record: "name" not in record["extra"] or record["extra"].get("name") != "maim_message", filter=lambda record: format_log(record) and record["extra"].get("module_name") != "maim_message",
) )
# maim_message 单独处理
logger.add( logger.add(
sys.stderr, sys.stderr,
level="INFO", level="INFO",
format="<red>{time:YYYY-MM-DD HH:mm:ss}</red> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>", format="<red>{time:MM-DD HH:mm:ss}</red> | <level>[{extra[level_abbr]}]</level> | <cyan>{extra[module_name]}</cyan> | <level>{message}</level>",
filter=lambda record: record["extra"].get("name") == "maim_message", filter=lambda record: format_log(record) and record["extra"].get("module_name") == "maim_message",
) )
# 创建样式不同的 logger
custom_logger = logger.bind(name="maim_message") # 文件输出处理器 - 详细格式,记录所有TRACE级别
logger = logger.bind(name="MaiBot-Napcat-Adapter") log_file = LOG_DIR / f"adapter_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logger.add(
log_file,
level="TRACE",
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | [{level}] | {extra[module_name]} | {name}:{function}:{line} - {message}",
rotation="100 MB", # 单个日志文件最大100MB
retention="30 days", # 保留30天
encoding="utf-8",
enqueue=True, # 异步写入,避免阻塞
filter=format_log, # 确保extra字段存在
)
def get_logger(module_name: str = "Adapter"):
"""
获取自定义模块名的logger
Args:
module_name: 模块名称,用于日志输出中标识来源
Returns:
配置好的logger实例
Example:
>>> from src.logger import get_logger
>>> logger = get_logger("MyModule")
>>> logger.info("这是一条日志")
MM-DD HH:mm:ss | [I] | MyModule | 这是一条日志
"""
return logger.bind(module_name=module_name)
# 默认logger实例(用于向后兼容)
logger = logger.bind(module_name="Adapter")
# maim_message的logger
custom_logger = logger.bind(module_name="maim_message")

View File

@ -32,6 +32,12 @@ class NoticeType: # 通知事件
group_recall = "group_recall" # 群聊消息撤回 group_recall = "group_recall" # 群聊消息撤回
notify = "notify" notify = "notify"
group_ban = "group_ban" # 群禁言 group_ban = "group_ban" # 群禁言
group_msg_emoji_like = "group_msg_emoji_like" # 群消息表情回应
group_upload = "group_upload" # 群文件上传
group_increase = "group_increase" # 群成员增加
group_decrease = "group_decrease" # 群成员减少
group_admin = "group_admin" # 群管理员变动
essence = "essence" # 精华消息
class Notify: class Notify:
poke = "poke" # 戳一戳 poke = "poke" # 戳一戳
@ -40,6 +46,23 @@ class NoticeType: # 通知事件
ban = "ban" # 禁言 ban = "ban" # 禁言
lift_ban = "lift_ban" # 解除禁言 lift_ban = "lift_ban" # 解除禁言
class GroupIncrease:
approve = "approve" # 管理员同意入群
invite = "invite" # 被邀请入群
class GroupDecrease:
leave = "leave" # 主动退群
kick = "kick" # 被踢出群
kick_me = "kick_me" # 机器人被踢
class GroupAdmin:
set = "set" # 设置管理员
unset = "unset" # 取消管理员
class Essence:
add = "add" # 添加精华消息
delete = "delete" # 移除精华消息
class RealMessageType: # 实际消息分类 class RealMessageType: # 实际消息分类
text = "text" # 纯文本 text = "text" # 纯文本
@ -56,6 +79,8 @@ class RealMessageType: # 实际消息分类
reply = "reply" # 回复消息 reply = "reply" # 回复消息
forward = "forward" # 转发消息 forward = "forward" # 转发消息
node = "node" # 转发消息节点 node = "node" # 转发消息节点
json = "json" # JSON卡片消息
file = "file" # 文件消息
class MessageSentType: class MessageSentType:

View File

@ -300,7 +300,23 @@ class MessageHandler:
else: else:
logger.warning("record处理失败或不支持") logger.warning("record处理失败或不支持")
case RealMessageType.video: case RealMessageType.video:
logger.warning("不支持视频解析") ret_seg = await self.handle_video_message(sub_message)
if ret_seg:
seg_message.append(ret_seg)
else:
logger.warning("video处理失败")
case RealMessageType.json:
ret_seg = await self.handle_json_message(sub_message)
if ret_seg:
seg_message.append(ret_seg)
else:
logger.warning("json处理失败")
case RealMessageType.file:
ret_seg = await self.handle_file_message(sub_message)
if ret_seg:
seg_message.append(ret_seg)
else:
logger.warning("file处理失败")
case RealMessageType.at: case RealMessageType.at:
ret_seg = await self.handle_at_message( ret_seg = await self.handle_at_message(
sub_message, sub_message,
@ -445,6 +461,77 @@ class MessageHandler:
return None return None
return Seg(type="voice", data=audio_base64) return Seg(type="voice", data=audio_base64)
async def handle_video_message(self, raw_message: dict) -> Seg | None:
"""
处理视频消息
Parameters:
raw_message: dict: 原始消息
Returns:
seg_data: Seg: 处理后的消息段
"""
message_data: dict = raw_message.get("data")
file: str = message_data.get("file")
url: str = message_data.get("url")
file_size: str = message_data.get("file_size", "未知大小")
if not file:
logger.warning("视频消息缺少文件信息")
return None
# 视频消息返回文本描述,包含文件名和大小
video_text = f"[视频: {file}, 大小: {file_size}字节]"
if url:
video_text += f"\n视频链接: {url}"
return Seg(type="text", data=video_text)
async def handle_json_message(self, raw_message: dict) -> Seg | None:
"""
处理JSON卡片消息(小程序分享等)
Parameters:
raw_message: dict: 原始消息
Returns:
seg_data: Seg: 处理后的消息段
"""
message_data: dict = raw_message.get("data")
json_data: str = message_data.get("data")
if not json_data:
logger.warning("JSON消息缺少数据")
return None
try:
# 尝试解析JSON获取prompt提示信息
parsed_json = json.loads(json_data)
prompt = parsed_json.get("prompt", "[卡片消息]")
return Seg(type="text", data=prompt)
except json.JSONDecodeError:
logger.warning("JSON消息解析失败")
return Seg(type="text", data="[卡片消息]")
async def handle_file_message(self, raw_message: dict) -> Seg | None:
"""
处理文件消息
Parameters:
raw_message: dict: 原始消息
Returns:
seg_data: Seg: 处理后的消息段
"""
message_data: dict = raw_message.get("data")
file_name: str = message_data.get("file")
file_size: str = message_data.get("file_size", "未知大小")
file_url: str = message_data.get("url")
if not file_name:
logger.warning("文件消息缺少文件名")
return None
file_text = f"[文件: {file_name}, 大小: {file_size}字节]"
if file_url:
file_text += f"\n文件链接: {file_url}"
return Seg(type="text", data=file_text)
async def handle_reply_message(self, raw_message: dict, additional_config: dict) -> Tuple[List[Seg] | None, dict]: async def handle_reply_message(self, raw_message: dict, additional_config: dict) -> Tuple[List[Seg] | None, dict]:
# sourcery skip: move-assign-in-block, use-named-expression # sourcery skip: move-assign-in-block, use-named-expression
""" """
@ -489,18 +576,25 @@ class MessageHandler:
image_count: int image_count: int
if not handled_message: if not handled_message:
return None return None
# 添加转发消息的标题和结束标识
forward_header = Seg(type="text", data="========== 转发消息开始 ==========\n")
forward_footer = Seg(type="text", data="========== 转发消息结束 ==========")
if image_count < 5 and image_count > 0: if image_count < 5 and image_count > 0:
# 处理图片数量小于5的情况此时解析图片为base64 # 处理图片数量小于5的情况此时解析图片为base64
logger.trace("图片数量小于5开始解析图片为base64") logger.trace("图片数量小于5开始解析图片为base64")
return await self._recursive_parse_image_seg(handled_message, True) parsed_message = await self._recursive_parse_image_seg(handled_message, True)
return Seg(type="seglist", data=[forward_header, parsed_message, forward_footer])
elif image_count > 0: elif image_count > 0:
logger.trace("图片数量大于等于5开始解析图片为占位符") logger.trace("图片数量大于等于5开始解析图片为占位符")
# 处理图片数量大于等于5的情况此时解析图片为占位符 # 处理图片数量大于等于5的情况此时解析图片为占位符
return await self._recursive_parse_image_seg(handled_message, False) parsed_message = await self._recursive_parse_image_seg(handled_message, False)
return Seg(type="seglist", data=[forward_header, parsed_message, forward_footer])
else: else:
# 处理没有图片的情况,此时直接返回 # 处理没有图片的情况,此时直接返回
logger.trace("没有图片,直接返回") logger.trace("没有图片,直接返回")
return handled_message return Seg(type="seglist", data=[forward_header, handled_message, forward_footer])
async def _recursive_parse_image_seg(self, seg_data: Seg, to_image: bool) -> Seg: async def _recursive_parse_image_seg(self, seg_data: Seg, to_image: bool) -> Seg:
# sourcery skip: merge-else-if-into-elif # sourcery skip: merge-else-if-into-elif

View File

@ -87,12 +87,13 @@ class NoticeHandler:
match notice_type: match notice_type:
case NoticeType.friend_recall: case NoticeType.friend_recall:
logger.info("好友撤回一条消息") logger.info("好友撤回一条消息")
logger.info(f"撤回消息ID{raw_message.get('message_id')}, 撤回时间:{raw_message.get('time')}") handled_message, user_info = await self.handle_friend_recall_notify(raw_message)
logger.warning("暂时不支持撤回消息处理")
case NoticeType.group_recall: case NoticeType.group_recall:
if not await message_handler.check_allow_to_chat(user_id, group_id, True, False):
return None
logger.info("群内用户撤回一条消息") logger.info("群内用户撤回一条消息")
logger.info(f"撤回消息ID{raw_message.get('message_id')}, 撤回时间:{raw_message.get('time')}") handled_message, user_info = await self.handle_group_recall_notify(raw_message, group_id, user_id)
logger.warning("暂时不支持撤回消息处理") system_notice = True
case NoticeType.notify: case NoticeType.notify:
sub_type = raw_message.get("sub_type") sub_type = raw_message.get("sub_type")
match sub_type: match sub_type:
@ -123,6 +124,37 @@ class NoticeHandler:
system_notice = True system_notice = True
case _: case _:
logger.warning(f"不支持的group_ban类型: {notice_type}.{sub_type}") logger.warning(f"不支持的group_ban类型: {notice_type}.{sub_type}")
case NoticeType.group_msg_emoji_like:
if not await message_handler.check_allow_to_chat(user_id, group_id, True, False):
return None
logger.info("处理群消息表情回应")
handled_message, user_info = await self.handle_emoji_like_notify(raw_message, group_id, user_id)
case NoticeType.group_upload:
if not await message_handler.check_allow_to_chat(user_id, group_id, True, False):
return None
logger.info("处理群文件上传")
handled_message, user_info = await self.handle_group_upload_notify(raw_message, group_id, user_id)
system_notice = True
case NoticeType.group_increase:
sub_type = raw_message.get("sub_type")
logger.info(f"处理群成员增加: {sub_type}")
handled_message, user_info = await self.handle_group_increase_notify(raw_message, group_id, user_id)
system_notice = True
case NoticeType.group_decrease:
sub_type = raw_message.get("sub_type")
logger.info(f"处理群成员减少: {sub_type}")
handled_message, user_info = await self.handle_group_decrease_notify(raw_message, group_id, user_id)
system_notice = True
case NoticeType.group_admin:
sub_type = raw_message.get("sub_type")
logger.info(f"处理群管理员变动: {sub_type}")
handled_message, user_info = await self.handle_group_admin_notify(raw_message, group_id, user_id)
system_notice = True
case NoticeType.essence:
sub_type = raw_message.get("sub_type")
logger.info(f"处理精华消息: {sub_type}")
handled_message, user_info = await self.handle_essence_notify(raw_message, group_id)
system_notice = True
case _: case _:
logger.warning(f"不支持的notice类型: {notice_type}") logger.warning(f"不支持的notice类型: {notice_type}")
return None return None
@ -240,6 +272,322 @@ class NoticeHandler:
) )
return seg_data, user_info return seg_data, user_info
async def handle_friend_recall_notify(self, raw_message: dict) -> Tuple[Seg | None, UserInfo | None]:
"""处理好友消息撤回"""
user_id = raw_message.get("user_id")
message_id = raw_message.get("message_id")
if not user_id:
logger.error("用户ID不能为空无法处理好友撤回通知")
return None, None
# 获取好友信息
user_qq_info: dict = await get_stranger_info(self.server_connection, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
else:
user_name = "QQ用户"
logger.warning("无法获取撤回消息好友的昵称")
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=None,
)
seg_data = Seg(
type="notify",
data={
"sub_type": "friend_recall",
"message_id": message_id,
},
)
return seg_data, user_info
async def handle_group_recall_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""处理群消息撤回"""
if not group_id:
logger.error("群ID不能为空无法处理群撤回通知")
return None, None
message_id = raw_message.get("message_id")
operator_id = raw_message.get("operator_id")
# 获取撤回操作者信息
operator_nickname: str = None
operator_cardname: str = None
member_info: dict = await get_member_info(self.server_connection, group_id, operator_id)
if member_info:
operator_nickname = member_info.get("nickname")
operator_cardname = member_info.get("card")
else:
logger.warning("无法获取撤回操作者的昵称")
operator_nickname = "QQ用户"
operator_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=operator_id,
user_nickname=operator_nickname,
user_cardname=operator_cardname,
)
# 获取被撤回消息发送者信息(如果不是自己撤回的话)
recalled_user_info: UserInfo | None = None
if user_id != operator_id:
user_member_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_member_info:
user_nickname = user_member_info.get("nickname")
user_cardname = user_member_info.get("card")
else:
user_nickname = "QQ用户"
user_cardname = None
logger.warning("无法获取被撤回消息发送者的昵称")
recalled_user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_nickname,
user_cardname=user_cardname,
)
seg_data = Seg(
type="notify",
data={
"sub_type": "group_recall",
"message_id": message_id,
"recalled_user_info": recalled_user_info.to_dict() if recalled_user_info else None,
},
)
return seg_data, operator_info
async def handle_emoji_like_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""处理群消息表情回应"""
if not group_id:
logger.error("群ID不能为空无法处理表情回应通知")
return None, None
# 获取用户信息
user_qq_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
user_cardname = user_qq_info.get("card")
else:
user_name = "QQ用户"
user_cardname = "QQ用户"
logger.warning("无法获取表情回应用户的昵称")
# 解析表情列表
likes = raw_message.get("likes", [])
message_id = raw_message.get("message_id")
# 构建表情文本
emoji_texts = []
# QQ 官方表情映射表 (EmojiType=1 为 QQ 系统表情EmojiType=2 为 Emoji Unicode)
emoji_map = {
# QQ 系统表情 (Type 1)
"4": "得意",
"5": "流泪",
"8": "",
"9": "大哭",
"10": "尴尬",
"12": "调皮",
"14": "微笑",
"16": "",
"21": "可爱",
"23": "傲慢",
"24": "饥饿",
"25": "",
"26": "惊恐",
"27": "流汗",
"28": "憨笑",
"29": "悠闲",
"30": "奋斗",
"32": "疑问",
"33": "",
"34": "",
"38": "敲打",
"39": "再见",
"41": "发抖",
"42": "爱情",
"43": "跳跳",
"49": "拥抱",
"53": "蛋糕",
"60": "咖啡",
"63": "玫瑰",
"66": "爱心",
"74": "太阳",
"75": "月亮",
"76": "",
"78": "握手",
"79": "胜利",
"85": "飞吻",
"89": "西瓜",
"96": "冷汗",
"97": "擦汗",
"98": "抠鼻",
"99": "鼓掌",
"100": "糗大了",
"101": "坏笑",
"102": "左哼哼",
"103": "右哼哼",
"104": "哈欠",
"106": "委屈",
"109": "左亲亲",
"111": "可怜",
"116": "示爱",
"118": "抱拳",
"120": "拳头",
"122": "爱你",
"123": "NO",
"124": "OK",
"125": "转圈",
"129": "挥手",
"144": "喝彩",
"147": "棒棒糖",
"171": "",
"173": "泪奔",
"174": "无奈",
"175": "卖萌",
"176": "小纠结",
"179": "doge",
"180": "惊喜",
"181": "骚扰",
"182": "笑哭",
"183": "我最美",
"201": "点赞",
"203": "托脸",
"212": "托腮",
"214": "啵啵",
"219": "蹭一蹭",
"222": "抱抱",
"227": "拍手",
"232": "佛系",
"240": "喷脸",
"243": "甩头",
"246": "加油抱抱",
"262": "脑阔疼",
"264": "捂脸",
"265": "辣眼睛",
"266": "哦哟",
"267": "头秃",
"268": "问号脸",
"269": "暗中观察",
"270": "emm",
"271": "吃瓜",
"272": "呵呵哒",
"273": "我酸了",
"277": "汪汪",
"278": "",
"281": "无眼笑",
"282": "敬礼",
"284": "面无表情",
"285": "摸鱼",
"287": "",
"289": "睁眼",
"290": "敲开心",
"293": "摸锦鲤",
"294": "期待",
"297": "拜谢",
"298": "元宝",
"299": "牛啊",
"305": "右亲亲",
"306": "牛气冲天",
"307": "喵喵",
"314": "仔细分析",
"315": "加油",
"318": "崇拜",
"319": "比心",
"320": "庆祝",
"322": "拒绝",
"324": "吃糖",
"326": "生气",
# Unicode Emoji (Type 2)
"9728": "",
"9749": "",
"9786": "",
"10024": "",
"10060": "",
"10068": "",
"127801": "🌹",
"127817": "🍉",
"127822": "🍎",
"127827": "🍓",
"127836": "🍜",
"127838": "🍞",
"127847": "🍧",
"127866": "🍺",
"127867": "🍻",
"127881": "🎉",
"128027": "🐛",
"128046": "🐮",
"128051": "🐳",
"128053": "🐵",
"128074": "👊",
"128076": "👌",
"128077": "👍",
"128079": "👏",
"128089": "👙",
"128102": "👦",
"128104": "👨",
"128147": "💓",
"128157": "💝",
"128164": "💤",
"128166": "💦",
"128168": "💨",
"128170": "💪",
"128235": "📫",
"128293": "🔥",
"128513": "😁",
"128514": "😂",
"128516": "😄",
"128522": "😊",
"128524": "😌",
"128527": "😏",
"128530": "😒",
"128531": "😓",
"128532": "😔",
"128536": "😘",
"128538": "😚",
"128540": "😜",
"128541": "😝",
"128557": "😭",
"128560": "😰",
"128563": "😳",
}
for like in likes:
emoji_id = like.get("emoji_id", "")
count = like.get("count", 1)
emoji = emoji_map.get(emoji_id, f"表情{emoji_id}")
if count > 1:
emoji_texts.append(f"{emoji}x{count}")
else:
emoji_texts.append(emoji)
emoji_str = "".join(emoji_texts) if emoji_texts else "未知表情"
display_name = user_cardname if user_cardname and user_cardname != "QQ用户" else user_name
# 构建消息文本
message_text = f"{display_name} 对消息(ID:{message_id})表达了 {emoji_str}"
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=user_cardname,
)
seg_data = Seg(type="text", data=message_text)
return seg_data, user_info
async def handle_ban_notify(self, raw_message: dict, group_id: int) -> Tuple[Seg, UserInfo] | Tuple[None, None]: async def handle_ban_notify(self, raw_message: dict, group_id: int) -> Tuple[Seg, UserInfo] | Tuple[None, None]:
if not group_id: if not group_id:
logger.error("群ID不能为空无法处理禁言通知") logger.error("群ID不能为空无法处理禁言通知")
@ -512,5 +860,256 @@ class NoticeHandler:
await unsuccessful_notice_queue.put(to_be_send) await unsuccessful_notice_queue.put(to_be_send)
await asyncio.sleep(1) await asyncio.sleep(1)
async def handle_group_upload_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""
处理群文件上传通知
"""
file_info: dict = raw_message.get("file", {})
file_name = file_info.get("name", "未知文件")
file_size = file_info.get("size", 0)
file_id = file_info.get("id", "")
user_qq_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
user_cardname = user_qq_info.get("card")
else:
logger.warning("无法获取上传者信息")
user_name = "QQ用户"
user_cardname = None
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=user_cardname,
)
# 格式化文件大小
if file_size < 1024:
size_str = f"{file_size}B"
elif file_size < 1024 * 1024:
size_str = f"{file_size / 1024:.2f}KB"
else:
size_str = f"{file_size / (1024 * 1024):.2f}MB"
notify_seg = Seg(
type="notify",
data={
"sub_type": "group_upload",
"file_name": file_name,
"file_size": size_str,
"file_id": file_id,
},
)
return notify_seg, user_info
async def handle_group_increase_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""
处理群成员增加通知
"""
sub_type = raw_message.get("sub_type")
operator_id = raw_message.get("operator_id")
# 获取新成员信息
user_qq_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
user_cardname = user_qq_info.get("card")
else:
logger.warning("无法获取新成员信息")
user_name = "QQ用户"
user_cardname = None
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=user_cardname,
)
# 获取操作者信息
operator_name = "未知"
if operator_id:
operator_info: dict = await get_member_info(self.server_connection, group_id, operator_id)
if operator_info:
operator_name = operator_info.get("card") or operator_info.get("nickname", "未知")
if sub_type == NoticeType.GroupIncrease.invite:
action_text = f"{operator_name} 邀请"
elif sub_type == NoticeType.GroupIncrease.approve:
action_text = f"{operator_name} 同意"
else:
action_text = "加入"
notify_seg = Seg(
type="notify",
data={
"sub_type": "group_increase",
"action": action_text,
"increase_type": sub_type,
"operator_id": operator_id,
},
)
return notify_seg, user_info
async def handle_group_decrease_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""
处理群成员减少通知
"""
sub_type = raw_message.get("sub_type")
operator_id = raw_message.get("operator_id")
# 获取离开成员信息
user_qq_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
user_cardname = user_qq_info.get("card")
else:
logger.warning("无法获取离开成员信息")
user_name = "QQ用户"
user_cardname = None
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=user_cardname,
)
# 获取操作者信息
operator_name = "未知"
if operator_id and operator_id != 0:
operator_info: dict = await get_member_info(self.server_connection, group_id, operator_id)
if operator_info:
operator_name = operator_info.get("card") or operator_info.get("nickname", "未知")
if sub_type == NoticeType.GroupDecrease.leave:
action_text = "主动退群"
elif sub_type == NoticeType.GroupDecrease.kick:
action_text = f"{operator_name} 踢出"
elif sub_type == NoticeType.GroupDecrease.kick_me:
action_text = "机器人被踢出"
else:
action_text = "离开群聊"
notify_seg = Seg(
type="notify",
data={
"sub_type": "group_decrease",
"action": action_text,
"decrease_type": sub_type,
"operator_id": operator_id,
},
)
return notify_seg, user_info
async def handle_group_admin_notify(
self, raw_message: dict, group_id: int, user_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""
处理群管理员变动通知
"""
sub_type = raw_message.get("sub_type")
# 获取目标用户信息
user_qq_info: dict = await get_member_info(self.server_connection, group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname")
user_cardname = user_qq_info.get("card")
else:
logger.warning("无法获取目标用户信息")
user_name = "QQ用户"
user_cardname = None
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=user_id,
user_nickname=user_name,
user_cardname=user_cardname,
)
if sub_type == NoticeType.GroupAdmin.set:
action_text = "被设置为管理员"
elif sub_type == NoticeType.GroupAdmin.unset:
action_text = "被取消管理员"
else:
action_text = "管理员变动"
notify_seg = Seg(
type="notify",
data={
"sub_type": "group_admin",
"action": action_text,
"admin_type": sub_type,
},
)
return notify_seg, user_info
async def handle_essence_notify(
self, raw_message: dict, group_id: int
) -> Tuple[Seg | None, UserInfo | None]:
"""
处理精华消息通知
"""
sub_type = raw_message.get("sub_type")
sender_id = raw_message.get("sender_id")
operator_id = raw_message.get("operator_id")
message_id = raw_message.get("message_id")
# 获取操作者信息(设置精华的人)
operator_info: dict = await get_member_info(self.server_connection, group_id, operator_id)
if operator_info:
operator_name = operator_info.get("nickname")
operator_cardname = operator_info.get("card")
else:
logger.warning("无法获取操作者信息")
operator_name = "QQ用户"
operator_cardname = None
user_info = UserInfo(
platform=global_config.maibot_server.platform_name,
user_id=operator_id,
user_nickname=operator_name,
user_cardname=operator_cardname,
)
# 获取消息发送者信息
sender_name = "未知用户"
if sender_id:
sender_info: dict = await get_member_info(self.server_connection, group_id, sender_id)
if sender_info:
sender_name = sender_info.get("card") or sender_info.get("nickname", "未知用户")
if sub_type == NoticeType.Essence.add:
action_text = f"{sender_name} 的消息设为精华"
elif sub_type == NoticeType.Essence.delete:
action_text = f"移除了 {sender_name} 的精华消息"
else:
action_text = "精华消息变动"
notify_seg = Seg(
type="notify",
data={
"sub_type": "essence",
"action": action_text,
"essence_type": sub_type,
"sender_id": sender_id,
"message_id": message_id,
},
)
return notify_seg, user_info
notice_handler = NoticeHandler() notice_handler = NoticeHandler()

8
uv.lock 100644
View File

@ -0,0 +1,8 @@
version = 1
revision = 2
requires-python = ">=3.13"
[[package]]
name = "maibotnapcatadapter"
version = "0.5.5"
source = { virtual = "." }