MaiBot/plugins/MaiBot_MCPBridgePlugin/DEVELOPMENT.md

9.3 KiB
Raw Permalink Blame History

MCP 桥接插件开发文档

本文档面向开发者,介绍插件的架构设计、核心模块和扩展方式。

架构概览

MaiBot_MCPBridgePlugin/
├── plugin.py          # 主插件文件,包含所有核心逻辑
├── mcp_client.py      # MCP 客户端封装
├── tool_chain.py      # 工具链Workflow模块
├── core/
│   └── claude_config.py # Claude Desktop mcpServers 解析/迁移
├── config.toml        # 运行时配置
└── _manifest.json     # 插件元数据

核心模块

1. MCP 客户端 (mcp_client.py)

封装了与 MCP 服务器的通信逻辑。

from .mcp_client import mcp_manager, MCPServerConfig, TransportType

# 添加服务器
config = MCPServerConfig(
    name="my-server",
    transport=TransportType.STREAMABLE_HTTP,
    url="https://mcp.example.com/mcp"
)
await mcp_manager.add_server(config)

# 调用工具
result = await mcp_manager.call_tool("server_tool_name", {"param": "value"})
if result.success:
    print(result.content)

支持的传输类型:

  • STDIO: 本地进程通信
  • SSE: Server-Sent Events
  • HTTP: HTTP 请求
  • STREAMABLE_HTTP: 流式 HTTP推荐

2. 工具注册系统

MCP 工具通过动态类创建注册到 MaiBot

# 创建工具代理类
class MCPToolProxy(BaseTool):
    name = "mcp_server_tool"
    description = "工具描述"
    parameters = [("param", ToolParamType.STRING, "参数描述", True, None)]
    available_for_llm = True
    
    async def execute(self, function_args):
        result = await mcp_manager.call_tool(self._mcp_tool_key, function_args)
        return {"name": self.name, "content": result.content}

3. 工具链模块 (tool_chain.py)

实现 Workflow 硬流程,支持多工具顺序执行。

from .tool_chain import ToolChainDefinition, ToolChainStep, tool_chain_manager

# 定义工具链
chain = ToolChainDefinition(
    name="search_and_detail",
    description="搜索并获取详情",
    input_params={"query": "搜索关键词"},
    steps=[
        ToolChainStep(
            tool_name="mcp_server_search",
            args_template={"keyword": "${input.query}"},
            output_key="search_result"
        ),
        ToolChainStep(
            tool_name="mcp_server_detail",
            args_template={"id": "${prev}"}
        )
    ]
)

# 注册并执行
tool_chain_manager.add_chain(chain)
result = await tool_chain_manager.execute_chain("search_and_detail", {"query": "test"})

变量替换语法:

  • ${input.参数名}: 用户输入
  • ${step.输出键}: 指定步骤的输出
  • ${prev}: 上一步输出
  • ${prev.字段}: 上一步输出JSON的字段
  • ${step.geo.return.0.location} / ${step.geo.return[0].location}: 数组下标访问
  • ${step.geo['return'][0]['location']}: bracket 写法(最通用)

双轨制架构

ReAct 软流程

将 MCP 工具注册到 MaiBot 的记忆检索 ReAct 系统LLM 自主决策调用。

def _register_tools_to_react(self) -> int:
    from src.memory_system.retrieval_tools import register_memory_retrieval_tool
    
    def make_execute_func(tool_key: str):
        async def execute_func(**kwargs) -> str:
            result = await mcp_manager.call_tool(tool_key, kwargs)
            return result.content if result.success else f"失败: {result.error}"
        return execute_func
    
    register_memory_retrieval_tool(
        name="mcp_tool_name",
        description="工具描述",
        parameters=[{"name": "param", "type": "string", "required": True}],
        execute_func=make_execute_func("tool_key")
    )

Workflow 硬流程

用户预定义的固定执行流程,注册为组合工具。

def _register_tool_chains(self) -> None:
    from src.plugin_system.core.component_registry import component_registry
    
    for chain_name, chain in tool_chain_manager.get_enabled_chains().items():
        info, tool_class = tool_chain_registry.register_chain(chain)
        info.plugin_name = self.plugin_name
        component_registry.register_component(info, tool_class)

配置系统

MCP 服务器配置Claude Desktop 规范)

