feat: 添加配置管理器支持热重载功能

- 实现 ConfigManager 类,支持加载和热重载配置文件
- 使用 watchdog 监控配置文件变化,自动重载配置
- 支持为特定配置项注册回调函数,便于处理配置变更
- 提供多种配置属性访问接口,如 nickname、chat、voice 等
- 增加防抖机制,避免频繁重载导致的性能问题
pull/76/head
墨梓柒 2026-01-03 02:00:58 +08:00
parent 3e27e57409
commit b0bfa1a42d
No known key found for this signature in database
GPG Key ID: 4A65B9DBA35F7635
9 changed files with 1581 additions and 66 deletions

120
main.py
View File

@ -14,20 +14,26 @@ from src.mmc_com_layer import mmc_start_com, mmc_stop_com, router
from src.response_pool import put_response, check_timeout_response from src.response_pool import put_response, check_timeout_response
message_queue = asyncio.Queue() message_queue = asyncio.Queue()
websocket_server = None # 保存WebSocket服务器实例以便关闭
async def message_recv(server_connection: Server.ServerConnection): async def message_recv(server_connection: Server.ServerConnection):
await message_handler.set_server_connection(server_connection) try:
asyncio.create_task(notice_handler.set_server_connection(server_connection)) await message_handler.set_server_connection(server_connection)
await nc_message_sender.set_server_connection(server_connection) asyncio.create_task(notice_handler.set_server_connection(server_connection))
async for raw_message in server_connection: await nc_message_sender.set_server_connection(server_connection)
logger.debug(f"{raw_message[:1500]}..." if (len(raw_message) > 1500) else raw_message) async for raw_message in server_connection:
decoded_raw_message: dict = json.loads(raw_message) logger.debug(f"{raw_message[:1500]}..." if (len(raw_message) > 1500) else raw_message)
post_type = decoded_raw_message.get("post_type") decoded_raw_message: dict = json.loads(raw_message)
if post_type in ["meta_event", "message", "notice"]: post_type = decoded_raw_message.get("post_type")
await message_queue.put(decoded_raw_message) if post_type in ["meta_event", "message", "notice"]:
elif post_type is None: await message_queue.put(decoded_raw_message)
await put_response(decoded_raw_message) elif post_type is None:
await put_response(decoded_raw_message)
except asyncio.CancelledError:
logger.debug("message_recv 收到取消信号,正在关闭连接")
await server_connection.close()
raise
async def message_process(): async def message_process():
@ -47,8 +53,72 @@ async def message_process():
async def main(): async def main():
# 启动配置文件监控并注册napcat_server配置变更回调
from src.config import config_manager
# 保存napcat_server任务的引用用于重启
napcat_task = None
restart_event = asyncio.Event()
async def on_napcat_config_change(old_value, new_value):
"""当napcat_server配置变更时重启WebSocket服务器"""
nonlocal napcat_task
logger.warning(
f"NapCat配置已变更:\n"
f" 旧配置: {old_value.host}:{old_value.port}\n"
f" 新配置: {new_value.host}:{new_value.port}"
)
# 关闭当前WebSocket服务器
global websocket_server
if websocket_server:
try:
logger.info("正在关闭旧的WebSocket服务器...")
websocket_server.close()
await websocket_server.wait_closed()
logger.info("旧的WebSocket服务器已关闭")
except Exception as e:
logger.error(f"关闭旧WebSocket服务器失败: {e}")
# 取消旧任务
if napcat_task and not napcat_task.done():
napcat_task.cancel()
try:
await napcat_task
except asyncio.CancelledError:
pass
# 触发重启
restart_event.set()
config_manager.on_config_change("napcat_server", on_napcat_config_change)
# 启动文件监控
asyncio.create_task(config_manager.start_watch())
# WebSocket服务器重启循环
async def napcat_with_restart():
nonlocal napcat_task
while True:
restart_event.clear()
try:
await napcat_server()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"NapCat服务器异常: {e}")
break
# 等待重启信号
if not restart_event.is_set():
break
logger.info("正在重启WebSocket服务器...")
await asyncio.sleep(1) # 等待1秒后重启
message_send_instance.maibot_router = router message_send_instance.maibot_router = router
_ = await asyncio.gather(napcat_server(), mmc_start_com(), message_process(), check_timeout_response()) _ = await asyncio.gather(napcat_with_restart(), mmc_start_com(), message_process(), check_timeout_response())
def check_napcat_server_token(conn, request): def check_napcat_server_token(conn, request):
token = global_config.napcat_server.token token = global_config.napcat_server.token
@ -64,6 +134,7 @@ def check_napcat_server_token(conn, request):
return None return None
async def napcat_server(): async def napcat_server():
global websocket_server
logger.info("正在启动 MaiBot-Napcat-Adapter...") logger.info("正在启动 MaiBot-Napcat-Adapter...")
logger.debug(f"日志等级: {global_config.debug.level}") logger.debug(f"日志等级: {global_config.debug.level}")
logger.debug("日志文件: logs/adapter_*.log") logger.debug("日志文件: logs/adapter_*.log")
@ -75,10 +146,15 @@ async def napcat_server():
max_size=2**26, max_size=2**26,
process_request=check_napcat_server_token process_request=check_napcat_server_token
) as server: ) as server:
websocket_server = server
logger.success( logger.success(
f"✅ Adapter 启动成功! 监听: ws://{global_config.napcat_server.host}:{global_config.napcat_server.port}" f"✅ Adapter 启动成功! 监听: ws://{global_config.napcat_server.host}:{global_config.napcat_server.port}"
) )
await server.serve_forever() try:
await server.serve_forever()
except asyncio.CancelledError:
logger.debug("napcat_server 收到取消信号")
raise
except OSError: except OSError:
# 端口绑定失败时抛出异常让外层处理 # 端口绑定失败时抛出异常让外层处理
raise raise
@ -90,13 +166,24 @@ async def graceful_shutdown(silent: bool = False):
Args: Args:
silent: 静默模式,控制台不输出日志,但仍记录到文件 silent: 静默模式,控制台不输出日志,但仍记录到文件
""" """
global websocket_server
try: try:
if not silent: if not silent:
logger.info("正在关闭adapter...") logger.info("正在关闭adapter...")
else: else:
logger.debug("正在清理资源...") logger.debug("正在清理资源...")
# 先关闭MMC连接 # 先关闭WebSocket服务器
if websocket_server:
try:
logger.debug("正在关闭WebSocket服务器")
websocket_server.close()
await websocket_server.wait_closed()
logger.debug("WebSocket服务器已关闭")
except Exception as e:
logger.debug(f"关闭WebSocket服务器时出现错误: {e}")
# 关闭MMC连接
try: try:
await asyncio.wait_for(mmc_stop_com(), timeout=3) await asyncio.wait_for(mmc_stop_com(), timeout=3)
except asyncio.TimeoutError: except asyncio.TimeoutError:
@ -151,10 +238,11 @@ if __name__ == "__main__":
logger.error(" 1. 是否有其他 MaiBot-Napcat-Adapter 实例正在运行") logger.error(" 1. 是否有其他 MaiBot-Napcat-Adapter 实例正在运行")
logger.error(" 2. 修改 config.toml 中的 port 配置") logger.error(" 2. 修改 config.toml 中的 port 配置")
logger.error(f" 3. 使用命令查看占用进程: netstat -ano | findstr {global_config.napcat_server.port}") logger.error(f" 3. 使用命令查看占用进程: netstat -ano | findstr {global_config.napcat_server.port}")
logger.debug("完整错误信息:", exc_info=True)
else: else:
logger.error(f"❌ 网络错误: {str(e)}") logger.error(f"❌ 网络错误: {str(e)}")
logger.debug("完整错误信息:", exc_info=True)
logger.debug("完整错误信息:", exc_info=True)
# 端口占用时静默清理(控制台不输出,但记录到日志文件) # 端口占用时静默清理(控制台不输出,但记录到日志文件)
try: try:
loop.run_until_complete(graceful_shutdown(silent=True)) loop.run_until_complete(graceful_shutdown(silent=True))

