Merge pull request #1432 from tcmofashi/dev

feat: 支持maim_message api-server模式
pull/1443/head
tcmofashi 2025-12-17 15:26:13 +08:00 committed by GitHub
commit dc95ea25a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 212 additions and 50 deletions

View File

@ -182,11 +182,60 @@ async def _send_message(message: MessageSending, show_log=True) -> bool:
logger.info(f"已将消息 '{message_preview}' 发往 WebUI 聊天室") logger.info(f"已将消息 '{message_preview}' 发往 WebUI 聊天室")
return True return True
# 直接调用API发送消息 # 尝试通过Legacy API发送消息如果失败尝试API Server Fallback
await get_global_api().send_message(message) try:
if show_log: send_result = await get_global_api().send_message(message)
logger.info(f"已将消息 '{message_preview}' 发往平台'{message.message_info.platform}'") if not send_result:
return True raise Exception("Legacy API send_message returned False (Target platform not found or connection lost)")
if show_log:
logger.info(f"已将消息 '{message_preview}' 发往平台'{message.message_info.platform}'")
return True
except Exception as legacy_error:
# Fallback: 尝试使用额外的 API Server 发送
global_api = get_global_api()
extra_server = getattr(global_api, "extra_server", None)
if extra_server and extra_server.is_running():
platform = message.message_info.platform
# Fallback: 使用极其简单的 Platform -> API Key 映射
# 只有收到过该平台的消息,我们才知道该平台的 API Key才能回传消息
platform_map = getattr(global_api, "platform_map", {})
target_api_key = platform_map.get(platform)
if target_api_key:
logger.warning(f"Legacy API发送失败: {legacy_error}。使用缓存API Key通过API Server发送...")
try:
# 构造 APIMessageBase
from maim_message.message import APIMessageBase, MessageDim
msg_dim = MessageDim(api_key=target_api_key, platform=platform)
api_message = APIMessageBase(
message_info=message.message_info,
message_segment=message.message_segment,
message_dim=msg_dim
)
# 直接调用 Server 的 send_message 接口,它会自动处理路由
results = await extra_server.send_message(api_message)
# 检查是否有任何连接发送成功
if any(results.values()):
if show_log:
logger.info(f"已通过API Server Fallback将消息 '{message_preview}' 发往平台'{platform}' (key: {target_api_key})")
return True
else:
logger.error(f"API Server Fallback发送失败: 目标用户(Key={target_api_key})无活跃连接")
except Exception as fallback_error:
logger.error(f"API Server Fallback发送出错: {fallback_error}")
else:
logger.warning(f"Legacy API发送失败且无可用API Server缓存 (未收到过来自 '{platform}' 的消息无法获取API Key)")
# 如果没有fallback或fallback失败抛出原始异常
raise legacy_error
except Exception as e: except Exception as e:
logger.error(f"发送消息 '{message_preview}' 发往平台'{message.message_info.platform}' 失败: {str(e)}") logger.error(f"发送消息 '{message_preview}' 发往平台'{message.message_info.platform}' 失败: {str(e)}")

View File