插件只接受 Claude Desktop 的 mcpServers JSONcore/claude_config.py)。配置入口统一为:

  • WebUI/配置文件:[servers].claude_config_json
  • 命令:/mcp import(合并 mcpServers)与 /mcp export(导出当前 mcpServers

兼容迁移:

  • 若检测到旧版 servers.list,会自动迁移为 servers.claude_config_json(仅迁移到内存配置,需 WebUI 保存一次固化)。

WebUI 配置 Schema

使用 ConfigField 定义 WebUI 配置项:

config_schema = {
    "section_name": {
        "field_name": ConfigField(
            type=str,                    # 类型: str, bool, int, float
            default="default_value",     # 默认值
            description="字段描述",
            label="显示标签",
            input_type="textarea",       # 输入类型: text, textarea, password
            rows=5,                      # textarea 行数
            disabled=True,               # 只读
            choices=["a", "b"],          # 下拉选项
            hint="提示信息",
            order=1,                     # 排序
        ),
    },
}

配置读取

# 在组件中读取配置
value = self.get_config("section.key", default="fallback")

# 在插件类中读取
value = self.config.get("section", {}).get("key", "default")

事件处理

启动事件

class MCPStartupHandler(BaseEventHandler):
    event_type = EventType.ON_START
    handler_name = "mcp_startup"
    
    async def execute(self, message):
        global _plugin_instance
        if _plugin_instance:
            await _plugin_instance._async_connect_servers()
        return (True, True, None, None, None)

停止事件

class MCPStopHandler(BaseEventHandler):
    event_type = EventType.ON_STOP
    handler_name = "mcp_stop"
    
    async def execute(self, message):
        await mcp_manager.shutdown()
        return (True, True, None, None, None)

命令系统

class MCPStatusCommand(BaseCommand):
    command_name = "mcp_status"
    command_pattern = r"^/mcp(?:\s+(?P<action>\S+))?(?:\s+(?P<arg>.+))?$"
    
    async def execute(self) -> Tuple[bool, str, bool]:
        action = self.matched_groups.get("action", "")
        arg = self.matched_groups.get("arg", "")
        
        if action == "tools":
            await self.send_text("工具列表...")
        elif action == "reconnect":
            await self._handle_reconnect(arg)
        
        return (True, None, True)  # (成功, 消息, 拦截)

高级功能

调用追踪

from plugin import tool_call_tracer, ToolCallRecord

# 记录调用
record = ToolCallRecord(
    call_id="xxx",
    timestamp=time.time(),
    tool_name="tool",
    server_name="server",
    arguments={"key": "value"},
    success=True,
    duration_ms=100.0
)
tool_call_tracer.record(record)

# 查询记录
recent = tool_call_tracer.get_recent(10)
by_tool = tool_call_tracer.get_by_tool("tool_name")

调用缓存

from plugin import tool_call_cache

# 配置缓存
tool_call_cache.configure(
    enabled=True,
    ttl=300,           # 秒
    max_entries=200,
    exclude_tools="mcp_*_time_*"  # 排除模式
)

# 使用缓存
cached = tool_call_cache.get("tool_name", {"param": "value"})
if cached is None:
    result = await call_tool(...)
    tool_call_cache.set("tool_name", {"param": "value"}, result)

权限控制

from plugin import permission_checker

# 配置权限
permission_checker.configure(
    enabled=True,
    default_mode="allow_all",  # 或 "deny_all"
    rules_json='[{"tool": "mcp_*_delete_*", "denied": ["qq:123:group"]}]',
    quick_deny_groups="123456789",
    quick_allow_users="111111111"
)

# 检查权限
allowed = permission_checker.check(
    tool_name="mcp_server_delete",
    chat_id="123456",
    user_id="789",
    is_group=True
)

断路器模式

MCP 客户端内置断路器,故障服务器快速失败:

  • 连续失败 N 次后熔断
  • 熔断期间直接返回错误
  • 定期尝试恢复

扩展开发

添加新的传输类型

  1. mcp_client.py 中添加 TransportType 枚举值
  2. 实现对应的连接逻辑
  3. 更新 _create_transport() 方法

添加新的工具类型

  1. 继承 BaseTool 创建新类
  2. get_plugin_components() 中注册
  3. 实现 execute() 方法

添加新的命令

  1. MCPStatusCommand.execute() 中添加新的 action 分支
  2. 或创建新的 BaseCommand 子类

调试技巧

日志级别

from src.common.logger import get_logger
logger = get_logger("mcp_bridge_plugin")

logger.debug("详细调试信息")
logger.info("一般信息")
logger.warning("警告")
logger.error("错误")

常用调试命令

/mcp                    # 查看状态
/mcp tools              # 查看工具列表
/mcp trace              # 查看调用记录
/mcp cache              # 查看缓存状态
/mcp chain              # 查看工具链

更新日志

plugins/MaiBot_MCPBridgePlugin/CHANGELOG.md

开发约定

  • 本仓库不提交测试脚本/临时复现文件;如需本地验证,可自行在工作区创建未跟踪文件(建议放到 .local/ 并加入 .gitignore)。