View File

@ -13,6 +13,7 @@ dependencies = [
"sqlmodel>=0.0.27", "sqlmodel>=0.0.27",
"tomlkit>=0.13.3", "tomlkit>=0.13.3",
"websockets>=15.0.1", "websockets>=15.0.1",
"watchdog>=3.0.0",
] ]
[tool.ruff] [tool.ruff]

View File

@ -8,3 +8,4 @@ pillow
tomlkit tomlkit
rich rich
sqlmodel sqlmodel
watchdog

View File

@ -1,5 +1,6 @@
from .config import global_config from .config import global_config, _config_manager as config_manager
__all__ = [ __all__ = [
"global_config", "global_config",
"config_manager",
] ]

View File

@ -142,5 +142,15 @@ def load_config(config_path: str) -> Config:
update_config() update_config()
logger.info("正在品鉴配置文件...") logger.info("正在品鉴配置文件...")
global_config = load_config(config_path="config.toml")
# 创建配置管理器
from .config_manager import ConfigManager
_config_manager = ConfigManager()
_config_manager.load(config_path="config.toml")
# 向后兼容global_config 指向配置管理器
# 所有现有代码可以继续使用 global_config.chat.xxx 访问配置
global_config = _config_manager
logger.info("非常的新鲜,非常的美味!") logger.info("非常的新鲜,非常的美味!")