@ -15,14 +15,18 @@ def get_global_api() -> MessageServer: # sourcery skip: extract-method
# 检查maim_message版本 # 检查maim_message版本
try: try:
maim_message_version = importlib.metadata.version("maim_message") maim_message_version = importlib.metadata.version("maim_message")
version_compatible = [int(x) for x in maim_message_version.split(".")] >= [0, 3, 3] version_int = [int(x) for x in maim_message_version.split(".")]
version_compatible = version_int >= [0, 3, 3]
# Check for API Server feature (>= 0.6.0)
has_api_server_feature = version_int >= [0, 6, 0]
except (importlib.metadata.PackageNotFoundError, ValueError): except (importlib.metadata.PackageNotFoundError, ValueError):
version_compatible = False version_compatible = False
has_api_server_feature = False
# 读取配置项 # 读取配置项
maim_message_config = global_config.maim_message maim_message_config = global_config.maim_message
# 设置基本参数 # 设置基本参数 (Legacy Server Mode)
kwargs = { kwargs = {
"host": os.environ["HOST"], "host": os.environ["HOST"],
"port": int(os.environ["PORT"]), "port": int(os.environ["PORT"]),
@ -39,21 +43,129 @@ def get_global_api() -> MessageServer: # sourcery skip: extract-method
if maim_message_config.auth_token and len(maim_message_config.auth_token) > 0: if maim_message_config.auth_token and len(maim_message_config.auth_token) > 0:
kwargs["enable_token"] = True kwargs["enable_token"] = True
if maim_message_config.use_custom: # Removed legacy custom config block (use_custom) as requested.
# 添加WSS模式支持 kwargs["enable_custom_uvicorn_logger"] = False
del kwargs["app"]
kwargs["host"] = maim_message_config.host
kwargs["port"] = maim_message_config.port
kwargs["mode"] = maim_message_config.mode
if maim_message_config.use_wss:
if maim_message_config.cert_file:
kwargs["ssl_certfile"] = maim_message_config.cert_file
if maim_message_config.key_file:
kwargs["ssl_keyfile"] = maim_message_config.key_file
kwargs["enable_custom_uvicorn_logger"] = False
global_api = MessageServer(**kwargs) global_api = MessageServer(**kwargs)
if version_compatible and maim_message_config.auth_token: if version_compatible and maim_message_config.auth_token:
for token in maim_message_config.auth_token: for token in maim_message_config.auth_token:
global_api.add_valid_token(token) global_api.add_valid_token(token)
# ---------------------------------------------------------------------
# Additional API Server Configuration (maim_message >= 6.0)
# ---------------------------------------------------------------------
enable_api_server = maim_message_config.enable_api_server
# 如果版本支持且启用了API Server则初始化额外服务器
if has_api_server_feature and enable_api_server:
try:
from maim_message.server import WebSocketServer, ServerConfig
from maim_message.message import APIMessageBase
api_logger = get_logger("maim_message_api_server")
# 1. Prepare Config
api_server_host = maim_message_config.api_server_host
api_server_port = maim_message_config.api_server_port
use_wss = maim_message_config.api_server_use_wss
server_config = ServerConfig(
host=api_server_host,
port=api_server_port,
ssl_enabled=use_wss,
ssl_certfile=maim_message_config.api_server_cert_file if use_wss else None,
ssl_keyfile=maim_message_config.api_server_key_file if use_wss else None,
)
# 2. Setup Auth Handler
async def auth_handler(metadata: dict) -> bool:
allowed_keys = maim_message_config.api_server_allowed_api_keys
# If list is empty/None, allow all (default behavior of returning True)
if not allowed_keys:
return True
api_key = metadata.get("api_key")
if api_key in allowed_keys:
return True
api_logger.warning(f"Rejected connection with invalid API Key: {api_key}")
return False
server_config.on_auth = auth_handler
# 3. Setup Message Bridge
# Initialize refined route map if not exists
if not hasattr(global_api, "platform_map"):
global_api.platform_map = {}
async def bridge_message_handler(message: APIMessageBase, metadata: dict):
# Bridge message to the main bot logic
# We convert APIMessageBase to dict to be compatible with legacy handlers
# that MainBot (ChatManager) expects.
msg_dict = message.to_dict()
# Compatibility Layer: Flatten sender_info to top-level user_info/group_info
# Legacy MessageBase expects message_info to have user_info and group_info directly.
if "message_info" in msg_dict:
msg_info = msg_dict["message_info"]
sender_info = msg_info.get("sender_info")
if sender_info:
# If direct user_info/group_info are missing, populate them from sender_info
if "user_info" not in msg_info and (ui := sender_info.get("user_info")):
msg_info["user_info"] = ui
if "group_info" not in msg_info and (gi := sender_info.get("group_info")):
msg_info["group_info"] = gi
# Route Caching Logic: Simply map platform to API Key
# This allows us to send messages back to the correct API client for this platform
try:
api_key = metadata.get("api_key")
if api_key:
platform = msg_info.get("platform")
if platform:
global_api.platform_map[platform] = api_key
except Exception as e:
api_logger.warning(f"Failed to update platform map: {e}")
# Compatibility Layer: Ensure raw_message exists (even if None) as it's part of MessageBase
if "raw_message" not in msg_dict:
msg_dict["raw_message"] = None
await global_api.process_message(msg_dict)
server_config.on_message = bridge_message_handler
# 4. Initialize Server
extra_server = WebSocketServer(config=server_config)
# 5. Patch global_api lifecycle methods to manage both servers
original_run = global_api.run
original_stop = global_api.stop
async def patched_run():
api_logger.info(f"Starting Additional API Server on {api_server_host}:{api_server_port} (WSS: {use_wss})")
# Start the extra server (non-blocking start)
await extra_server.start()
# Run the original legacy server (this usually keeps running)
await original_run()
async def patched_stop():
api_logger.info("Stopping Additional API Server...")
await extra_server.stop()
await original_stop()
global_api.run = patched_run
global_api.stop = patched_stop
# Attach for reference
global_api.extra_server = extra_server
except ImportError:
get_logger("maim_message").error("Cannot import maim_message.server components. Is maim_message >= 0.6.0 installed?")
except Exception as e:
get_logger("maim_message").error(f"Failed to initialize Additional API Server: {e}")
import traceback
get_logger("maim_message").debug(traceback.format_exc())
return global_api return global_api

View File

@ -648,29 +648,29 @@ class ExperimentalConfig(ConfigBase):
class MaimMessageConfig(ConfigBase): class MaimMessageConfig(ConfigBase):
"""maim_message配置类""" """maim_message配置类"""
use_custom: bool = False
"""是否使用自定义的maim_message配置"""
host: str = "127.0.0.1"
"""主机地址"""
port: int = 8090
""""端口号"""
mode: Literal["ws", "tcp"] = "ws"
"""连接模式支持ws和tcp"""
use_wss: bool = False
"""是否使用WSS安全连接"""
cert_file: str = ""
"""SSL证书文件路径仅在use_wss=True时有效"""
key_file: str = ""
"""SSL密钥文件路径仅在use_wss=True时有效"""
auth_token: list[str] = field(default_factory=lambda: []) auth_token: list[str] = field(default_factory=lambda: [])
"""认证令牌用于API验证为空则不启用验证""" """认证令牌用于旧版API验证为空则不启用验证"""
enable_api_server: bool = False
"""是否启用额外的新版API Server"""
api_server_host: str = "0.0.0.0"
"""新版API Server主机地址"""
api_server_port: int = 8090
"""新版API Server端口号"""
api_server_use_wss: bool = False
"""新版API Server是否启用WSS"""
api_server_cert_file: str = ""
"""新版API Server SSL证书文件路径"""
api_server_key_file: str = ""
"""新版API Server SSL密钥文件路径"""
api_server_allowed_api_keys: list[str] = field(default_factory=lambda: [])
"""新版API Server允许的API Key列表为空则允许所有连接"""
@dataclass @dataclass

View File

@ -232,15 +232,16 @@ show_planner_prompt = false # 是否显示planner的prompt和原始返回结果
show_lpmm_paragraph = false # 是否显示lpmm找到的相关文段日志 show_lpmm_paragraph = false # 是否显示lpmm找到的相关文段日志
[maim_message] [maim_message]
auth_token = [] # 认证令牌用于API验证为空则不启用验证 auth_token = [] # 认证令牌用于旧版API验证为空则不启用验证
# 以下项目若要使用需要打开use_custom并单独配置maim_message的服务器
use_custom = false # 是否启用自定义的maim_message服务器注意这需要设置新的端口不能与.env重复 # 新版API Server配置额外监听端口
host="127.0.0.1" enable_api_server = false # 是否启用额外的新版API Server
port=8090 api_server_host = "0.0.0.0" # 新版API Server主机地址
mode="ws" # 支持ws和tcp两种模式 api_server_port = 8090 # 新版API Server端口号
use_wss = false # 是否使用WSS安全连接只支持ws模式 api_server_use_wss = false # 新版API Server是否启用WSS
cert_file = "" # SSL证书文件路径仅在use_wss=true时有效 api_server_cert_file = "" # 新版API Server SSL证书文件路径
key_file = "" # SSL密钥文件路径仅在use_wss=true时有效 api_server_key_file = "" # 新版API Server SSL密钥文件路径
api_server_allowed_api_keys = [] # 新版API Server允许的API Key列表为空则允许所有连接
[telemetry] #发送统计信息,主要是看全球有多少只麦麦 [telemetry] #发送统计信息,主要是看全球有多少只麦麦
enable = true enable = true