Merge pull request #1404 from CharTyr/feat/mcp-bridge-plugin

Feat/mcp bridge plugin
pull/1414/head
SengokuCola 2025-12-06 00:51:15 +08:00 committed by GitHub
commit 026fdc0372
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 6557 additions and 37 deletions

8
.gitignore vendored
View File

@ -35,9 +35,6 @@ message_queue_content.bat
message_queue_window.bat
message_queue_window.txt
queue_update.txt
memory_graph.gml
/src/tools/tool_can_use/auto_create_tool.py
/src/tools/tool_can_use/execute_python_code_tool.py
.env
.env.*
.cursor
@ -48,9 +45,6 @@ config/lpmm_config.toml
config/lpmm_config.toml.bak
template/compare/bot_config_template.toml
template/compare/model_config_template.toml
(测试版)麦麦生成人格.bat
(临时版)麦麦开始学习.bat
src/plugins/utils/statistic.py
CLAUDE.md
MaiBot-Dashboard/
cloudflare-workers/
@ -327,6 +321,7 @@ run_pet.bat
!/plugins/emoji_manage_plugin
!/plugins/take_picture_plugin
!/plugins/deep_think
!/plugins/MaiBot_MCPBridgePlugin
!/plugins/ChatFrequency/
!/plugins/__init__.py
@ -334,4 +329,3 @@ config.toml
interested_rates.txt
MaiBot.code-workspace
*.lock

View File