View File

@ -0,0 +1,276 @@
"""配置管理器 - 支持热重载"""
import asyncio
import os
from typing import Callable, Dict, List, Any, Optional
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileModifiedEvent
from ..logger import logger
from .config import Config, load_config
from .official_configs import (
ChatConfig,
DebugConfig,
MaiBotServerConfig,
NapcatServerConfig,
NicknameConfig,
VoiceConfig,
)
class ConfigManager:
"""配置管理器 - 混合模式(属性代理 + 选择性回调)
支持热重载配置文件使用watchdog实时监控文件变化
需要特殊处理的配置项可以注册回调函数
"""
def __init__(self) -> None:
self._config: Optional[Config] = None
self._config_path: str = "config.toml"
self._lock: asyncio.Lock = asyncio.Lock()
self._callbacks: Dict[str, List[Callable]] = {}
# Watchdog相关
self._observer: Optional[Observer] = None
self._event_handler: Optional[FileSystemEventHandler] = None
self._reload_debounce_task: Optional[asyncio.Task] = None
self._debounce_delay: float = 0.5 # 防抖延迟(秒)
self._loop: Optional[asyncio.AbstractEventLoop] = None # 事件循环引用
def load(self, config_path: str = "config.toml") -> None:
"""加载配置文件
Args:
config_path: 配置文件路径
"""
self._config_path = os.path.abspath(config_path)
self._config = load_config(config_path)
logger.info(f"配置已加载: {config_path}")
async def reload(self, config_path: Optional[str] = None) -> bool:
"""重载配置文件(热重载)
Args:
config_path: 配置文件路径如果为None则使用初始路径
Returns:
bool: 是否重载成功
"""
if config_path is None:
config_path = self._config_path
async with self._lock:
old_config = self._config
try:
new_config = load_config(config_path)
if old_config is not None:
await self._notify_changes(old_config, new_config)
self._config = new_config
logger.info(f"配置重载成功: {config_path}")
return True
except Exception as e:
logger.error(f"配置重载失败: {e}", exc_info=True)
return False
def on_config_change(
self,
config_path: str,
callback: Callable[[Any, Any], Any]
) -> None:
"""为特定配置路径注册回调函数
Args:
config_path: 配置路径 'napcat_server', 'chat.ban_user_id', 'debug.level'
callback: 回调函数签名为 async def callback(old_value, new_value)
"""
if config_path not in self._callbacks:
self._callbacks[config_path] = []
self._callbacks[config_path].append(callback)
logger.debug(f"已注册配置变更回调: {config_path}")
async def _notify_changes(self, old_config: Config, new_config: Config) -> None:
"""通知配置变更
Args:
old_config: 旧配置对象
new_config: 新配置对象
"""
for config_path, callbacks in self._callbacks.items():
try:
old_value = self._get_value(old_config, config_path)
new_value = self._get_value(new_config, config_path)
if old_value != new_value:
logger.info(f"检测到配置变更: {config_path}")
for callback in callbacks:
try:
if asyncio.iscoroutinefunction(callback):
await callback(old_value, new_value)
else:
callback(old_value, new_value)
except Exception as e:
logger.error(
f"配置变更回调执行失败 [{config_path}]: {e}",
exc_info=True
)
except Exception as e:
logger.error(f"获取配置值失败 [{config_path}]: {e}")
def _get_value(self, config: Config, path: str) -> Any:
"""获取嵌套配置值
Args:
config: 配置对象
path: 配置路径支持点分隔的嵌套路径
Returns:
Any: 配置值
Raises:
AttributeError: 配置路径不存在
"""
parts = path.split('.')
value = config
for part in parts:
value = getattr(value, part)
return value
@property
def nickname(self) -> NicknameConfig:
"""昵称配置"""
if self._config is None:
raise RuntimeError("配置尚未加载,请先调用 load() 方法")
return self._config.nickname
@property
def chat(self) -> ChatConfig:
"""聊天配置"""
if self._config is None:
raise RuntimeError("配置尚未加载,请先调用 load() 方法")
return self._config.chat
@property
def voice(self) -> VoiceConfig:
"""语音配置"""
if self._config is None:
raise RuntimeError("配置尚未加载,请先调用 load() 方法")
return self._config.voice
@property
def napcat_server(self) -> NapcatServerConfig:
"""NapCat服务器配置"""
if self._config is None:
raise RuntimeError("配置尚未加载,请先调用 load() 方法")
return self._config.napcat_server
@property
def maibot_server(self) -> MaiBotServerConfig:
"""MaiBot服务器配置"""
if self._config is None:
raise RuntimeError("配置尚未加载,请先调用 load() 方法")
return self._config.maibot_server
@property
def debug(self) -> DebugConfig:
"""调试配置"""
if self._config is None:
raise RuntimeError("配置尚未加载,请先调用 load() 方法")
return self._config.debug
async def start_watch(self) -> None:
"""启动配置文件监控(需要在事件循环中调用)"""
if self._observer is not None:
logger.warning("配置文件监控已在运行")
return
# 保存当前事件循环引用
self._loop = asyncio.get_running_loop()
# 创建文件监控事件处理器
config_file_path = self._config_path
class ConfigFileHandler(FileSystemEventHandler):
def __init__(handler_self, manager: "ConfigManager"):
handler_self.manager = manager
handler_self.config_path = config_file_path
def on_modified(handler_self, event):
# 检查是否是目标配置文件修改事件
if isinstance(event, FileModifiedEvent) and os.path.abspath(event.src_path) == handler_self.config_path:
logger.debug(f"检测到配置文件变更: {event.src_path}")
# 使用防抖机制避免重复重载
# watchdog运行在独立线程需要使用run_coroutine_threadsafe
if handler_self.manager._loop:
asyncio.run_coroutine_threadsafe(
handler_self.manager._debounced_reload(),
handler_self.manager._loop
)
self._event_handler = ConfigFileHandler(self)
# 创建Observer并监控配置文件所在目录
self._observer = Observer()
watch_dir = os.path.dirname(self._config_path) or "."
self._observer.schedule(self._event_handler, watch_dir, recursive=False)
self._observer.start()
logger.info(f"已启动配置文件实时监控: {self._config_path}")
async def stop_watch(self) -> None:
"""停止配置文件监控"""
if self._observer is None:
return
logger.debug("正在停止配置文件监控")
# 取消防抖任务
if self._reload_debounce_task:
self._reload_debounce_task.cancel()
try:
await self._reload_debounce_task
except asyncio.CancelledError:
pass
# 停止observer
self._observer.stop()
self._observer.join(timeout=2)
self._observer = None
self._event_handler = None
logger.info("配置文件监控已停止")
async def _debounced_reload(self) -> None:
"""防抖重载:避免短时间内多次文件修改事件导致重复重载"""
# 取消之前的防抖任务
if self._reload_debounce_task and not self._reload_debounce_task.done():
self._reload_debounce_task.cancel()
# 等待防抖延迟
await asyncio.sleep(self._debounce_delay)
# 执行重载
modified_time = datetime.fromtimestamp(
os.path.getmtime(self._config_path)
).strftime("%Y-%m-%d %H:%M:%S")
logger.info(
f"配置文件已更新 (修改时间: {modified_time}),正在重载..."
)
success = await self.reload()
if not success:
logger.error(
"配置文件重载失败!请检查配置文件格式是否正确。\n"
"当前仍使用旧配置运行,修复配置文件后将自动重试。"
)
def __repr__(self) -> str:
watching = self._observer is not None and self._observer.is_alive()
return f"<ConfigManager config_path={self._config_path} watching={watching}>"

