feat: 增加WebUI访问令牌验证功能,支持IP锁定机制

pull/75/head
Alnnt 2025-12-31 23:28:17 +08:00
parent 9914bac5a6
commit db6f72c6b6
7 changed files with 387 additions and 21 deletions

View File

@ -15,6 +15,6 @@ WORKDIR /adapters
COPY . .
EXPOSE 8095
EXPOSE 8095 8096
ENTRYPOINT ["python", "main.py"]

View File

@ -18,6 +18,7 @@ from src.config.official_configs import (
NapcatServerConfig,
NicknameConfig,
VoiceConfig,
WebUIConfig,
)
install(extra_lines=3)
@ -118,6 +119,7 @@ class Config(ConfigBase):
chat: ChatConfig
voice: VoiceConfig
debug: DebugConfig
web_ui: WebUIConfig = None # 可选配置,向后兼容
def load_config(config_path: str) -> Config:

View File

@ -59,7 +59,7 @@ class ChatConfig(ConfigBase):
"""私聊列表类型 白名单/黑名单"""
private_list: list[int] = field(default_factory=[])
"""私聊列表"""
"""e"""
ban_user_id: list[int] = field(default_factory=[])
"""被封禁的用户ID列表封禁后将无法与其进行交互"""
@ -81,3 +81,18 @@ class VoiceConfig(ConfigBase):
class DebugConfig(ConfigBase):
level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"
"""日志级别默认为INFO"""
@dataclass
class WebUIConfig(ConfigBase):
enable: bool = True
"""是否启用 WebUI"""
token: str = ""
"""WebUI访问令牌若为空则无需验证"""
host: str = "0.0.0.0"
"""WebUI监听地址"""
port: int = 8096
"""WebUI端口"""

View File

@ -12,24 +12,41 @@ from .routes import setup_routes
_runner: Optional[web.AppRunner] = None
_site: Optional[web.TCPSite] = None
# WebUI 配置
WEBUI_HOST = "0.0.0.0"
WEBUI_PORT = 8096
def get_webui_config():
"""获取 WebUI 配置"""
if global_config.web_ui:
return global_config.web_ui.host, global_config.web_ui.port
return "0.0.0.0", 8096 # 默认值
def is_webui_enabled() -> bool:
"""检查 WebUI 是否启用"""
if global_config.web_ui:
return global_config.web_ui.enable
return True # 默认启用
async def start_webui():
"""启动 WebUI 服务"""
global _runner, _site
# 检查是否启用 WebUI
if not is_webui_enabled():
logger.info("WebUI 已禁用,跳过启动")
return
host, port = get_webui_config()
app = web.Application()
setup_routes(app)
_runner = web.AppRunner(app)
await _runner.setup()
_site = web.TCPSite(_runner, WEBUI_HOST, WEBUI_PORT)
_site = web.TCPSite(_runner, host, port)
await _site.start()
logger.info(f"WebUI 已启动,访问地址: http://{WEBUI_HOST}:{WEBUI_PORT}")
logger.info(f"WebUI 已启动,访问地址: http://{host}:{port}")
async def stop_webui():

View File