@ -0,0 +1,569 @@
# MCP 桥接插件 - 开发文档
本文档面向 AI 助手或开发者进行插件开发/维护。
## 前置知识
本插件基于 MaiBot 插件系统开发,需要了解:
- MaiBot 插件框架:`BasePlugin`, `BaseTool`, `BaseCommand`, `BaseEventHandler`
- 配置系统:`ConfigField`, `config_schema`
- 组件注册:`component_registry.register_component()`
详见项目根目录 `.kiro/steering/plugin-dev.md`
---
## 版本历史
| 版本 | 主要功能 |
|------|----------|
| v1.5.4 | 易用性优化:新增 MCP 服务器获取快捷入口 |
| v1.5.3 | 配置优化:新增智能心跳 WebUI 配置项 |
| v1.5.2 | 性能优化:智能心跳间隔,根据服务器稳定性动态调整 |
| v1.5.1 | 易用性优化:新增「快速添加服务器」表单式配置 |
| v1.5.0 | 性能优化:服务器并行连接,大幅减少启动时间 |
| v1.4.4 | 修复首次生成默认配置文件时多行字符串导致 TOML 解析失败 |
| v1.4.3 | 修复 WebUI 保存配置后多行字符串格式错误导致配置文件无法读取 |
| v1.4.2 | HTTP 鉴权头支持headers 字段) |
| v1.4.0 | 工具禁用、调用追踪、缓存、权限控制、WebUI 易用性改进 |
| v1.3.0 | 结果后处理LLM 摘要提炼) |
| v1.2.0 | Resources/Prompts 支持(实验性) |
| v1.1.x | 心跳检测、自动重连、调用统计、`/mcp` 命令 |
| v1.0.0 | 基础 MCP 桥接 |
---
## 项目结构
```
MCPBridgePlugin/
├── plugin.py # 主插件逻辑1800+ 行)
├── mcp_client.py # MCP 客户端封装800+ 行)
├── _manifest.json # 插件清单
├── config.example.toml # 配置示例
├── requirements.txt # 依赖mcp>=1.0.0
├── README.md # 用户文档
└── DEVELOPMENT.md # 开发文档(本文件)
```
---
## 核心模块详解
### 1. mcp_client.py - MCP 客户端
负责与 MCP 服务器通信,可独立于 MaiBot 运行测试。
#### 数据类
```python
class TransportType(Enum):
STDIO = "stdio" # 本地进程
SSE = "sse" # Server-Sent Events
HTTP = "http" # HTTP
STREAMABLE_HTTP = "streamable_http" # HTTP Streamable推荐
@dataclass
class MCPServerConfig:
name: str # 服务器唯一标识
enabled: bool = True
transport: TransportType = TransportType.STDIO
command: str = "" # stdio: 启动命令
args: List[str] = field(default_factory=list) # stdio: 参数
env: Dict[str, str] = field(default_factory=dict) # stdio: 环境变量
url: str = "" # http/sse: 服务器 URL
@dataclass
class MCPToolInfo:
name: str # 工具原始名称
description: str
input_schema: Dict[str, Any] # JSON Schema
server_name: str
@dataclass
class MCPCallResult:
success: bool
content: str = ""
error: Optional[str] = None
duration_ms: float = 0.0
@dataclass
class MCPResourceInfo:
uri: str
name: str
description: str = ""
mime_type: Optional[str] = None
server_name: str = ""
@dataclass
class MCPPromptInfo:
name: str
description: str = ""
arguments: List[Dict[str, Any]] = field(default_factory=list)
server_name: str = ""
```
#### MCPClientSession
管理单个 MCP 服务器连接。
```python
class MCPClientSession:
def __init__(self, config: MCPServerConfig): ...
async def connect(self) -> bool:
"""连接服务器,返回是否成功"""
async def disconnect(self) -> None:
"""断开连接"""
async def call_tool(self, tool_name: str, arguments: Dict) -> MCPCallResult:
"""调用工具"""
async def check_health(self) -> bool:
"""健康检查(用于心跳)"""
async def fetch_resources(self) -> bool:
"""获取资源列表"""
async def read_resource(self, uri: str) -> MCPCallResult:
"""读取资源"""
async def fetch_prompts(self) -> bool:
"""获取提示模板列表"""
async def get_prompt(self, name: str, arguments: Optional[Dict]) -> MCPCallResult:
"""获取提示模板"""
@property
def tools(self) -> List[MCPToolInfo]: ...
@property
def resources(self) -> List[MCPResourceInfo]: ...
@property
def prompts(self) -> List[MCPPromptInfo]: ...
@property
def is_connected(self) -> bool: ...
```
#### MCPClientManager
全局单例,管理多服务器。
```python
class MCPClientManager:
def configure(self, settings: Dict) -> None:
"""配置超时、重试等参数"""
async def add_server(self, config: MCPServerConfig) -> bool:
"""添加并连接服务器"""
async def remove_server(self, server_name: str) -> bool:
"""移除服务器"""
async def reconnect_server(self, server_name: str) -> bool:
"""重连服务器"""
async def call_tool(self, tool_key: str, arguments: Dict) -> MCPCallResult:
"""调用工具tool_key 格式: mcp_{server}_{tool}"""
async def start_heartbeat(self) -> None:
"""启动心跳检测"""
async def shutdown(self) -> None:
"""关闭所有连接"""
def get_status(self) -> Dict[str, Any]:
"""获取状态"""
def get_all_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
def set_status_change_callback(self, callback: Callable) -> None:
"""设置状态变化回调"""
@property
def all_tools(self) -> Dict[str, Tuple[MCPToolInfo, MCPClientSession]]: ...
@property
def all_resources(self) -> Dict[str, Tuple[MCPResourceInfo, MCPClientSession]]: ...
@property
def all_prompts(self) -> Dict[str, Tuple[MCPPromptInfo, MCPClientSession]]: ...
@property
def disconnected_servers(self) -> List[str]: ...
# 全局单例
mcp_manager = MCPClientManager()
```
---
### 2. plugin.py - MaiBot 插件
#### v1.4.0 新增模块
```python
# ============ 调用追踪 ============
@dataclass
class ToolCallRecord:
call_id: str # UUID
timestamp: float
tool_name: str
server_name: str
chat_id: str = ""
user_id: str = ""
user_query: str = ""
arguments: Dict = field(default_factory=dict)
raw_result: str = ""
processed_result: str = ""
duration_ms: float = 0.0
success: bool = True
error: str = ""
post_processed: bool = False
cache_hit: bool = False
class ToolCallTracer:
def configure(self, enabled: bool, max_records: int, log_enabled: bool, log_path: Path): ...
def record(self, record: ToolCallRecord) -> None: ...
def get_recent(self, n: int = 10) -> List[ToolCallRecord]: ...
def get_by_tool(self, tool_name: str) -> List[ToolCallRecord]: ...
def clear(self) -> None: ...
tool_call_tracer = ToolCallTracer()
# ============ 调用缓存 ============
@dataclass
class CacheEntry:
tool_name: str
args_hash: str # MD5(tool_name + sorted_json_args)
result: str
created_at: float
expires_at: float
hit_count: int = 0
class ToolCallCache:
def configure(self, enabled: bool, ttl: int, max_entries: int, exclude_tools: str): ...
def get(self, tool_name: str, args: Dict) -> Optional[str]: ...
def set(self, tool_name: str, args: Dict, result: str) -> None: ...
def clear(self) -> None: ...
def get_stats(self) -> Dict[str, Any]: ...
tool_call_cache = ToolCallCache()
# ============ 权限控制 ============
class PermissionChecker:
def configure(self, enabled: bool, default_mode: str, rules_json: str,
quick_deny_groups: str = "", quick_allow_users: str = ""): ...
def check(self, tool_name: str, chat_id: str, user_id: str, is_group: bool) -> bool: ...
def get_rules_for_tool(self, tool_name: str) -> List[Dict]: ...
permission_checker = PermissionChecker()
```
#### 工具代理
```python
class MCPToolProxy(BaseTool):
"""所有 MCP 工具的基类"""
# 类属性(动态子类覆盖)
name: str = ""
description: str = ""
parameters: List[Tuple] = []
available_for_llm: bool = True
# MCP 属性
_mcp_tool_key: str = ""
_mcp_original_name: str = ""
_mcp_server_name: str = ""
async def execute(self, function_args: Dict) -> Dict[str, Any]:
"""执行流程:
1. 权限检查 → 拒绝则返回错误
2. 缓存检查 → 命中则返回缓存
3. 调用 MCP 服务器
4. 存入缓存
5. 后处理(可选)
6. 记录追踪
7. 返回结果
"""
def create_mcp_tool_class(tool_key: str, tool_info: MCPToolInfo,
tool_prefix: str, disabled: bool = False) -> Type[MCPToolProxy]:
"""动态创建工具类"""
```
#### 内置工具
```python
class MCPStatusTool(BaseTool):
"""mcp_status - 查询状态/工具/资源/模板/统计/追踪/缓存"""
name = "mcp_status"
parameters = [
("query_type", STRING, "查询类型", False,
["status", "tools", "resources", "prompts", "stats", "trace", "cache", "all"]),
("server_name", STRING, "服务器名称", False, None),
]
class MCPReadResourceTool(BaseTool):
"""mcp_read_resource - 读取资源"""
name = "mcp_read_resource"
class MCPGetPromptTool(BaseTool):
"""mcp_get_prompt - 获取提示模板"""
name = "mcp_get_prompt"
```
#### 命令
```python
class MCPStatusCommand(BaseCommand):
"""处理 /mcp 命令"""
command_pattern = r"^[/]mcp(?:\s+(?P<subcommand>status|tools|stats|reconnect|trace|cache|perm))?(?:\s+(?P<arg>\S+))?$"
# 子命令处理
async def _handle_reconnect(self, server_name): ...
async def _handle_trace(self, arg): ...
async def _handle_cache(self, arg): ...
async def _handle_perm(self, arg): ...
```
#### 事件处理器
```python
class MCPStartupHandler(BaseEventHandler):
"""ON_START - 连接服务器、注册工具"""
event_type = EventType.ON_START
class MCPStopHandler(BaseEventHandler):
"""ON_STOP - 关闭连接"""
event_type = EventType.ON_STOP
```
#### 主插件类
```python
@register_plugin
class MCPBridgePlugin(BasePlugin):
plugin_name = "mcp_bridge_plugin"
python_dependencies = ["mcp"]
config_section_descriptions = {
"guide": "📖 快速入门",
"servers": "🔌 服务器配置",
"status": "📊 运行状态",
"plugin": "插件开关",
"settings": "⚙️ 高级设置",
"tools": "🔧 工具管理",
"permissions": "🔐 权限控制",
}
config_schema = {
"guide": { "quick_start": ConfigField(...) },
"plugin": { "enabled": ConfigField(...) },
"settings": {
# 基础tool_prefix, connect_timeout, call_timeout, auto_connect, retry_*
# 心跳heartbeat_enabled, heartbeat_interval, auto_reconnect, max_reconnect_attempts
# 高级enable_resources, enable_prompts
# 后处理post_process_*
# 追踪trace_*
# 缓存cache_*
},
"tools": { "tool_list", "disabled_tools" },
"permissions": { "perm_enabled", "perm_default_mode", "quick_deny_groups", "quick_allow_users", "perm_rules" },
"servers": { "list" },
"status": { "connection_status" },
}
def __init__(self):
# 配置 mcp_manager, tool_call_tracer, tool_call_cache, permission_checker
async def _async_connect_servers(self):
# 解析配置 → 连接服务器 → 注册工具(检查禁用列表)
def _update_status_display(self):
# 更新 WebUI 状态显示
def _update_tool_list_display(self):
# 更新工具清单显示
```
---
## 数据流
```
MaiBot 启动
MCPBridgePlugin.__init__()
├─ mcp_manager.configure(settings)
├─ tool_call_tracer.configure(...)
├─ tool_call_cache.configure(...)
└─ permission_checker.configure(...)
ON_START 事件 → MCPStartupHandler.execute()
_async_connect_servers()
├─ 解析 servers.list JSON
├─ 遍历服务器配置
│ ├─ mcp_manager.add_server(config)
│ ├─ 获取工具列表
│ ├─ 检查 disabled_tools
│ └─ component_registry.register_component(tool_info, tool_class)
├─ _update_status_display()
└─ _update_tool_list_display()
mcp_manager.start_heartbeat()
LLM 调用工具 → MCPToolProxy.execute(function_args)
├─ 1. permission_checker.check() → 拒绝则返回错误
├─ 2. tool_call_cache.get() → 命中则跳到步骤 5
├─ 3. mcp_manager.call_tool()
├─ 4. tool_call_cache.set()
├─ 5. _post_process_result() (如果启用且超过阈值)
├─ 6. tool_call_tracer.record()
└─ 7. 返回 {"name": ..., "content": ...}
ON_STOP 事件 → MCPStopHandler.execute()
mcp_manager.shutdown()
mcp_tool_registry.clear()
```
---
## 配置项速查
### settings高级设置
| 配置项 | 类型 | 默认值 | 说明 |
|--------|------|--------|------|
| tool_prefix | str | "mcp" | 工具名前缀 |
| connect_timeout | float | 30.0 | 连接超时(秒) |
| call_timeout | float | 60.0 | 调用超时(秒) |
| auto_connect | bool | true | 自动连接 |
| retry_attempts | int | 3 | 重试次数 |
| retry_interval | float | 5.0 | 重试间隔 |
| heartbeat_enabled | bool | true | 心跳检测 |
| heartbeat_interval | float | 60.0 | 心跳间隔 |
| auto_reconnect | bool | true | 自动重连 |
| max_reconnect_attempts | int | 3 | 最大重连次数 |
| enable_resources | bool | false | Resources 支持 |
| enable_prompts | bool | false | Prompts 支持 |
| post_process_enabled | bool | false | 结果后处理 |
| post_process_threshold | int | 500 | 后处理阈值 |
| trace_enabled | bool | true | 调用追踪 |
| trace_max_records | int | 100 | 追踪记录上限 |
| cache_enabled | bool | false | 调用缓存 |
| cache_ttl | int | 300 | 缓存 TTL |
| cache_max_entries | int | 200 | 最大缓存条目 |
### permissions权限控制
| 配置项 | 说明 |
|--------|------|
| perm_enabled | 启用权限控制 |
| perm_default_mode | allow_all / deny_all |
| quick_deny_groups | 禁用群列表(每行一个群号) |
| quick_allow_users | 管理员白名单(每行一个 QQ 号) |
| perm_rules | 高级规则 JSON |
---
## 扩展开发示例
### 添加新命令子命令
```python
# 1. 修改 command_pattern
command_pattern = r"^[/]mcp(?:\s+(?P<subcommand>status|...|newcmd))?..."
# 2. 在 execute() 添加分支
if subcommand == "newcmd":
return await self._handle_newcmd(arg)
# 3. 实现处理方法
async def _handle_newcmd(self, arg: str = None):
# 处理逻辑
await self.send_text("结果")
return (True, None, True)
```
### 添加新配置项
```python
# 1. config_schema 添加
"settings": {
"new_option": ConfigField(
type=bool,
default=False,
description="新选项说明",
label="🆕 新选项",
order=50,
),
}
# 2. 在 __init__ 或相应方法中读取
new_option = settings.get("new_option", False)
```
### 添加新的全局模块
```python
# 1. 定义数据类和管理类
@dataclass
class NewRecord:
...
class NewManager:
def configure(self, ...): ...
def do_something(self, ...): ...
new_manager = NewManager()
# 2. 在 MCPBridgePlugin.__init__ 中配置
new_manager.configure(...)
# 3. 在 MCPToolProxy.execute() 中使用
result = new_manager.do_something(...)
```
---
## 调试
```python
# 导入
from plugins.MCPBridgePlugin.mcp_client import mcp_manager
from plugins.MCPBridgePlugin.plugin import tool_call_tracer, tool_call_cache, permission_checker
# 检查状态
mcp_manager.get_status()
mcp_manager.get_all_stats()
# 追踪记录
tool_call_tracer.get_recent(10)
# 缓存状态
tool_call_cache.get_stats()
# 手动调用
result = await mcp_manager.call_tool("mcp_server_tool", {"arg": "value"})
```
---
## 依赖
- MaiBot >= 0.11.6
- Python >= 3.10
- mcp >= 1.0.0
## 许可证
AGPL-3.0