View File

@ -537,7 +537,7 @@ class MessageHandler:
return Seg(type="text", data=announce_text) return Seg(type="text", data=announce_text)
# 检查是否为音乐卡片 # 检查是否为音乐卡片
if app == "com.tencent.music.lua" or app == "com.tencent.structmsg": if app in ("com.tencent.music.lua", "com.tencent.structmsg"):
meta = parsed_json.get("meta", {}) meta = parsed_json.get("meta", {})
music = meta.get("music", {}) music = meta.get("music", {})

View File

@ -1,44 +1,83 @@
from maim_message import GroupInfo from maim_message import GroupInfo
from typing import Any, Dict, Tuple from typing import Any, Dict, Tuple, Callable, Optional
from src import CommandType from src import CommandType
# 全局命令处理器注册表(在类外部定义以避免循环引用)
_command_handlers: Dict[str, Dict[str, Any]] = {}
def register_command(command_type: CommandType, require_group: bool = True):
"""装饰器:注册命令处理器
Args:
command_type: 命令类型
require_group: 是否需要群聊信息默认为True
Returns:
装饰器函数
"""
def decorator(func: Callable) -> Callable:
_command_handlers[command_type.name] = {
"handler": func,
"require_group": require_group,
}
return func
return decorator
class SendCommandHandleClass: class SendCommandHandleClass:
@classmethod @classmethod
def handle_command(cls, raw_command_data: Dict[str, Any], group_info: GroupInfo): def handle_command(cls, raw_command_data: Dict[str, Any], group_info: Optional[GroupInfo]):
"""统一命令处理入口
Args:
raw_command_data: 原始命令数据
group_info: 群聊信息可选
Returns:
Tuple[str, Dict[str, Any]]: (action, params) 用于发送给NapCat
Raises:
RuntimeError: 命令类型未知或处理失败
"""
command_name: str = raw_command_data.get("name") command_name: str = raw_command_data.get("name")
if command_name not in _command_handlers:
raise RuntimeError(f"未知的命令类型: {command_name}")
try: try:
match command_name: handler_info = _command_handlers[command_name]
case CommandType.GROUP_BAN.name: handler = handler_info["handler"]
return cls.handle_ban_command(raw_command_data.get("args", {}), group_info) require_group = handler_info["require_group"]
case CommandType.GROUP_WHOLE_BAN.name:
return cls.handle_whole_ban_command(raw_command_data.get("args", {}), group_info) # 检查群聊信息要求
case CommandType.GROUP_KICK.name: if require_group and not group_info:
return cls.handle_kick_command(raw_command_data.get("args", {}), group_info) raise ValueError(f"命令 {command_name} 需要在群聊上下文中使用")
case CommandType.SEND_POKE.name:
return cls.handle_poke_command(raw_command_data.get("args", {}), group_info) # 调用处理器
case CommandType.DELETE_MSG.name: args = raw_command_data.get("args", {})
return cls.delete_msg_command(raw_command_data.get("args", {})) return handler(args, group_info)
case CommandType.AI_VOICE_SEND.name:
return cls.handle_ai_voice_send_command(raw_command_data.get("args", {}), group_info)
case CommandType.MESSAGE_LIKE.name:
return cls.handle_message_like_command(raw_command_data.get("args", {}))
case _:
raise RuntimeError(f"未知的命令类型: {command_name}")
except Exception as e: except Exception as e:
raise RuntimeError(f"处理命令时出错: {str(e)}") from e raise RuntimeError(f"处理命令 {command_name} 时出错: {str(e)}") from e
# ============ 命令处理器(使用装饰器注册)============
@staticmethod @staticmethod
def handle_ban_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.GROUP_BAN, require_group=True)
def handle_ban_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理封禁命令 """处理封禁命令
Args: Args:
args (Dict[str, Any]): 参数字典 args: 参数字典 {"qq_id": int, "duration": int}
group_info (GroupInfo): 群聊信息对应目标群聊 group_info: 群聊信息对应目标群聊
Returns: Returns:
Tuple[CommandType, Dict[str, Any]] Tuple[str, Dict[str, Any]]: (action, params)
""" """
duration: int = int(args["duration"]) duration: int = int(args["duration"])
user_id: int = int(args["qq_id"]) user_id: int = int(args["qq_id"])
@ -59,15 +98,16 @@ class SendCommandHandleClass:
) )
@staticmethod @staticmethod
def handle_whole_ban_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.GROUP_WHOLE_BAN, require_group=True)
def handle_whole_ban_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理全体禁言命令 """处理全体禁言命令
Args: Args:
args (Dict[str, Any]): 参数字典 args: 参数字典 {"enable": bool}
group_info (GroupInfo): 群聊信息对应目标群聊 group_info: 群聊信息对应目标群聊
Returns: Returns:
Tuple[CommandType, Dict[str, Any]] Tuple[str, Dict[str, Any]]: (action, params)
""" """
enable = args["enable"] enable = args["enable"]
assert isinstance(enable, bool), "enable参数必须是布尔值" assert isinstance(enable, bool), "enable参数必须是布尔值"
@ -83,15 +123,16 @@ class SendCommandHandleClass:
) )
@staticmethod @staticmethod
def handle_kick_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.GROUP_KICK, require_group=True)
def handle_kick_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理群成员踢出命令 """处理群成员踢出命令
Args: Args:
args (Dict[str, Any]): 参数字典 args: 参数字典 {"qq_id": int}
group_info (GroupInfo): 群聊信息对应目标群聊 group_info: 群聊信息对应目标群聊
Returns: Returns:
Tuple[CommandType, Dict[str, Any]] Tuple[str, Dict[str, Any]]: (action, params)
""" """
user_id: int = int(args["qq_id"]) user_id: int = int(args["qq_id"])
group_id: int = int(group_info.group_id) group_id: int = int(group_info.group_id)
@ -109,15 +150,16 @@ class SendCommandHandleClass:
) )
@staticmethod @staticmethod
def handle_poke_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.SEND_POKE, require_group=False)
def handle_poke_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理戳一戳命令 """处理戳一戳命令
Args: Args:
args (Dict[str, Any]): 参数字典 args: 参数字典 {"qq_id": int}
group_info (GroupInfo): 群聊信息对应目标群聊 group_info: 群聊信息可选私聊时为None
Returns: Returns:
Tuple[CommandType, Dict[str, Any]] Tuple[str, Dict[str, Any]]: (action, params)
""" """
user_id: int = int(args["qq_id"]) user_id: int = int(args["qq_id"])
if group_info is None: if group_info is None:
@ -137,14 +179,16 @@ class SendCommandHandleClass:
) )
@staticmethod @staticmethod
def delete_msg_command(args: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.DELETE_MSG, require_group=False)
def delete_msg_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
"""处理撤回消息命令 """处理撤回消息命令
Args: Args:
args (Dict[str, Any]): 参数字典 args: 参数字典 {"message_id": int}
group_info: 群聊信息不使用
Returns: Returns:
Tuple[CommandType, Dict[str, Any]] Tuple[str, Dict[str, Any]]: (action, params)
""" """
try: try:
message_id = int(args["message_id"]) message_id = int(args["message_id"])
@ -163,10 +207,16 @@ class SendCommandHandleClass:
) )
@staticmethod @staticmethod
def handle_ai_voice_send_command(args: Dict[str, Any], group_info: GroupInfo) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.AI_VOICE_SEND, require_group=True)
""" def handle_ai_voice_send_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
处理AI语音发送命令的逻辑 """处理AI语音发送命令
并返回 NapCat 兼容的 (action, params) 元组
Args:
args: 参数字典 {"character": str, "text": str}
group_info: 群聊信息
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
""" """
if not group_info or not group_info.group_id: if not group_info or not group_info.group_id:
raise ValueError("AI语音发送命令必须在群聊上下文中使用") raise ValueError("AI语音发送命令必须在群聊上下文中使用")
@ -190,9 +240,16 @@ class SendCommandHandleClass:
) )
@staticmethod @staticmethod
def handle_message_like_command(args: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]: @register_command(CommandType.MESSAGE_LIKE, require_group=False)
""" def handle_message_like_command(args: Dict[str, Any], group_info: Optional[GroupInfo]) -> Tuple[str, Dict[str, Any]]:
处理给消息贴表情的逻辑 """处理给消息贴表情命令
Args:
args: 参数字典 {"message_id": int, "emoji_id": int}
group_info: 群聊信息不使用
Returns:
Tuple[str, Dict[str, Any]]: (action, params)
""" """
if not args: if not args:
raise ValueError("消息贴表情命令缺少参数") raise ValueError("消息贴表情命令缺少参数")

1081
uv.lock

File diff suppressed because it is too large Load Diff