@ -3,6 +3,9 @@ WebUI 路由定义
"""
import json
import hmac
import time
from collections import defaultdict
from aiohttp import web
from src.config import global_config
from src.logger import logger
@ -16,20 +19,146 @@ from .config_manager import (
)
from .static import get_index_html
# 登录失败记录IP -> (失败次数, 最后失败时间)
_login_failures: dict[str, tuple[int, float]] = defaultdict(lambda: (0, 0.0))
# 最大允许失败次数
MAX_LOGIN_FAILURES = 5
# 锁定时间(秒)
LOCKOUT_TIME = 300 # 5分钟
def get_client_ip(request: web.Request) -> str:
"""获取客户端 IP 地址"""
# 优先从 X-Forwarded-For 获取(如果在反向代理后面)
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
return forwarded.split(",")[0].strip()
# 否则从直接连接获取
peername = request.transport.get_extra_info("peername")
if peername:
return peername[0]
return "unknown"
def is_ip_locked(ip: str) -> bool:
"""检查 IP 是否被锁定"""
failures, last_failure_time = _login_failures[ip]
if failures >= MAX_LOGIN_FAILURES:
# 检查是否还在锁定期内
if time.time() - last_failure_time < LOCKOUT_TIME:
return True
else:
# 锁定期已过,重置计数
_login_failures[ip] = (0, 0.0)
return False
def record_login_failure(ip: str):
"""记录登录失败"""
failures, _ = _login_failures[ip]
_login_failures[ip] = (failures + 1, time.time())
def reset_login_failures(ip: str):
"""重置登录失败计数"""
_login_failures[ip] = (0, 0.0)
def get_configured_token() -> str:
"""获取配置的 token"""
if global_config.web_ui:
return global_config.web_ui.token or ""
return ""
def secure_compare(a: str, b: str) -> bool:
"""安全的字符串比较,防止时序攻击"""
return hmac.compare_digest(a.encode('utf-8'), b.encode('utf-8'))
def verify_token(request: web.Request) -> bool:
"""验证请求中的 token"""
configured_token = get_configured_token()
if not configured_token:
return True # 未配置 token无需验证
# 从 Authorization header 获取 token
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
request_token = auth_header[7:]
return secure_compare(request_token, configured_token)
return False
async def index_handler(request: web.Request) -> web.Response:
"""返回主页面"""
return web.Response(text=get_index_html(), content_type="text/html")
async def verify_token_handler(request: web.Request) -> web.Response:
"""验证 token 接口"""
configured_token = get_configured_token()
client_ip = get_client_ip(request)
# 如果未配置 token直接返回成功
if not configured_token:
return web.json_response({"success": True, "message": "无需验证", "required": False})
# 检查 IP 是否被锁定
if is_ip_locked(client_ip):
remaining_time = int(LOCKOUT_TIME - (time.time() - _login_failures[client_ip][1]))
logger.warning(f"[WebUI] IP {client_ip} 登录尝试被拒绝(已锁定,剩余 {remaining_time} 秒)")
return web.json_response({
"success": False,
"message": f"登录尝试次数过多,请在 {remaining_time} 秒后重试",
"required": True
}, status=429)
try:
data = await request.json()
input_token = data.get("token", "")
if secure_compare(input_token, configured_token):
reset_login_failures(client_ip)
logger.info(f"[WebUI] Token 验证成功 (IP: {client_ip})")
return web.json_response({"success": True, "message": "验证成功", "required": True})
else:
record_login_failure(client_ip)
failures, _ = _login_failures[client_ip]
remaining_attempts = MAX_LOGIN_FAILURES - failures
logger.warning(f"[WebUI] Token 验证失败 (IP: {client_ip}, 剩余尝试次数: {remaining_attempts})")
return web.json_response({"success": False, "message": "Token 错误", "required": True}, status=401)
except json.JSONDecodeError:
return web.json_response({"success": False, "message": "无效的请求数据"}, status=400)
async def check_auth_handler(request: web.Request) -> web.Response:
"""检查是否需要验证以及当前 token 是否有效"""
configured_token = get_configured_token()
if not configured_token:
return web.json_response({"required": False, "valid": True})
if verify_token(request):
return web.json_response({"required": True, "valid": True})
else:
return web.json_response({"required": True, "valid": False})
async def get_config_handler(request: web.Request) -> web.Response:
"""获取当前配置"""
if not verify_token(request):
return web.json_response({"success": False, "error": "未授权"}, status=401)
config = get_chat_config()
return web.json_response(config)
async def update_config_handler(request: web.Request) -> web.Response:
"""更新配置"""
if not verify_token(request):
return web.json_response({"success": False, "error": "未授权"}, status=401)
try:
data = await request.json()
@ -97,6 +226,7 @@ async def update_config_handler(request: web.Request) -> web.Response:
def setup_routes(app: web.Application):
"""设置路由"""
app.router.add_get("/", index_handler)
app.router.add_get("/api/auth/check", check_auth_handler)
app.router.add_post("/api/auth/verify", verify_token_handler)
app.router.add_get("/api/config", get_config_handler)
app.router.add_post("/api/config", update_config_handler)

View File

@ -77,6 +77,22 @@ def get_index_html() -> str:
border-color: #667eea;
}
.form-group input[type="password"],
.form-group input[type="text"] {
width: 100%;
padding: 12px;
border: 2px solid #e0e0e0;
border-radius: 8px;
font-size: 16px;
transition: border-color 0.3s;
}
.form-group input[type="password"]:focus,
.form-group input[type="text"]:focus {
outline: none;
border-color: #667eea;
}
.list-container {
background: #f8f9fa;
border-radius: 8px;
@ -154,6 +170,28 @@ def get_index_html() -> str:
background: #5a6fd6;
}
.login-btn {
width: 100%;
padding: 14px 20px;
background: #667eea;
color: white;
border: none;
border-radius: 8px;
cursor: pointer;
font-weight: 600;
font-size: 16px;
transition: background 0.3s;
}
.login-btn:hover {
background: #5a6fd6;
}
.login-btn:disabled {
background: #ccc;
cursor: not-allowed;
}
.status {
position: fixed;
top: 20px;
@ -192,10 +230,46 @@ def get_index_html() -> str:
font-style: italic;
padding: 10px 0;
}
.hidden {
display: none !important;
}
.login-container {
max-width: 400px;
margin: 100px auto;
}
.login-container .card {
text-align: center;
}
.login-container h2 {
border-bottom: none !important;
}
.login-icon {
font-size: 48px;
margin-bottom: 10px;
}
</style>
</head>
<body>
<div class="container">
<!-- 登录界面 -->
<div id="login-page" class="login-container hidden">
<h1>🤖 MaiBot Adapter</h1>
<div class="card">
<div class="login-icon">🔐</div>
<h2>请输入访问令牌</h2>
<div class="form-group">
<input type="password" id="token-input" placeholder="输入 Token" onkeypress="if(event.key==='Enter') login()" />
</div>
<button class="login-btn" onclick="login()">登录</button>
</div>
</div>
<!-- 主界面 -->
<div id="main-page" class="container hidden">
<h1>🤖 MaiBot Adapter 配置管理</h1>
<div class="card">
@ -255,11 +329,125 @@ def get_index_html() -> str:
private_list: []
};
// 获取存储的 token
function getStoredToken() {
return localStorage.getItem('webui_token') || '';
}
// 存储 token
function storeToken(token) {
localStorage.setItem('webui_token', token);
}
// 清除 token
function clearToken() {
localStorage.removeItem('webui_token');
}
// 获取带认证的 headers
function getAuthHeaders() {
const token = getStoredToken();
const headers = { 'Content-Type': 'application/json' };
if (token) {
headers['Authorization'] = 'Bearer ' + token;
}
return headers;
}
// 显示登录页面
function showLoginPage() {
document.getElementById('login-page').classList.remove('hidden');
document.getElementById('main-page').classList.add('hidden');
}
// 显示主页面
function showMainPage() {
document.getElementById('login-page').classList.add('hidden');
document.getElementById('main-page').classList.remove('hidden');
}
// 检查认证状态
async function checkAuth() {
try {
const response = await fetch('/api/auth/check', {
headers: getAuthHeaders()
});
const data = await response.json();
if (!data.required) {
// 不需要认证
showMainPage();
await loadConfig();
} else if (data.valid) {
// 需要认证且当前 token 有效
showMainPage();
await loadConfig();
} else {
// 需要认证但 token 无效
clearToken();
showLoginPage();
}
} catch (error) {
showStatus('检查认证状态失败: ' + error.message, 'error');
showLoginPage();
}
}
// 登录
async function login() {
const tokenInput = document.getElementById('token-input');
const token = tokenInput.value.trim();
if (!token) {
showStatus('请输入 Token', 'error');
return;
}
try {
const response = await fetch('/api/auth/verify', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ token })
});
const data = await response.json();
if (data.success) {
storeToken(token);
showStatus('登录成功', 'success');
tokenInput.value = '';
showMainPage();
await loadConfig();
} else {
showStatus(data.message || 'Token 错误', 'error');
}
} catch (error) {
showStatus('登录失败: ' + error.message, 'error');
}
}
// 加载配置
async function loadConfig() {
try {
const response = await fetch('/api/config');
config = await response.json();
const response = await fetch('/api/config', {
headers: getAuthHeaders()
});
if (response.status === 401) {
// 未授权跳转到登录页
clearToken();
showLoginPage();
showStatus('登录已过期,请重新登录', 'error');
return;
}
const data = await response.json();
if (data.success === false) {
showStatus('加载配置失败: ' + (data.error || '未知错误'), 'error');
return;
}
config = data;
renderConfig();
} catch (error) {
showStatus('加载配置失败: ' + error.message, 'error');
@ -269,18 +457,18 @@ def get_index_html() -> str:
// 渲染配置
function renderConfig() {
// 设置选择框
document.getElementById('group_list_type').value = config.group_list_type;
document.getElementById('private_list_type').value = config.private_list_type;
document.getElementById('group_list_type').value = config.group_list_type || 'whitelist';
document.getElementById('private_list_type').value = config.private_list_type || 'whitelist';
// 渲染列表
renderList('group_list', config.group_list);
renderList('private_list', config.private_list);
renderList('group_list', config.group_list || []);
renderList('private_list', config.private_list || []);
}
// 渲染列表
function renderList(listId, items) {
const container = document.getElementById(listId);
if (items.length === 0) {
if (!items || items.length === 0) {
container.innerHTML = '<div class="empty-list">列表为空</div>';
} else {
container.innerHTML = items.map(item => `
@ -297,9 +485,17 @@ def get_index_html() -> str:
try {
const response = await fetch('/api/config', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
headers: getAuthHeaders(),
body: JSON.stringify({ field, value })
});
if (response.status === 401) {
clearToken();
showLoginPage();
showStatus('登录已过期,请重新登录', 'error');
return;
}
const result = await response.json();
if (result.success) {
config = result.config;
@ -324,7 +520,7 @@ def get_index_html() -> str:
return;
}
const list = [...config[listId]];
const list = [...(config[listId] || [])];
if (list.includes(value)) {
showStatus('该项已存在', 'error');
return;
@ -337,7 +533,7 @@ def get_index_html() -> str:
// 删除项目
function removeItem(listId, value) {
const list = config[listId].filter(item => item !== value);
const list = (config[listId] || []).filter(item => item !== value);
updateConfig(listId, list);
}
@ -351,8 +547,8 @@ def get_index_html() -> str:
}, 3000);
}
// 页面加载时获取配置
loadConfig();
// 页面加载时检查认证状态
checkAuth();
</script>
</body>
</html>'''

View File

@ -33,3 +33,9 @@ use_tts = false # 是否使用tts语音请确保你配置了tts并有对应
[debug]
level = "INFO" # 日志等级DEBUG, INFO, WARNING, ERROR, CRITICAL
[web_ui] # WebUI 配置管理界面
enable = true # 是否启用 WebUI
token = "" # 访问令牌,若为空则无需验证
host = "localhost" # WebUI监听地址
port = 8096 # WebUI端口