From c69ec0613063c908e1521d1f5a69665033428fb7 Mon Sep 17 00:00:00 2001 From: tcmofashi Date: Sun, 14 Dec 2025 12:36:25 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81maim=5Fmessage=20api-?= =?UTF-8?q?server=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message_receive/uni_message_sender.py | 59 +++++++- src/common/message/api.py | 140 ++++++++++++++++-- src/config/official_configs.py | 44 +++--- template/bot_config_template.toml | 19 +-- 4 files changed, 212 insertions(+), 50 deletions(-) diff --git a/src/chat/message_receive/uni_message_sender.py b/src/chat/message_receive/uni_message_sender.py index a45ac515..43c3746d 100644 --- a/src/chat/message_receive/uni_message_sender.py +++ b/src/chat/message_receive/uni_message_sender.py @@ -182,11 +182,60 @@ async def _send_message(message: MessageSending, show_log=True) -> bool: logger.info(f"已将消息 '{message_preview}' 发往 WebUI 聊天室") return True - # 直接调用API发送消息 - await get_global_api().send_message(message) - if show_log: - logger.info(f"已将消息 '{message_preview}' 发往平台'{message.message_info.platform}'") - return True + # 尝试通过Legacy API发送消息,如果失败尝试API Server Fallback + try: + send_result = await get_global_api().send_message(message) + if not send_result: + raise Exception("Legacy API send_message returned False (Target platform not found or connection lost)") + + if show_log: + logger.info(f"已将消息 '{message_preview}' 发往平台'{message.message_info.platform}'") + return True + except Exception as legacy_error: + # Fallback: 尝试使用额外的 API Server 发送 + global_api = get_global_api() + extra_server = getattr(global_api, "extra_server", None) + + if extra_server and extra_server.is_running(): + platform = message.message_info.platform + + # Fallback: 使用极其简单的 Platform -> API Key 映射 + # 只有收到过该平台的消息,我们才知道该平台的 API Key,才能回传消息 + platform_map = getattr(global_api, "platform_map", {}) + target_api_key = platform_map.get(platform) + + if target_api_key: + logger.warning(f"Legacy API发送失败: {legacy_error}。使用缓存API Key通过API Server发送...") + try: + # 构造 APIMessageBase + from maim_message.message import APIMessageBase, MessageDim + + msg_dim = MessageDim(api_key=target_api_key, platform=platform) + + api_message = APIMessageBase( + message_info=message.message_info, + message_segment=message.message_segment, + message_dim=msg_dim + ) + + # 直接调用 Server 的 send_message 接口,它会自动处理路由 + results = await extra_server.send_message(api_message) + + # 检查是否有任何连接发送成功 + if any(results.values()): + if show_log: + logger.info(f"已通过API Server Fallback将消息 '{message_preview}' 发往平台'{platform}' (key: {target_api_key})") + return True + else: + logger.error(f"API Server Fallback发送失败: 目标用户(Key={target_api_key})无活跃连接") + + except Exception as fallback_error: + logger.error(f"API Server Fallback发送出错: {fallback_error}") + else: + logger.warning(f"Legacy API发送失败且无可用API Server缓存 (未收到过来自 '{platform}' 的消息,无法获取API Key)") + + # 如果没有fallback或fallback失败,抛出原始异常 + raise legacy_error except Exception as e: logger.error(f"发送消息 '{message_preview}' 发往平台'{message.message_info.platform}' 失败: {str(e)}") diff --git a/src/common/message/api.py b/src/common/message/api.py index eed85c0a..f9f6c8f5 100644 --- a/src/common/message/api.py +++ b/src/common/message/api.py @@ -15,14 +15,18 @@ def get_global_api() -> MessageServer: # sourcery skip: extract-method # 检查maim_message版本 try: maim_message_version = importlib.metadata.version("maim_message") - version_compatible = [int(x) for x in maim_message_version.split(".")] >= [0, 3, 3] + version_int = [int(x) for x in maim_message_version.split(".")] + version_compatible = version_int >= [0, 3, 3] + # Check for API Server feature (>= 0.6.0) + has_api_server_feature = version_int >= [0, 6, 0] except (importlib.metadata.PackageNotFoundError, ValueError): version_compatible = False + has_api_server_feature = False # 读取配置项 maim_message_config = global_config.maim_message - # 设置基本参数 + # 设置基本参数 (Legacy Server Mode) kwargs = { "host": os.environ["HOST"], "port": int(os.environ["PORT"]), @@ -39,21 +43,129 @@ def get_global_api() -> MessageServer: # sourcery skip: extract-method if maim_message_config.auth_token and len(maim_message_config.auth_token) > 0: kwargs["enable_token"] = True - if maim_message_config.use_custom: - # 添加WSS模式支持 - del kwargs["app"] - kwargs["host"] = maim_message_config.host - kwargs["port"] = maim_message_config.port - kwargs["mode"] = maim_message_config.mode - if maim_message_config.use_wss: - if maim_message_config.cert_file: - kwargs["ssl_certfile"] = maim_message_config.cert_file - if maim_message_config.key_file: - kwargs["ssl_keyfile"] = maim_message_config.key_file - kwargs["enable_custom_uvicorn_logger"] = False + # Removed legacy custom config block (use_custom) as requested. + kwargs["enable_custom_uvicorn_logger"] = False global_api = MessageServer(**kwargs) if version_compatible and maim_message_config.auth_token: for token in maim_message_config.auth_token: global_api.add_valid_token(token) + + # --------------------------------------------------------------------- + # Additional API Server Configuration (maim_message >= 6.0) + # --------------------------------------------------------------------- + enable_api_server = maim_message_config.enable_api_server + + # 如果版本支持且启用了API Server,则初始化额外服务器 + if has_api_server_feature and enable_api_server: + try: + from maim_message.server import WebSocketServer, ServerConfig + from maim_message.message import APIMessageBase + + api_logger = get_logger("maim_message_api_server") + + # 1. Prepare Config + api_server_host = maim_message_config.api_server_host + api_server_port = maim_message_config.api_server_port + use_wss = maim_message_config.api_server_use_wss + + server_config = ServerConfig( + host=api_server_host, + port=api_server_port, + ssl_enabled=use_wss, + ssl_certfile=maim_message_config.api_server_cert_file if use_wss else None, + ssl_keyfile=maim_message_config.api_server_key_file if use_wss else None, + ) + + # 2. Setup Auth Handler + async def auth_handler(metadata: dict) -> bool: + allowed_keys = maim_message_config.api_server_allowed_api_keys + # If list is empty/None, allow all (default behavior of returning True) + if not allowed_keys: + return True + + api_key = metadata.get("api_key") + if api_key in allowed_keys: + return True + + api_logger.warning(f"Rejected connection with invalid API Key: {api_key}") + return False + + server_config.on_auth = auth_handler + + # 3. Setup Message Bridge + # Initialize refined route map if not exists + if not hasattr(global_api, "platform_map"): + global_api.platform_map = {} + + async def bridge_message_handler(message: APIMessageBase, metadata: dict): + # Bridge message to the main bot logic + # We convert APIMessageBase to dict to be compatible with legacy handlers + # that MainBot (ChatManager) expects. + msg_dict = message.to_dict() + + # Compatibility Layer: Flatten sender_info to top-level user_info/group_info + # Legacy MessageBase expects message_info to have user_info and group_info directly. + if "message_info" in msg_dict: + msg_info = msg_dict["message_info"] + sender_info = msg_info.get("sender_info") + if sender_info: + # If direct user_info/group_info are missing, populate them from sender_info + if "user_info" not in msg_info and (ui := sender_info.get("user_info")): + msg_info["user_info"] = ui + + if "group_info" not in msg_info and (gi := sender_info.get("group_info")): + msg_info["group_info"] = gi + + # Route Caching Logic: Simply map platform to API Key + # This allows us to send messages back to the correct API client for this platform + try: + api_key = metadata.get("api_key") + if api_key: + platform = msg_info.get("platform") + if platform: + global_api.platform_map[platform] = api_key + except Exception as e: + api_logger.warning(f"Failed to update platform map: {e}") + + # Compatibility Layer: Ensure raw_message exists (even if None) as it's part of MessageBase + if "raw_message" not in msg_dict: + msg_dict["raw_message"] = None + + await global_api.process_message(msg_dict) + + server_config.on_message = bridge_message_handler + + # 4. Initialize Server + extra_server = WebSocketServer(config=server_config) + + # 5. Patch global_api lifecycle methods to manage both servers + original_run = global_api.run + original_stop = global_api.stop + + async def patched_run(): + api_logger.info(f"Starting Additional API Server on {api_server_host}:{api_server_port} (WSS: {use_wss})") + # Start the extra server (non-blocking start) + await extra_server.start() + # Run the original legacy server (this usually keeps running) + await original_run() + + async def patched_stop(): + api_logger.info("Stopping Additional API Server...") + await extra_server.stop() + await original_stop() + + global_api.run = patched_run + global_api.stop = patched_stop + + # Attach for reference + global_api.extra_server = extra_server + + except ImportError: + get_logger("maim_message").error("Cannot import maim_message.server components. Is maim_message >= 0.6.0 installed?") + except Exception as e: + get_logger("maim_message").error(f"Failed to initialize Additional API Server: {e}") + import traceback + get_logger("maim_message").debug(traceback.format_exc()) + return global_api diff --git a/src/config/official_configs.py b/src/config/official_configs.py index bfd6ea5c..ccb1297a 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -648,29 +648,29 @@ class ExperimentalConfig(ConfigBase): class MaimMessageConfig(ConfigBase): """maim_message配置类""" - use_custom: bool = False - """是否使用自定义的maim_message配置""" - - host: str = "127.0.0.1" - """主机地址""" - - port: int = 8090 - """"端口号""" - - mode: Literal["ws", "tcp"] = "ws" - """连接模式,支持ws和tcp""" - - use_wss: bool = False - """是否使用WSS安全连接""" - - cert_file: str = "" - """SSL证书文件路径,仅在use_wss=True时有效""" - - key_file: str = "" - """SSL密钥文件路径,仅在use_wss=True时有效""" - auth_token: list[str] = field(default_factory=lambda: []) - """认证令牌,用于API验证,为空则不启用验证""" + """认证令牌,用于旧版API验证,为空则不启用验证""" + + enable_api_server: bool = False + """是否启用额外的新版API Server""" + + api_server_host: str = "0.0.0.0" + """新版API Server主机地址""" + + api_server_port: int = 8090 + """新版API Server端口号""" + + api_server_use_wss: bool = False + """新版API Server是否启用WSS""" + + api_server_cert_file: str = "" + """新版API Server SSL证书文件路径""" + + api_server_key_file: str = "" + """新版API Server SSL密钥文件路径""" + + api_server_allowed_api_keys: list[str] = field(default_factory=lambda: []) + """新版API Server允许的API Key列表,为空则允许所有连接""" @dataclass diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index d968f7d0..c8e1d1ea 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -232,15 +232,16 @@ show_planner_prompt = false # 是否显示planner的prompt和原始返回结果 show_lpmm_paragraph = false # 是否显示lpmm找到的相关文段日志 [maim_message] -auth_token = [] # 认证令牌,用于API验证,为空则不启用验证 -# 以下项目若要使用需要打开use_custom,并单独配置maim_message的服务器 -use_custom = false # 是否启用自定义的maim_message服务器,注意这需要设置新的端口,不能与.env重复 -host="127.0.0.1" -port=8090 -mode="ws" # 支持ws和tcp两种模式 -use_wss = false # 是否使用WSS安全连接,只支持ws模式 -cert_file = "" # SSL证书文件路径,仅在use_wss=true时有效 -key_file = "" # SSL密钥文件路径,仅在use_wss=true时有效 +auth_token = [] # 认证令牌,用于旧版API验证,为空则不启用验证 + +# 新版API Server配置(额外监听端口) +enable_api_server = false # 是否启用额外的新版API Server +api_server_host = "0.0.0.0" # 新版API Server主机地址 +api_server_port = 8090 # 新版API Server端口号 +api_server_use_wss = false # 新版API Server是否启用WSS +api_server_cert_file = "" # 新版API Server SSL证书文件路径 +api_server_key_file = "" # 新版API Server SSL密钥文件路径 +api_server_allowed_api_keys = [] # 新版API Server允许的API Key列表,为空则允许所有连接 [telemetry] #发送统计信息,主要是看全球有多少只麦麦 enable = true