View File

@ -0,0 +1,220 @@
# MCP 桥接插件
将 [MCP (Model Context Protocol)](https://modelcontextprotocol.io/) 服务器的工具桥接到 MaiBot使麦麦能够调用外部 MCP 工具。
<img width="3012" height="1794" alt="image" src="https://github.com/user-attachments/assets/ece56404-301a-4abf-b16d-87bd430fc977" />
## 🚀 快速开始
### 1. 安装
```bash
# 克隆到 MaiBot 插件目录
cd /path/to/MaiBot/plugins
git clone https://github.com/CharTyr/MaiBot_MCPBridgePlugin.git MCPBridgePlugin
# 安装依赖
pip install mcp
# 复制配置文件
cd MCPBridgePlugin
cp config.example.toml config.toml
```
### 2. 添加服务器
编辑 `config.toml`,在 `[servers]``list` 中添加服务器:
**免费服务器:**
```json
{"name": "time", "enabled": true, "transport": "streamable_http", "url": "https://mcp.api-inference.modelscope.cn/server/mcp-server-time"}
```
**带鉴权的服务器v1.4.2**
```json
{"name": "my-server", "enabled": true, "transport": "streamable_http", "url": "https://mcp.xxx.com/mcp", "headers": {"Authorization": "Bearer 你的密钥"}}
```
**本地服务器(需要 uvx**
```json
{"name": "fetch", "enabled": true, "transport": "stdio", "command": "uvx", "args": ["mcp-server-fetch"]}
```
### 3. 启动
重启 MaiBot或发送 `/mcp reconnect`
---
## 📚 去哪找 MCP 服务器?
| 平台 | 说明 |
|------|------|
| [mcp.modelscope.cn](https://mcp.modelscope.cn/) | 魔搭 ModelScope免费推荐 |
| [smithery.ai](https://smithery.ai/) | MCP 服务器注册中心 |
| [github.com/modelcontextprotocol/servers](https://github.com/modelcontextprotocol/servers) | 官方服务器列表 |
---
## 💡 常用命令
| 命令 | 说明 |
|------|------|
| `/mcp` | 查看连接状态 |
| `/mcp tools` | 查看可用工具 |
| `/mcp reconnect` | 重连服务器 |
| `/mcp trace` | 查看调用记录 |
| `/mcp cache` | 查看缓存状态 |
| `/mcp perm` | 查看权限配置 |
| `/mcp import <json>` | 🆕 导入 Claude Desktop 配置 |
| `/mcp export [claude]` | 🆕 导出配置 |
| `/mcp search <关键词>` | 🆕 搜索工具 |
---
## ✨ 功能特性
### 核心功能
- 🔌 多服务器同时连接
- 📡 支持 stdio / SSE / HTTP / Streamable HTTP
- 🔄 自动重试、心跳检测、断线重连
- 🖥️ WebUI 完整配置支持
### v1.7.0 新增
- ⚡ **断路器模式** - 故障服务器快速失败,避免拖慢整体响应
- 🔄 **状态实时刷新** - WebUI 自动更新连接状态(可配置间隔)
- 🔍 **工具搜索** - `/mcp search <关键词>` 快速查找工具
### v1.6.0 新增
- 📥 **配置导入** - 从 Claude Desktop 格式一键导入
- 📤 **配置导出** - 导出为 Claude Desktop / Kiro / MaiBot 格式
### v1.4.0 新增
- 🚫 **工具禁用** - WebUI 直接禁用不想用的工具
- 🔍 **调用追踪** - 记录每次调用详情,便于调试
- 🗄️ **调用缓存** - 相同请求自动缓存
- 🔐 **权限控制** - 按群/用户限制工具使用
### 高级功能
- 📦 Resources 支持(实验性)
- 📝 Prompts 支持(实验性)
- 🔄 结果后处理LLM 摘要提炼)
---
## ⚙️ 配置说明
### 服务器配置
```json
[
{
"name": "服务器名",
"enabled": true,
"transport": "streamable_http",
"url": "https://..."
}
]
```
| 字段 | 说明 |
|------|------|
| `name` | 服务器名称(唯一) |
| `enabled` | 是否启用 |
| `transport` | `stdio` / `sse` / `http` / `streamable_http` |
| `url` | 远程服务器地址 |
| `headers` | 🆕 鉴权头(如 `{"Authorization": "Bearer xxx"}` |
| `command` / `args` | 本地服务器启动命令 |
### 权限控制v1.4.0
**快捷配置(推荐):**
```toml
[permissions]
perm_enabled = true
quick_deny_groups = "123456789" # 禁用的群号
quick_allow_users = "111111111" # 管理员白名单
```
**高级规则:**
```json
[{"tool": "mcp_*_delete_*", "denied": ["qq:123456:group"]}]
```
### 工具禁用
```toml
[tools]
disabled_tools = '''
mcp_filesystem_delete_file
mcp_filesystem_write_file
'''
```
### 调用缓存
```toml
[settings]
cache_enabled = true
cache_ttl = 300
cache_exclude_tools = "mcp_*_time_*"
```
---
## ❓ 常见问题
**Q: 工具没有注册?**
- 检查 `enabled = true`
- 检查 MaiBot 日志错误信息
- 确认 `pip install mcp`
**Q: JSON 格式报错?**
- 多行 JSON 用 `'''` 三引号包裹
- 使用英文双引号 `"`
**Q: 如何手动重连?**
- `/mcp reconnect``/mcp reconnect 服务器名`
---
## 📥 配置导入导出v1.6.0
### 从 Claude Desktop 导入
如果你已有 Claude Desktop 的 MCP 配置,可以直接导入:
```
/mcp import {"mcpServers":{"time":{"command":"uvx","args":["mcp-server-time"]},"fetch":{"command":"uvx","args":["mcp-server-fetch"]}}}
```
支持的格式:
- Claude Desktop 格式(`mcpServers` 对象)
- Kiro MCP 格式
- MaiBot 格式(数组)
### 导出配置
```
/mcp export # 导出为 Claude Desktop 格式(默认)
/mcp export claude # 导出为 Claude Desktop 格式
/mcp export kiro # 导出为 Kiro MCP 格式
/mcp export maibot # 导出为 MaiBot 格式
```
### 注意事项
- 导入时会自动跳过同名服务器
- 导入后需要发送 `/mcp reconnect` 使配置生效
- 支持 stdio、sse、http、streamable_http 全部传输类型
---
## 📋 依赖
- MaiBot >= 0.11.6
- Python >= 3.10
- mcp >= 1.0.0
## 📄 许可证
AGPL-3.0

View File

@ -0,0 +1,44 @@
"""
MCP 桥接插件
MCP (Model Context Protocol) 服务器的工具桥接到 MaiBot
v1.1.0 新增功能:
- 心跳检测和自动重连
- 调用统计次数成功率耗时
- 更好的错误处理
v1.2.0 新增功能:
- Resources 支持资源读取
- Prompts 支持提示模板
"""
from .plugin import MCPBridgePlugin, mcp_tool_registry, MCPStartupHandler, MCPStopHandler
from .mcp_client import (
mcp_manager,
MCPClientManager,
MCPServerConfig,
TransportType,
MCPCallResult,
MCPToolInfo,
MCPResourceInfo,
MCPPromptInfo,
ToolCallStats,
ServerStats,
)
__all__ = [
"MCPBridgePlugin",
"mcp_tool_registry",
"mcp_manager",
"MCPClientManager",
"MCPServerConfig",
"TransportType",
"MCPCallResult",
"MCPToolInfo",
"MCPResourceInfo",
"MCPPromptInfo",
"ToolCallStats",
"ServerStats",
"MCPStartupHandler",
"MCPStopHandler",
]

View File

@ -0,0 +1,60 @@
{
"manifest_version": 1,
"name": "MCP桥接插件",
"version": "1.7.0",
"description": "将 MCP (Model Context Protocol) 服务器的工具桥接到 MaiBot使麦麦能够调用外部 MCP 工具",
"author": {
"name": "CharTyr",
"url": "https://github.com/CharTyr"
},
"license": "AGPL-3.0",
"host_application": {
"min_version": "0.11.6"
},
"homepage_url": "https://github.com/CharTyr/MaiBot_MCPBridgePlugin",
"repository_url": "https://github.com/CharTyr/MaiBot_MCPBridgePlugin",
"keywords": [
"mcp",
"bridge",
"tool",
"integration",
"resources",
"prompts",
"post-process",
"cache",
"trace",
"permissions",
"import",
"export",
"claude-desktop"
],
"categories": [
"工具扩展",
"外部集成"
],
"default_locale": "zh-CN",
"plugin_info": {
"is_built_in": false,
"components": [],
"features": [
"支持多个 MCP 服务器",
"自动发现并注册 MCP 工具",
"支持 stdio、SSE、HTTP、Streamable HTTP 四种传输方式",
"工具参数自动转换",
"心跳检测与自动重连",
"调用统计(次数、成功率、耗时)",
"WebUI 配置支持",
"Resources 支持(实验性)",
"Prompts 支持(实验性)",
"结果后处理LLM 摘要提炼)",
"工具禁用管理",
"调用链路追踪",
"工具调用缓存LRU",
"工具权限控制(群/用户级别)",
"配置导入导出Claude Desktop / Kiro 格式)",
"断路器模式(故障快速失败)",
"状态实时刷新"
]
},
"id": "MaiBot Community.MCPBridgePlugin"
}

View File

@ -0,0 +1,263 @@
# MCP桥接插件 v1.7.0 - 配置文件示例
# 将 MCP (Model Context Protocol) 服务器的工具桥接到 MaiBot
#
# 使用方法:复制此文件为 config.toml然后根据需要修改配置
#
# ============================================================
# 🎯 快速开始(三步)
# ============================================================
# 1. 在下方 [servers] 添加 MCP 服务器配置
# 2. 将 enabled 改为 true 启用服务器
# 3. 重启 MaiBot 或发送 /mcp reconnect
#
# ============================================================
# 📚 去哪找 MCP 服务器?
# ============================================================
#
# 【远程服务(推荐新手)】
# - ModelScope: https://mcp.modelscope.cn/ (免费,推荐)
# - Smithery: https://smithery.ai/
# - Glama: https://glama.ai/mcp/servers
#
# 【本地服务(需要 npx 或 uvx
# - 官方列表: https://github.com/modelcontextprotocol/servers
#
# ============================================================
# ============================================================
# 插件基本信息
# ============================================================
[plugin]
name = "mcp_bridge_plugin"
version = "1.7.0"
config_version = "1.7.0"
enabled = false # 默认禁用,在 WebUI 中启用
# ============================================================
# 全局设置
# ============================================================
[settings]
# 🏷️ 工具前缀 - 用于区分 MCP 工具和原生工具
tool_prefix = "mcp"
# ⏱️ 连接超时(秒)
connect_timeout = 30.0
# ⏱️ 调用超时(秒)
call_timeout = 60.0
# 🔄 自动连接 - 启动时自动连接所有已启用的服务器
auto_connect = true
# 🔁 重试次数 - 连接失败时的重试次数
retry_attempts = 3
# ⏳ 重试间隔(秒)
retry_interval = 5.0
# 💓 心跳检测 - 定期检测服务器连接状态
heartbeat_enabled = true
# 💓 心跳间隔(秒)- 建议 30-120 秒
heartbeat_interval = 60.0
# 🔄 自动重连 - 检测到断开时自动尝试重连
auto_reconnect = true
# 🔄 最大重连次数 - 连续重连失败后暂停重连
max_reconnect_attempts = 3
# ============================================================
# v1.2.0 高级功能(实验性)
# ============================================================
# 📦 启用 Resources - 允许读取 MCP 服务器提供的资源
enable_resources = false
# 📝 启用 Prompts - 允许使用 MCP 服务器提供的提示模板
enable_prompts = false
# ============================================================
# v1.3.0 结果后处理功能
# ============================================================
# 当 MCP 工具返回的内容过长时,使用 LLM 对结果进行摘要提炼
# 🔄 启用结果后处理
post_process_enabled = false
# 📏 后处理阈值(字符数)- 结果长度超过此值才触发后处理
post_process_threshold = 500
# <20> 后处理输e出限制 - LLM 摘要输出的最大 token 数
post_process_max_tokens = 500
# 🤖 后处理模型(可选)- 留空则使用 utils 模型组
post_process_model = ""
# <20> 后处理提示词模板-
post_process_prompt = '''{query}
{result}
'''
# ============================================================
# 🆕 v1.4.0 调用链路追踪
# ============================================================
# 记录工具调用详情,便于调试和分析
# 🔍 启用调用追踪
trace_enabled = true
# 📊 追踪记录上限 - 内存中保留的最大记录数
trace_max_records = 50
# 📝 追踪日志文件 - 是否将追踪记录写入日志文件
# 启用后记录写入 plugins/MaiBot_MCPBridgePlugin/logs/trace.jsonl
trace_log_enabled = false
# ============================================================
# 🆕 v1.4.0 工具调用缓存
# ============================================================
# 缓存相同参数的调用结果,减少重复请求
# 🗄️ 启用调用缓存
cache_enabled = false
# ⏱️ 缓存有效期(秒)
cache_ttl = 300
# 📦 最大缓存条目 - 超出后 LRU 淘汰
cache_max_entries = 200
# <20> 缓存排除列表 - 即不缓存的工具(每行一个,支持通配符 *
# 时间类、随机类工具建议排除
cache_exclude_tools = '''
mcp_*_time_*
mcp_*_random_*
'''
# ============================================================
# 🆕 v1.4.0 工具管理
# ============================================================
[tools]
# 📋 工具清单(只读)- 启动后自动生成
tool_list = "(启动后自动生成)"
# 🚫 禁用工具列表 - 要禁用的工具名(每行一个)
# 从上方工具清单复制工具名,禁用后该工具不会被 LLM 调用
# 示例:
# disabled_tools = '''
# mcp_filesystem_delete_file
# mcp_filesystem_write_file
# '''
disabled_tools = ""
# ============================================================
# 🆕 v1.4.0 权限控制
# ============================================================
[permissions]
# 🔐 启用权限控制 - 按群/用户限制工具使用
perm_enabled = false
# 📋 默认模式
# allow_all: 未配置规则的工具默认允许
# deny_all: 未配置规则的工具默认禁止
perm_default_mode = "allow_all"
# ────────────────────────────────────────────────────────────
# 🚀 快捷配置(推荐新手使用)
# ────────────────────────────────────────────────────────────
# 🚫 禁用群列表 - 这些群无法使用任何 MCP 工具(每行一个群号)
# 示例:
# quick_deny_groups = '''
# 123456789
# 987654321
# '''
quick_deny_groups = ""
# ✅ 管理员白名单 - 这些用户始终可以使用所有工具每行一个QQ号
# 示例:
# quick_allow_users = '''
# 111111111
# '''
quick_allow_users = ""
# ────────────────────────────────────────────────────────────
# 📜 高级权限规则(可选,针对特定工具配置)
# ────────────────────────────────────────────────────────────
# 格式: qq:ID:group/private/user工具名支持通配符 *
# 示例:
# perm_rules = '''
# [
# {"tool": "mcp_*_delete_*", "denied": ["qq:123456:group"]}
# ]
# '''
perm_rules = "[]"
# ============================================================
# 🔌 MCP 服务器配置
# ============================================================
#
# ⚠️ 重要JSON 格式说明
# ────────────────────────────────────────────────────────────
# 服务器列表必须是 JSON 数组格式!
#
# ❌ 错误写法:
# { "name": "server1", ... },
# { "name": "server2", ... }
#
# ✅ 正确写法:
# [
# { "name": "server1", ... },
# { "name": "server2", ... }
# ]
#
# ────────────────────────────────────────────────────────────
# 每个服务器的配置字段:
# name - 服务器名称(唯一标识)
# enabled - 是否启用 (true/false)
# transport - 传输方式: "stdio" / "sse" / "http" / "streamable_http"
# url - 服务器地址sse/http/streamable_http 模式必填)
# headers - 🆕 鉴权头(可选,如 {"Authorization": "Bearer xxx"}
# command - 启动命令stdio 模式,如 "npx" 或 "uvx"
# args - 命令参数数组stdio 模式)
# env - 环境变量对象stdio 模式,可选)
# post_process - 服务器级别后处理配置(可选)
#
# ============================================================
[servers]
list = '''
[
{
"name": "time-mcp-server",
"enabled": false,
"transport": "streamable_http",
"url": "https://mcp.api-inference.modelscope.cn/server/mcp-server-time"
},
{
"name": "my-auth-server",
"enabled": false,
"transport": "streamable_http",
"url": "https://mcp.api-inference.modelscope.net/xxxxxx/mcp",
"headers": {
"Authorization": "Bearer ms-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}
},
{
"name": "fetch-local",
"enabled": false,
"transport": "stdio",
"command": "uvx",
"args": ["mcp-server-fetch"]
}
]
'''
# ============================================================
# 状态显示(只读)
# ============================================================
[status]
connection_status = "未初始化"

View File

@ -0,0 +1,448 @@
"""
MCP 配置格式转换模块 v1.0.0
支持的格式:
- Claude Desktop (claude_desktop_config.json)
- Kiro MCP (mcp.json)
- MaiBot MCP Bridge Plugin (本插件格式)
转换规则:
- stdio: command + args + env
- sse/http/streamable_http: url + headers
"""
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class ConversionResult:
"""转换结果"""
success: bool
servers: List[Dict[str, Any]] = field(default_factory=list)
errors: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
skipped: List[str] = field(default_factory=list)
class ConfigConverter:
"""MCP 配置格式转换器"""
# transport 类型映射 (外部格式 -> 内部格式)
TRANSPORT_MAP_IN = {
"sse": "sse",
"http": "http",
"streamable-http": "streamable_http",
"streamable_http": "streamable_http",
"streamable-http": "streamable_http",
"stdio": "stdio",
}
# 支持的 transport 字段名(有些格式用 type 而不是 transport
TRANSPORT_FIELD_NAMES = ["transport", "type"]
# transport 类型映射 (内部格式 -> Claude 格式)
TRANSPORT_MAP_OUT = {
"sse": "sse",
"http": "http",
"streamable_http": "streamable-http",
"stdio": "stdio",
}
@classmethod
def detect_format(cls, config: Dict[str, Any]) -> Optional[str]:
"""检测配置格式类型
Returns:
"claude": Claude Desktop 格式 (mcpServers 对象)
"kiro": Kiro MCP 格式 (mcpServers 对象 Claude 相同)
"maibot": MaiBot 插件格式 (数组)
None: 无法识别
"""
if isinstance(config, list):
# 数组格式,检查是否是 MaiBot 格式
if len(config) == 0:
return "maibot"
if isinstance(config[0], dict) and "name" in config[0]:
return "maibot"
return None
if isinstance(config, dict):
# 对象格式
if "mcpServers" in config:
return "claude" # Claude 和 Kiro 格式相同
# 可能是单个服务器配置
if "name" in config:
return "maibot_single"
return None
return None
@classmethod
def parse_json_safe(cls, json_str: str) -> Tuple[Optional[Any], Optional[str]]:
"""安全解析 JSON 字符串
Returns:
(解析结果, 错误信息)
"""
if not json_str or not json_str.strip():
return None, "输入为空"
json_str = json_str.strip()
try:
return json.loads(json_str), None
except json.JSONDecodeError as e:
# 尝试提供更友好的错误信息
line = e.lineno
col = e.colno
return None, f"JSON 解析失败 (行 {line}, 列 {col}): {e.msg}"
@classmethod
def validate_server_config(cls, name: str, config: Dict[str, Any]) -> Tuple[bool, Optional[str], List[str]]:
"""验证单个服务器配置
Args:
name: 服务器名称
config: 服务器配置字典
Returns:
(是否有效, 错误信息, 警告列表)
"""
warnings = []
if not isinstance(config, dict):
return False, f"服务器 '{name}' 配置必须是对象", []
has_command = "command" in config
has_url = "url" in config
# 必须有 command 或 url 之一
if not has_command and not has_url:
return False, f"服务器 '{name}' 缺少 'command''url' 字段", []
# 同时有 command 和 url 时给出警告
if has_command and has_url:
warnings.append(f"'{name}': 同时存在 command 和 url将优先使用 stdio 模式")
# 验证 url 格式
if has_url and not has_command:
url = config.get("url", "")
if not isinstance(url, str):
return False, f"服务器 '{name}' 的 url 必须是字符串", []
if not url.startswith(("http://", "https://")):
warnings.append(f"'{name}': url 不是标准 HTTP(S) 地址")
# 验证 command 格式
if has_command:
command = config.get("command", "")
if not isinstance(command, str):
return False, f"服务器 '{name}' 的 command 必须是字符串", []
if not command.strip():
return False, f"服务器 '{name}' 的 command 不能为空", []
# 验证 args 格式
if "args" in config:
args = config.get("args")
if not isinstance(args, list):
return False, f"服务器 '{name}' 的 args 必须是数组", []
for i, arg in enumerate(args):
if not isinstance(arg, str):
warnings.append(f"'{name}': args[{i}] 不是字符串,将自动转换")
# 验证 env 格式
if "env" in config:
env = config.get("env")
if not isinstance(env, dict):
return False, f"服务器 '{name}' 的 env 必须是对象", []
# 验证 headers 格式
if "headers" in config:
headers = config.get("headers")
if not isinstance(headers, dict):
return False, f"服务器 '{name}' 的 headers 必须是对象", []
# 验证 transport/type 格式
transport_value = None
for field_name in cls.TRANSPORT_FIELD_NAMES:
if field_name in config:
transport_value = config.get(field_name, "").lower()
break
if transport_value and transport_value not in cls.TRANSPORT_MAP_IN:
warnings.append(f"'{name}': 未知的 transport 类型 '{transport_value}',将自动推断")
return True, None, warnings
@classmethod
def convert_claude_server(cls, name: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""将单个 Claude 格式服务器配置转换为 MaiBot 格式
Args:
name: 服务器名称
config: Claude 格式的服务器配置
Returns:
MaiBot 格式的服务器配置
"""
result = {
"name": name,
"enabled": True,
}
has_command = "command" in config
if has_command:
# stdio 模式
result["transport"] = "stdio"
result["command"] = config.get("command", "")
# 处理 args
args = config.get("args", [])
if args:
# 确保所有 args 都是字符串
result["args"] = [str(arg) for arg in args]
# 处理 env
env = config.get("env", {})
if env and isinstance(env, dict):
result["env"] = env
else:
# 远程模式 (sse/http/streamable_http)
# 支持 transport 或 type 字段
transport_raw = None
for field_name in cls.TRANSPORT_FIELD_NAMES:
if field_name in config:
transport_raw = config.get(field_name, "").lower()
break
if not transport_raw:
transport_raw = "sse"
result["transport"] = cls.TRANSPORT_MAP_IN.get(transport_raw, "sse")
result["url"] = config.get("url", "")
# 处理 headers
headers = config.get("headers", {})
if headers and isinstance(headers, dict):
result["headers"] = headers
return result
@classmethod
def convert_maibot_server(cls, config: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
"""将单个 MaiBot 格式服务器配置转换为 Claude 格式
Args:
config: MaiBot 格式的服务器配置
Returns:
(服务器名称, Claude 格式的服务器配置)
"""
name = config.get("name", "unnamed")
result = {}
transport = config.get("transport", "stdio").lower()
if transport == "stdio":
# stdio 模式
result["command"] = config.get("command", "")
args = config.get("args", [])
if args:
result["args"] = args
env = config.get("env", {})
if env:
result["env"] = env
else:
# 远程模式
result["url"] = config.get("url", "")
# 转换 transport 名称
claude_transport = cls.TRANSPORT_MAP_OUT.get(transport, "sse")
if claude_transport != "sse": # sse 是默认值,可以省略
result["transport"] = claude_transport
headers = config.get("headers", {})
if headers:
result["headers"] = headers
return name, result
@classmethod
def from_claude_format(
cls,
config: Dict[str, Any],
existing_names: Optional[set] = None
) -> ConversionResult:
"""从 Claude Desktop 格式转换为 MaiBot 格式
Args:
config: Claude Desktop 配置 (包含 mcpServers 字段)
existing_names: 已存在的服务器名称集合用于跳过重复
Returns:
ConversionResult
"""
result = ConversionResult(success=True)
existing_names = existing_names or set()
# 检查格式
if not isinstance(config, dict):
result.success = False
result.errors.append("配置必须是 JSON 对象")
return result
mcp_servers = config.get("mcpServers", {})
if not isinstance(mcp_servers, dict):
result.success = False
result.errors.append("mcpServers 必须是对象")
return result
if not mcp_servers:
result.warnings.append("mcpServers 为空,没有服务器可导入")
return result
# 转换每个服务器
for name, srv_config in mcp_servers.items():
# 检查名称是否已存在
if name in existing_names:
result.skipped.append(f"'{name}' (已存在)")
continue
# 验证配置
valid, error, warnings = cls.validate_server_config(name, srv_config)
result.warnings.extend(warnings)
if not valid:
result.errors.append(error)
continue
# 转换配置
try:
converted = cls.convert_claude_server(name, srv_config)
result.servers.append(converted)
except Exception as e:
result.errors.append(f"转换服务器 '{name}' 失败: {str(e)}")
# 如果有错误但也有成功的,仍然标记为成功(部分成功)
if result.errors and not result.servers:
result.success = False
return result
@classmethod
def to_claude_format(cls, servers: List[Dict[str, Any]]) -> Dict[str, Any]:
"""将 MaiBot 格式转换为 Claude Desktop 格式
Args:
servers: MaiBot 格式的服务器列表
Returns:
Claude Desktop 格式的配置
"""
mcp_servers = {}
for srv in servers:
if not isinstance(srv, dict):
continue
name, config = cls.convert_maibot_server(srv)
mcp_servers[name] = config
return {"mcpServers": mcp_servers}
@classmethod
def import_from_string(
cls,
json_str: str,
existing_names: Optional[set] = None
) -> ConversionResult:
"""从 JSON 字符串导入配置
自动检测格式并转换为 MaiBot 格式
Args:
json_str: JSON 字符串
existing_names: 已存在的服务器名称集合
Returns:
ConversionResult
"""
result = ConversionResult(success=True)
existing_names = existing_names or set()
# 解析 JSON
parsed, error = cls.parse_json_safe(json_str)
if error:
result.success = False
result.errors.append(error)
return result
# 检测格式
fmt = cls.detect_format(parsed)
if fmt is None:
result.success = False
result.errors.append("无法识别的配置格式")
return result
if fmt == "maibot":
# 已经是 MaiBot 格式,直接验证并返回
for srv in parsed:
if not isinstance(srv, dict):
result.warnings.append("跳过非对象元素")
continue
name = srv.get("name", "")
if not name:
result.warnings.append("跳过缺少 name 的服务器")
continue
if name in existing_names:
result.skipped.append(f"'{name}' (已存在)")
continue
result.servers.append(srv)
elif fmt == "maibot_single":
# 单个 MaiBot 格式服务器
name = parsed.get("name", "")
if name in existing_names:
result.skipped.append(f"'{name}' (已存在)")
else:
result.servers.append(parsed)
elif fmt in ("claude", "kiro"):
# Claude/Kiro 格式
return cls.from_claude_format(parsed, existing_names)
return result
@classmethod
def export_to_string(
cls,
servers: List[Dict[str, Any]],
format_type: str = "claude",
pretty: bool = True
) -> str:
"""导出配置为 JSON 字符串
Args:
servers: MaiBot 格式的服务器列表
format_type: 导出格式 ("claude", "kiro", "maibot")
pretty: 是否格式化输出
Returns:
JSON 字符串
"""
indent = 2 if pretty else None
if format_type in ("claude", "kiro"):
config = cls.to_claude_format(servers)
else:
config = servers
return json.dumps(config, ensure_ascii=False, indent=indent)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,2 @@
# MCP 桥接插件依赖
mcp>=1.0.0

View File

@ -0,0 +1,270 @@
#!/usr/bin/env python3
"""
MCP 客户端测试脚本
测试 mcp_client.py 的基本功能
"""
import asyncio
import sys
import os
# 确保当前目录在 path 中
sys.path.insert(0, os.path.dirname(__file__))
from mcp_client import (
MCPClientManager,
MCPServerConfig,
TransportType,
ToolCallStats,
ServerStats,
)
async def test_stats():
"""测试统计类"""
print("\n=== 测试统计类 ===")
# 测试 ToolCallStats
stats = ToolCallStats(tool_key="test_tool")
stats.record_call(True, 100.0)
stats.record_call(True, 200.0)
stats.record_call(False, 50.0, "timeout")
assert stats.total_calls == 3
assert stats.success_calls == 2
assert stats.failed_calls == 1
assert stats.success_rate == (2/3) * 100
assert stats.avg_duration_ms == 150.0
assert stats.last_error == "timeout"
print(f"✅ ToolCallStats: {stats.to_dict()}")
# 测试 ServerStats
server_stats = ServerStats(server_name="test_server")
server_stats.record_connect()
server_stats.record_heartbeat()
server_stats.record_disconnect()
server_stats.record_failure()
server_stats.record_failure()
assert server_stats.connect_count == 1
assert server_stats.disconnect_count == 1
assert server_stats.consecutive_failures == 2
print(f"✅ ServerStats: {server_stats.to_dict()}")
return True
async def test_manager_basic():
"""测试管理器基本功能"""
print("\n=== 测试管理器基本功能 ===")
# 创建新的管理器实例(绕过单例)
manager = MCPClientManager.__new__(MCPClientManager)
manager._initialized = False
manager.__init__()
# 配置
manager.configure({
"tool_prefix": "mcp",
"call_timeout": 30.0,
"retry_attempts": 1,
"retry_interval": 1.0,
"heartbeat_enabled": False,
})
# 测试状态
status = manager.get_status()
assert status["total_servers"] == 0
assert status["connected_servers"] == 0
print(f"✅ 初始状态: {status}")
# 测试添加禁用的服务器
config = MCPServerConfig(
name="disabled_server",
enabled=False,
transport=TransportType.HTTP,
url="https://example.com/mcp"
)
result = await manager.add_server(config)
assert result == True
assert "disabled_server" in manager._clients
assert manager._clients["disabled_server"].is_connected == False
print("✅ 添加禁用服务器成功")
# 测试重复添加
result = await manager.add_server(config)
assert result == False
print("✅ 重复添加被拒绝")
# 测试移除
result = await manager.remove_server("disabled_server")
assert result == True
assert "disabled_server" not in manager._clients
print("✅ 移除服务器成功")
# 清理
await manager.shutdown()
print("✅ 管理器关闭成功")
return True
async def test_http_connection():
"""测试 HTTP 连接(使用真实的 MCP 服务器)"""
print("\n=== 测试 HTTP 连接 ===")
# 创建新的管理器实例
manager = MCPClientManager.__new__(MCPClientManager)
manager._initialized = False
manager.__init__()
manager.configure({
"tool_prefix": "mcp",
"call_timeout": 30.0,
"retry_attempts": 2,
"retry_interval": 2.0,
"heartbeat_enabled": False,
})
# 使用 HowToCook MCP 服务器测试
config = MCPServerConfig(
name="howtocook",
enabled=True,
transport=TransportType.HTTP,
url="https://mcp.api-inference.modelscope.net/c9b55951d4ed47/mcp"
)
print(f"正在连接 {config.url} ...")
result = await manager.add_server(config)
if result:
print(f"✅ 连接成功!")
# 检查工具
tools = manager.all_tools
print(f"✅ 发现 {len(tools)} 个工具:")
for tool_key in tools:
print(f" - {tool_key}")
# 测试心跳
client = manager._clients["howtocook"]
healthy = await client.check_health()
print(f"✅ 心跳检测: {'健康' if healthy else '异常'}")
# 测试工具调用
if "mcp_howtocook_whatToEat" in tools:
print("\n正在调用 whatToEat 工具...")
call_result = await manager.call_tool("mcp_howtocook_whatToEat", {})
if call_result.success:
print(f"✅ 工具调用成功 (耗时: {call_result.duration_ms:.0f}ms)")
print(f" 结果: {call_result.content[:200]}..." if len(str(call_result.content)) > 200 else f" 结果: {call_result.content}")
else:
print(f"❌ 工具调用失败: {call_result.error}")
# 查看统计
stats = manager.get_all_stats()
print(f"\n📊 统计信息:")
print(f" 全局调用: {stats['global']['total_tool_calls']}")
print(f" 成功: {stats['global']['successful_calls']}")
print(f" 失败: {stats['global']['failed_calls']}")
else:
print(f"❌ 连接失败")
# 清理
await manager.shutdown()
return result
async def test_heartbeat():
"""测试心跳检测功能"""
print("\n=== 测试心跳检测 ===")
# 创建新的管理器实例
manager = MCPClientManager.__new__(MCPClientManager)
manager._initialized = False
manager.__init__()
manager.configure({
"tool_prefix": "mcp",
"call_timeout": 30.0,
"retry_attempts": 1,
"retry_interval": 1.0,
"heartbeat_enabled": True,
"heartbeat_interval": 5.0, # 5秒间隔用于测试
"auto_reconnect": True,
"max_reconnect_attempts": 2,
})
# 添加一个测试服务器
config = MCPServerConfig(
name="heartbeat_test",
enabled=True,
transport=TransportType.HTTP,
url="https://mcp.api-inference.modelscope.net/c9b55951d4ed47/mcp"
)
print("正在连接服务器...")
result = await manager.add_server(config)
if result:
print("✅ 服务器连接成功")
# 启动心跳检测
await manager.start_heartbeat()
print("✅ 心跳检测已启动")
# 等待一个心跳周期
print("等待心跳检测...")
await asyncio.sleep(2)
# 检查状态
status = manager.get_status()
print(f"✅ 心跳运行状态: {status['heartbeat_running']}")
# 停止心跳
await manager.stop_heartbeat()
print("✅ 心跳检测已停止")
else:
print("❌ 服务器连接失败,跳过心跳测试")
await manager.shutdown()
return True
async def main():
"""运行所有测试"""
print("=" * 50)
print("MCP 客户端测试")
print("=" * 50)
try:
# 基础测试
await test_stats()
await test_manager_basic()
# 网络测试
print("\n是否进行网络连接测试? (需要网络) [y/N]: ", end="")
# 自动进行网络测试
await test_http_connection()
# 心跳测试
await test_heartbeat()
print("\n" + "=" * 50)
print("✅ 所有测试通过!")
print("=" * 50)
except Exception as e:
print(f"\n❌ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
return True
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,30 +0,0 @@
from src.chat.knowledge.kg_manager import KGManager
kg = KGManager()
kg.load_from_file()
edges = kg.graph.get_edge_list()
if edges:
e = edges[0]
print(f"Edge tuple: {e}")
print(f"Edge tuple type: {type(e)}")
edge_data = kg.graph[e[0], e[1]]
print(f"\nEdge data type: {type(edge_data)}")
print(f"Edge data: {edge_data}")
print(f"Has 'get' method: {hasattr(edge_data, 'get')}")
print(f"Is dict: {isinstance(edge_data, dict)}")
# 尝试不同的访问方式
try:
print(f"\nUsing []: {edge_data['weight']}")
except Exception as e:
print(f"Using [] failed: {e}")
try:
print(f"Using .get(): {edge_data.get('weight')}")
except Exception as e:
print(f"Using .get() failed: {e}")
# 查看所有属性
print(f"\nDir: {[x for x in dir(edge_data) if not x.startswith('_')]}")