From 32af5ae29a4ffbd8b6165c544a0573a3f5af3916 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=9B=A6?= <2584059816@qq.com> Date: Sun, 14 Dec 2025 19:21:02 +0800 Subject: [PATCH 1/7] =?UTF-8?q?feat:=E4=B8=BA=20WebUI=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E9=98=B2=E7=88=AC=E8=99=AB=E4=B8=8E=E8=B5=84=E4=BA=A7=E6=B5=8B?= =?UTF-8?q?=E7=BB=98=E6=A3=80=E6=B5=8B=EF=BC=8C=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?=E7=A7=8D=E9=98=B2=E6=8A=A4=E6=A8=A1=E5=BC=8F=E4=B8=8E=20IP=20?= =?UTF-8?q?=E7=99=BD=E5=90=8D=E5=8D=95=EF=BC=8C=E8=B5=84=E6=BA=90=E5=8D=A0?= =?UTF-8?q?=E7=94=A8=E4=BD=8E=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增功能: 防爬虫检测:检测 20+ 种爬虫和 30+ 种资产测绘工具(Shodan、Censys、Nmap 等) 多种模式:basic(只记录,默认)、strict(严格)、loose(宽松)、false(禁用) IP 白名单:支持精确 IP、CIDR、通配符格式,白名单 IP 豁免所有检测 请求频率限制:基于 IP 的滑动时间窗口,防止高频请求 --- src/webui/anti_crawler.py | 701 ++++++++++++++++++++++++++++++++++++++ src/webui/webui_server.py | 46 +++ template/template.env | 8 +- 3 files changed, 754 insertions(+), 1 deletion(-) create mode 100644 src/webui/anti_crawler.py diff --git a/src/webui/anti_crawler.py b/src/webui/anti_crawler.py new file mode 100644 index 00000000..c8c3c318 --- /dev/null +++ b/src/webui/anti_crawler.py @@ -0,0 +1,701 @@ +""" +WebUI 防爬虫模块 +提供爬虫检测和阻止功能,保护 WebUI 不被搜索引擎和恶意爬虫访问 +""" + +import os +import time +import ipaddress +import re +from collections import defaultdict +from typing import Optional, Union +from functools import lru_cache +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request +from starlette.responses import Response, PlainTextResponse +from fastapi import HTTPException + +from src.common.logger import get_logger + +logger = get_logger("webui.anti_crawler") + +# 常见爬虫 User-Agent 列表(使用更精确的关键词,避免误报) +CRAWLER_USER_AGENTS = { + # 搜索引擎爬虫(精确匹配) + "googlebot", + "bingbot", + "baiduspider", + "yandexbot", + "slurp", # Yahoo + "duckduckbot", + "sogou", + "exabot", + "facebot", + "ia_archiver", # Internet Archive + # 通用爬虫(移除过于宽泛的关键词) + "crawler", + "spider", + "scraper", + "wget", # 保留wget,因为通常用于自动化脚本 + "scrapy", # 保留scrapy,因为这是爬虫框架 + # 安全扫描工具(这些是明确的扫描工具) + "masscan", + "nmap", + "nikto", + "sqlmap", + # 注意:移除了以下过于宽泛的关键词以避免误报: + # - "bot" (会误匹配GitHub-Robot等) + # - "curl" (正常工具) + # - "python-requests" (正常库) + # - "httpx" (正常库) + # - "aiohttp" (正常库) +} + +# 资产测绘工具 User-Agent 标识 +ASSET_SCANNER_USER_AGENTS = { + # 知名资产测绘平台 + "shodan", + "censys", + "zoomeye", + "fofa", + "quake", + "hunter", + "binaryedge", + "onyphe", + "securitytrails", + "virustotal", + "passivetotal", + # 安全扫描工具 + "acunetix", + "appscan", + "burpsuite", + "nessus", + "openvas", + "qualys", + "rapid7", + "tenable", + "veracode", + "zap", + "awvs", # Acunetix Web Vulnerability Scanner + "netsparker", + "skipfish", + "w3af", + "arachni", + # 其他扫描工具 + "masscan", + "zmap", + "nmap", + "whatweb", + "wpscan", + "joomscan", + "dnsenum", + "subfinder", + "amass", + "sublist3r", + "theharvester", +} + +# 资产测绘工具常用的HTTP头标识 +ASSET_SCANNER_HEADERS = { + # 常见的扫描工具自定义头 + "x-scan": {"shodan", "censys", "zoomeye", "fofa"}, + "x-scanner": {"nmap", "masscan", "zmap"}, + "x-probe": {"masscan", "zmap"}, + # 其他可疑头(移除反向代理标准头) + "x-originating-ip": set(), + "x-remote-ip": set(), + "x-remote-addr": set(), + # 注意:移除了以下反向代理标准头以避免误报: + # - "x-forwarded-proto" (反向代理标准头) + # - "x-real-ip" (反向代理标准头,已在_get_client_ip中使用) +} + +# 可疑的HTTP头值模式(用于检测扫描工具) +SUSPICIOUS_HEADER_PATTERNS = { + "shodan", + "censys", + "zoomeye", + "fofa", + "quake", + "scanner", + "probe", + "scan", + "recon", + 
"reconnaissance", +} + +# 防爬虫模式配置 +# false: 禁用 +# strict: 严格模式(更严格的检测,更低的频率限制) +# loose: 宽松模式(较宽松的检测,较高的频率限制) +# basic: 基础模式(只记录恶意访问,不阻止,不限制请求数,不跟踪IP) +ANTI_CRAWLER_MODE = os.getenv("WEBUI_ANTI_CRAWLER_MODE", "basic").lower() + +# IP白名单配置(从环境变量读取,逗号分隔) +# 支持格式: +# - 精确IP:127.0.0.1, 192.168.1.100 +# - CIDR格式:192.168.1.0/24, 172.17.0.0/16 (适用于Docker网络) +# - 通配符:192.168.*.*, 10.*.*.*, *.*.*.* (匹配所有) +# - IPv6:::1, 2001:db8::/32 +def _parse_allowed_ips(ip_string: str) -> list: + """ + 解析IP白名单字符串,支持精确IP、CIDR格式和通配符 + + Args: + ip_string: 逗号分隔的IP字符串 + + Returns: + IP白名单列表,每个元素可能是: + - ipaddress.IPv4Network/IPv6Network对象(CIDR格式) + - ipaddress.IPv4Address/IPv6Address对象(精确IP) + - str(通配符模式,已转换为正则表达式) + """ + allowed = [] + if not ip_string: + return allowed + + for ip_entry in ip_string.split(","): + ip_entry = ip_entry.strip() # 去除空格 + if not ip_entry: + continue + + # 检查通配符格式(包含*) + if "*" in ip_entry: + # 处理通配符 + pattern = _convert_wildcard_to_regex(ip_entry) + if pattern: + allowed.append(pattern) + else: + logger.warning(f"无效的通配符IP格式,已忽略: {ip_entry}") + continue + + try: + # 尝试解析为CIDR格式(包含/) + if "/" in ip_entry: + allowed.append(ipaddress.ip_network(ip_entry, strict=False)) + else: + # 精确IP地址 + allowed.append(ipaddress.ip_address(ip_entry)) + except (ValueError, AttributeError) as e: + logger.warning(f"无效的IP白名单条目,已忽略: {ip_entry} ({e})") + + return allowed + + +def _convert_wildcard_to_regex(wildcard_pattern: str) -> Optional[str]: + """ + 将通配符IP模式转换为正则表达式 + + 支持的格式: + - 192.168.*.* 或 192.168.* + - 10.*.*.* 或 10.* + - *.*.*.* 或 * + + Args: + wildcard_pattern: 通配符模式字符串 + + Returns: + 正则表达式字符串,如果格式无效则返回None + """ + # 去除空格 + pattern = wildcard_pattern.strip() + + # 处理单个*(匹配所有) + if pattern == "*": + return r".*" + + # 处理IPv4通配符格式 + # 支持:192.168.*.*, 192.168.*, 10.*.*.*, 10.* 等 + parts = pattern.split(".") + + if len(parts) > 4: + return None # IPv4最多4段 + + # 构建正则表达式 + regex_parts = [] + for part in parts: + part = part.strip() + if part == "*": + regex_parts.append(r"\d+") # 匹配任意数字 + elif part.isdigit(): + # 验证数字范围(0-255) + num = int(part) + if 0 <= num <= 255: + regex_parts.append(re.escape(part)) + else: + return None # 无效的数字 + else: + return None # 无效的格式 + + # 如果部分少于4段,补充.* + while len(regex_parts) < 4: + regex_parts.append(r"\d+") + + # 组合成正则表达式 + regex = r"^" + r"\.".join(regex_parts) + r"$" + return regex + +ALLOWED_IPS = _parse_allowed_ips(os.getenv("WEBUI_ALLOWED_IPS", "")) + + +def _get_mode_config(mode: str) -> dict: + """ + 根据模式获取配置参数 + + Args: + mode: 防爬虫模式 (false/strict/loose/basic) + + Returns: + 配置字典,包含所有相关参数 + """ + mode = mode.lower() + + if mode == "false": + return { + "enabled": False, + "rate_limit_window": 60, + "rate_limit_max_requests": 1000, # 禁用时设置很高的值 + "max_tracked_ips": 0, + "check_user_agent": False, + "check_asset_scanner": False, + "check_rate_limit": False, + "block_on_detect": False, # 不阻止 + } + elif mode == "strict": + return { + "enabled": True, + "rate_limit_window": 60, + "rate_limit_max_requests": 15, # 严格模式:更低的请求数 + "max_tracked_ips": 20000, + "check_user_agent": True, + "check_asset_scanner": True, + "check_rate_limit": True, + "block_on_detect": True, # 阻止恶意访问 + } + elif mode == "loose": + return { + "enabled": True, + "rate_limit_window": 60, + "rate_limit_max_requests": 60, # 宽松模式:更高的请求数 + "max_tracked_ips": 5000, + "check_user_agent": True, + "check_asset_scanner": True, + "check_rate_limit": True, + "block_on_detect": True, # 阻止恶意访问 + } + else: # basic (默认模式) + return { + "enabled": True, + "rate_limit_window": 60, + "rate_limit_max_requests": 1000, # 
不限制请求数 + "max_tracked_ips": 0, # 不跟踪IP + "check_user_agent": True, # 检测但不阻止 + "check_asset_scanner": True, # 检测但不阻止 + "check_rate_limit": False, # 不限制请求频率 + "block_on_detect": False, # 只记录,不阻止 + } + + +class AntiCrawlerMiddleware(BaseHTTPMiddleware): + """防爬虫中间件""" + + def __init__(self, app, mode: str = "standard"): + """ + 初始化防爬虫中间件 + + Args: + app: FastAPI 应用实例 + mode: 防爬虫模式 (false/strict/loose/standard) + """ + super().__init__(app) + self.mode = mode.lower() + # 根据模式获取配置 + config = _get_mode_config(self.mode) + self.enabled = config["enabled"] + self.rate_limit_window = config["rate_limit_window"] + self.rate_limit_max_requests = config["rate_limit_max_requests"] + self.max_tracked_ips = config["max_tracked_ips"] + self.check_user_agent = config["check_user_agent"] + self.check_asset_scanner = config["check_asset_scanner"] + self.check_rate_limit = config["check_rate_limit"] + self.block_on_detect = config["block_on_detect"] # 是否阻止检测到的恶意访问 + + # 用于存储每个IP的请求时间戳 + self.request_times: dict[str, list[float]] = defaultdict(list) + # 上次清理时间 + self.last_cleanup = time.time() + # 将关键词列表转换为集合以提高查找性能 + self.crawler_keywords_set = set(CRAWLER_USER_AGENTS) + self.scanner_keywords_set = set(ASSET_SCANNER_USER_AGENTS) + self.suspicious_patterns_set = set(SUSPICIOUS_HEADER_PATTERNS) + + def _is_crawler_user_agent(self, user_agent: Optional[str]) -> bool: + """ + 检测是否为爬虫 User-Agent + + Args: + user_agent: User-Agent 字符串 + + Returns: + 如果是爬虫则返回 True + """ + if not user_agent: + # 没有 User-Agent 的请求记录日志但不直接阻止 + # 改为只记录,让频率限制来处理 + logger.debug("请求缺少User-Agent") + return False # 不再直接阻止无User-Agent的请求 + + user_agent_lower = user_agent.lower() + + # 使用集合查找提高性能(检查是否包含爬虫关键词) + for crawler_keyword in self.crawler_keywords_set: + if crawler_keyword in user_agent_lower: + return True + + return False + + def _is_asset_scanner_user_agent(self, user_agent: Optional[str]) -> bool: + """ + 检测是否为资产测绘工具 User-Agent + + Args: + user_agent: User-Agent 字符串 + + Returns: + 如果是资产测绘工具则返回 True + """ + if not user_agent: + return False + + user_agent_lower = user_agent.lower() + + # 检查是否包含资产测绘工具关键词 + for scanner_keyword in ASSET_SCANNER_USER_AGENTS: + if scanner_keyword in user_agent_lower: + return True + + return False + + def _is_asset_scanner_header(self, request: Request) -> bool: + """ + 检测是否为资产测绘工具的HTTP头 + + Args: + request: 请求对象 + + Returns: + 如果检测到资产测绘工具头则返回 True + """ + # 检查所有HTTP头 + for header_name, header_value in request.headers.items(): + header_name_lower = header_name.lower() + header_value_lower = header_value.lower() if header_value else "" + + # 检查已知的扫描工具头 + if header_name_lower in ASSET_SCANNER_HEADERS: + # 如果该头有特定的工具集合,检查值是否匹配 + expected_tools = ASSET_SCANNER_HEADERS[header_name_lower] + if expected_tools: + for tool in expected_tools: + if tool in header_value_lower: + return True + else: + # 如果没有特定工具集合,只要存在该头就视为可疑 + if header_value_lower: + return True + + # 使用集合查找提高性能(检查头值中是否包含可疑模式) + for pattern in self.suspicious_patterns_set: + if pattern in header_name_lower or pattern in header_value_lower: + return True + + return False + + def _detect_asset_scanner(self, request: Request) -> tuple[bool, Optional[str]]: + """ + 检测资产测绘工具 + + Args: + request: 请求对象 + + Returns: + (是否检测到, 检测到的工具名称) + """ + user_agent = request.headers.get("User-Agent") + + # 检查 User-Agent(使用集合查找提高性能) + if user_agent: + user_agent_lower = user_agent.lower() + for scanner_keyword in self.scanner_keywords_set: + if scanner_keyword in user_agent_lower: + return True, scanner_keyword + + # 检查HTTP头 + if self._is_asset_scanner_header(request): 
+ # 尝试从User-Agent或头中提取工具名称 + detected_tool = None + if user_agent: + user_agent_lower = user_agent.lower() + for tool in self.scanner_keywords_set: + if tool in user_agent_lower: + detected_tool = tool + break + + # 检查HTTP头中的工具标识 + if not detected_tool: + for header_name, header_value in request.headers.items(): + header_value_lower = (header_value or "").lower() + for tool in self.scanner_keywords_set: + if tool in header_value_lower: + detected_tool = tool + break + if detected_tool: + break + + return True, detected_tool or "unknown_scanner" + + return False, None + + def _check_rate_limit(self, client_ip: str) -> bool: + """ + 检查请求频率限制 + + Args: + client_ip: 客户端IP地址 + + Returns: + 如果超过限制则返回 True(需要阻止) + """ + # 检查IP白名单 + if self._is_ip_allowed(client_ip): + return False + + # 限制跟踪的IP数量,防止内存泄漏 + if self.max_tracked_ips > 0 and len(self.request_times) > self.max_tracked_ips: + # 清理最旧的记录 + self._cleanup_old_requests(time.time()) + + current_time = time.time() + + # 定期清理过期的请求记录(每5分钟清理一次) + if current_time - self.last_cleanup > 300: + self._cleanup_old_requests(current_time) + self.last_cleanup = current_time + + # 获取该IP的请求时间列表 + request_times = self.request_times[client_ip] + + # 移除时间窗口外的请求记录 + request_times[:] = [ + req_time + for req_time in request_times + if current_time - req_time < self.rate_limit_window + ] + + # 检查是否超过限制 + if len(request_times) >= self.rate_limit_max_requests: + return True + + # 记录当前请求时间 + request_times.append(current_time) + return False + + def _cleanup_old_requests(self, current_time: float): + """清理过期的请求记录""" + for ip in list(self.request_times.keys()): + self.request_times[ip] = [ + req_time + for req_time in self.request_times[ip] + if current_time - req_time < self.rate_limit_window + ] + # 如果列表为空,删除该IP的记录 + if not self.request_times[ip]: + del self.request_times[ip] + + def _get_client_ip(self, request: Request) -> str: + """ + 获取客户端真实IP地址(带基本验证) + + Args: + request: 请求对象 + + Returns: + 客户端IP地址 + """ + # 优先从 X-Forwarded-For 获取(适用于反向代理) + forwarded_for = request.headers.get("X-Forwarded-For") + if forwarded_for: + # X-Forwarded-For 可能包含多个IP,取第一个 + ip = forwarded_for.split(",")[0].strip() + # 基本验证IP格式 + if self._validate_ip(ip): + return ip + + # 从 X-Real-IP 获取 + real_ip = request.headers.get("X-Real-IP") + if real_ip: + ip = real_ip.strip() + if self._validate_ip(ip): + return ip + + # 使用客户端IP + if request.client: + ip = request.client.host + if self._validate_ip(ip): + return ip + + return "unknown" + + def _validate_ip(self, ip: str) -> bool: + """ + 验证IP地址格式 + + Args: + ip: IP地址字符串 + + Returns: + 如果格式有效则返回 True + """ + try: + ipaddress.ip_address(ip) + return True + except (ValueError, AttributeError): + return False + + def _is_ip_allowed(self, ip: str) -> bool: + """ + 检查IP是否在白名单中(支持精确IP、CIDR格式和通配符) + + Args: + ip: 客户端IP地址 + + Returns: + 如果IP在白名单中则返回 True + """ + if not ALLOWED_IPS or ip == "unknown": + return False + + # 检查白名单中的每个条目 + for allowed_entry in ALLOWED_IPS: + # 通配符模式(字符串,正则表达式) + if isinstance(allowed_entry, str): + try: + if re.match(allowed_entry, ip): + return True + except re.error: + # 正则表达式错误,跳过 + continue + # CIDR格式(网络对象) + elif isinstance(allowed_entry, (ipaddress.IPv4Network, ipaddress.IPv6Network)): + try: + client_ip_obj = ipaddress.ip_address(ip) + if client_ip_obj in allowed_entry: + return True + except (ValueError, AttributeError): + # IP格式无效,跳过 + continue + # 精确IP(地址对象) + elif isinstance(allowed_entry, (ipaddress.IPv4Address, ipaddress.IPv6Address)): + try: + client_ip_obj = ipaddress.ip_address(ip) + if client_ip_obj == 
allowed_entry: + return True + except (ValueError, AttributeError): + # IP格式无效,跳过 + continue + + return False + + async def dispatch(self, request: Request, call_next): + """ + 处理请求 + + Args: + request: 请求对象 + call_next: 下一个中间件或路由处理函数 + + Returns: + 响应对象 + """ + # 如果未启用,直接通过 + if not self.enabled: + return await call_next(request) + + # 允许访问 robots.txt(由专门的路由处理) + if request.url.path == "/robots.txt": + return await call_next(request) + + # 允许访问静态资源(CSS、JS、图片等) + static_extensions = {".css", ".js", ".json", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", ".woff", ".woff2", ".ttf", ".eot"} + if any(request.url.path.endswith(ext) for ext in static_extensions): + return await call_next(request) + + # 获取客户端IP(只获取一次,避免重复调用) + client_ip = self._get_client_ip(request) + + # 检查IP白名单(优先检查,白名单IP直接通过) + if self._is_ip_allowed(client_ip): + return await call_next(request) + + # 获取 User-Agent + user_agent = request.headers.get("User-Agent") + + # 检测资产测绘工具(优先检测,因为更危险) + if self.check_asset_scanner: + is_scanner, scanner_name = self._detect_asset_scanner(request) + if is_scanner: + logger.warning( + f"🚫 检测到资产测绘工具请求 - IP: {client_ip}, 工具: {scanner_name}, " + f"User-Agent: {user_agent}, Path: {request.url.path}" + ) + # 根据配置决定是否阻止 + if self.block_on_detect: + return PlainTextResponse( + "Access Denied: Asset scanning tools are not allowed", + status_code=403, + ) + + # 检测爬虫 User-Agent + if self.check_user_agent and self._is_crawler_user_agent(user_agent): + logger.warning( + f"🚫 检测到爬虫请求 - IP: {client_ip}, User-Agent: {user_agent}, Path: {request.url.path}" + ) + # 根据配置决定是否阻止 + if self.block_on_detect: + return PlainTextResponse( + "Access Denied: Crawlers are not allowed", + status_code=403, + ) + + # 检查请求频率限制 + if self.check_rate_limit and self._check_rate_limit(client_ip): + logger.warning( + f"🚫 请求频率过高 - IP: {client_ip}, User-Agent: {user_agent}, Path: {request.url.path}" + ) + return PlainTextResponse( + "Too Many Requests: Rate limit exceeded", + status_code=429, + ) + + # 正常请求,继续处理 + return await call_next(request) + + +def create_robots_txt_response() -> PlainTextResponse: + """ + 创建 robots.txt 响应 + + Returns: + robots.txt 响应对象 + """ + robots_content = """User-agent: * +Disallow: / + +# 禁止所有爬虫访问 +""" + return PlainTextResponse( + content=robots_content, + media_type="text/plain", + headers={"Cache-Control": "public, max-age=86400"}, # 缓存24小时 + ) + diff --git a/src/webui/webui_server.py b/src/webui/webui_server.py index ac95e80c..afe17fd8 100644 --- a/src/webui/webui_server.py +++ b/src/webui/webui_server.py @@ -21,12 +21,18 @@ class WebUIServer: self.app = FastAPI(title="MaiBot WebUI") self._server = None + # 配置防爬虫中间件(需要在CORS之前注册) + self._setup_anti_crawler() + # 显示 Access Token self._show_access_token() # 重要:先注册 API 路由,再设置静态文件 self._register_api_routes() self._setup_static_files() + + # 注册robots.txt路由 + self._setup_robots_txt() def _show_access_token(self): """显示 WebUI Access Token""" @@ -82,6 +88,46 @@ class WebUIServer: logger.info(f"✅ WebUI 静态文件服务已配置: {static_path}") + def _setup_anti_crawler(self): + """配置防爬虫中间件""" + try: + from src.webui.anti_crawler import AntiCrawlerMiddleware + + # 从环境变量读取防爬虫模式(false/strict/loose/standard) + anti_crawler_mode = os.getenv("WEBUI_ANTI_CRAWLER_MODE", "standard").lower() + + # 注意:中间件按注册顺序反向执行,所以先注册的中间件后执行 + # 我们需要在CORS之前注册,这样防爬虫检查会在CORS之前执行 + self.app.add_middleware( + AntiCrawlerMiddleware, + mode=anti_crawler_mode + ) + + mode_descriptions = { + "false": "已禁用", + "strict": "严格模式", + "loose": "宽松模式", + "standard": "标准模式" + } + mode_desc = 
mode_descriptions.get(anti_crawler_mode, "标准模式") + logger.info(f"🛡️ 防爬虫中间件已配置: {mode_desc}") + except Exception as e: + logger.error(f"❌ 配置防爬虫中间件失败: {e}", exc_info=True) + + def _setup_robots_txt(self): + """设置robots.txt路由""" + try: + from src.webui.anti_crawler import create_robots_txt_response + + @self.app.get("/robots.txt", include_in_schema=False) + async def robots_txt(): + """返回robots.txt,禁止所有爬虫""" + return create_robots_txt_response() + + logger.debug("✅ robots.txt 路由已注册") + except Exception as e: + logger.error(f"❌ 注册robots.txt路由失败: {e}", exc_info=True) + def _register_api_routes(self): """注册所有 WebUI API 路由""" try: diff --git a/template/template.env b/template/template.env index b6dd0e5c..a08635fb 100644 --- a/template/template.env +++ b/template/template.env @@ -6,4 +6,10 @@ PORT=8000 WEBUI_ENABLED=true WEBUI_MODE=production # 模式: development(开发) 或 production(生产) WEBUI_HOST=0.0.0.0 # WebUI 服务器监听地址 -WEBUI_PORT=8001 # WebUI 服务器端口 \ No newline at end of file +WEBUI_PORT=8001 # WebUI 服务器端口 + +# 防爬虫配置 +WEBUI_ANTI_CRAWLER_MODE=basic # 防爬虫模式: false(禁用) / strict(严格) / loose(宽松) / basic(基础-只记录不阻止) +WEBUI_ALLOWED_IPS=127.0.0.1 # IP白名单(逗号分隔,支持精确IP、CIDR格式和通配符) + # 示例: 127.0.0.1,192.168.1.0/24,172.17.0.0/16 + # 注意: 不要使用 *.*.*.* 或 *,这会导致防爬虫功能完全失效 \ No newline at end of file From 223a6b73339b916dc21768bd65d4352a45d0788c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=9B=A6?= <2584059816@qq.com> Date: Sun, 14 Dec 2025 19:25:34 +0800 Subject: [PATCH 2/7] Update main.py --- src/main.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/src/main.py b/src/main.py index b6141ad4..02702f2c 100644 --- a/src/main.py +++ b/src/main.py @@ -13,9 +13,9 @@ from src.config.config import global_config from src.chat.message_receive.bot import chat_bot from src.common.logger import get_logger from src.common.server import get_global_server, Server +from src.mood.mood_manager import mood_manager from src.chat.knowledge import lpmm_start_up from rich.traceback import install - # from src.api.main import start_api_server # 导入新的插件管理器 @@ -23,7 +23,6 @@ from src.plugin_system.core.plugin_manager import plugin_manager # 导入消息API和traceback模块 from src.common.message import get_global_api -from src.dream.dream_agent import start_dream_scheduler # 插件系统现在使用统一的插件加载器 @@ -51,11 +50,23 @@ class MainSystem: logger.info("WebUI 已禁用") return + webui_mode = os.getenv("WEBUI_MODE", "production").lower() + try: from src.webui.webui_server import get_webui_server self.webui_server = get_webui_server() - + + if webui_mode == "development": + logger.info("📝 WebUI 开发模式已启用") + logger.info("🌐 后端 API 将运行在 http://0.0.0.0:8001") + logger.info("💡 请手动启动前端开发服务器: cd MaiBot-Dashboard && bun dev") + logger.info("💡 前端将运行在 http://localhost:7999") + else: + logger.info("✅ WebUI 生产模式已启用") + logger.info(f"🌐 WebUI 将运行在 http://0.0.0.0:8001") + logger.info("💡 请确保已构建前端: cd MaiBot-Dashboard && bun run build") + except Exception as e: logger.error(f"❌ 初始化 WebUI 服务器失败: {e}") @@ -95,7 +106,7 @@ class MainSystem: await async_task_manager.add_task(TelemetryHeartBeatTask()) # 添加记忆遗忘任务 - from src.hippo_memorizer.memory_forget_task import MemoryForgetTask + from src.chat.utils.memory_forget_task import MemoryForgetTask await async_task_manager.add_task(MemoryForgetTask()) @@ -113,6 +124,11 @@ class MainSystem: get_emoji_manager().initialize() logger.info("表情包管理器初始化成功") + # 启动情绪管理器 + if global_config.mood.enable_mood: + await mood_manager.start() + logger.info("情绪管理器初始化成功") + # 初始化聊天管理器 await 
get_chat_manager()._initialize() asyncio.create_task(get_chat_manager()._auto_save_task()) @@ -143,7 +159,6 @@ class MainSystem: try: tasks = [ get_emoji_manager().start_periodic_check_register(), - start_dream_scheduler(), self.app.run(), self.server.run(), ] From f6adc8bcf70198db54bd5cc7dad09e0d31ea67d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=9B=A6?= <2584059816@qq.com> Date: Sun, 14 Dec 2025 19:28:23 +0800 Subject: [PATCH 3/7] Update webui_server.py --- src/webui/webui_server.py | 46 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/webui/webui_server.py b/src/webui/webui_server.py index 87b47192..1c4dfc60 100644 --- a/src/webui/webui_server.py +++ b/src/webui/webui_server.py @@ -22,6 +22,9 @@ class WebUIServer: self.app = FastAPI(title="MaiBot WebUI") self._server = None + # 配置防爬虫中间件(需要在CORS之前注册) + self._setup_anti_crawler() + # 配置 CORS(支持开发环境跨域请求) self._setup_cors() @@ -31,6 +34,9 @@ class WebUIServer: # 重要:先注册 API 路由,再设置静态文件 self._register_api_routes() self._setup_static_files() + + # 注册robots.txt路由 + self._setup_robots_txt() def _setup_cors(self): """配置 CORS 中间件""" @@ -103,6 +109,46 @@ class WebUIServer: logger.info(f"✅ WebUI 静态文件服务已配置: {static_path}") + def _setup_anti_crawler(self): + """配置防爬虫中间件""" + try: + from src.webui.anti_crawler import AntiCrawlerMiddleware + + # 从环境变量读取防爬虫模式(false/strict/loose/basic) + anti_crawler_mode = os.getenv("WEBUI_ANTI_CRAWLER_MODE", "basic").lower() + + # 注意:中间件按注册顺序反向执行,所以先注册的中间件后执行 + # 我们需要在CORS之前注册,这样防爬虫检查会在CORS之前执行 + self.app.add_middleware( + AntiCrawlerMiddleware, + mode=anti_crawler_mode + ) + + mode_descriptions = { + "false": "已禁用", + "strict": "严格模式", + "loose": "宽松模式", + "basic": "基础模式" + } + mode_desc = mode_descriptions.get(anti_crawler_mode, "基础模式") + logger.info(f"🛡️ 防爬虫中间件已配置: {mode_desc}") + except Exception as e: + logger.error(f"❌ 配置防爬虫中间件失败: {e}", exc_info=True) + + def _setup_robots_txt(self): + """设置robots.txt路由""" + try: + from src.webui.anti_crawler import create_robots_txt_response + + @self.app.get("/robots.txt", include_in_schema=False) + async def robots_txt(): + """返回robots.txt,禁止所有爬虫""" + return create_robots_txt_response() + + logger.debug("✅ robots.txt 路由已注册") + except Exception as e: + logger.error(f"❌ 注册robots.txt路由失败: {e}", exc_info=True) + def _register_api_routes(self): """注册所有 WebUI API 路由""" try: From 16271718a79c71836c8964ac3b3634155f47af38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=9B=A6?= <2584059816@qq.com> Date: Sun, 14 Dec 2025 19:31:11 +0800 Subject: [PATCH 4/7] Revert main.py --- src/main.py | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/src/main.py b/src/main.py index 02702f2c..b6141ad4 100644 --- a/src/main.py +++ b/src/main.py @@ -13,9 +13,9 @@ from src.config.config import global_config from src.chat.message_receive.bot import chat_bot from src.common.logger import get_logger from src.common.server import get_global_server, Server -from src.mood.mood_manager import mood_manager from src.chat.knowledge import lpmm_start_up from rich.traceback import install + # from src.api.main import start_api_server # 导入新的插件管理器 @@ -23,6 +23,7 @@ from src.plugin_system.core.plugin_manager import plugin_manager # 导入消息API和traceback模块 from src.common.message import get_global_api +from src.dream.dream_agent import start_dream_scheduler # 插件系统现在使用统一的插件加载器 @@ -50,23 +51,11 @@ class MainSystem: logger.info("WebUI 已禁用") return - webui_mode = os.getenv("WEBUI_MODE", "production").lower() - try: from 
src.webui.webui_server import get_webui_server self.webui_server = get_webui_server() - - if webui_mode == "development": - logger.info("📝 WebUI 开发模式已启用") - logger.info("🌐 后端 API 将运行在 http://0.0.0.0:8001") - logger.info("💡 请手动启动前端开发服务器: cd MaiBot-Dashboard && bun dev") - logger.info("💡 前端将运行在 http://localhost:7999") - else: - logger.info("✅ WebUI 生产模式已启用") - logger.info(f"🌐 WebUI 将运行在 http://0.0.0.0:8001") - logger.info("💡 请确保已构建前端: cd MaiBot-Dashboard && bun run build") - + except Exception as e: logger.error(f"❌ 初始化 WebUI 服务器失败: {e}") @@ -106,7 +95,7 @@ class MainSystem: await async_task_manager.add_task(TelemetryHeartBeatTask()) # 添加记忆遗忘任务 - from src.chat.utils.memory_forget_task import MemoryForgetTask + from src.hippo_memorizer.memory_forget_task import MemoryForgetTask await async_task_manager.add_task(MemoryForgetTask()) @@ -124,11 +113,6 @@ class MainSystem: get_emoji_manager().initialize() logger.info("表情包管理器初始化成功") - # 启动情绪管理器 - if global_config.mood.enable_mood: - await mood_manager.start() - logger.info("情绪管理器初始化成功") - # 初始化聊天管理器 await get_chat_manager()._initialize() asyncio.create_task(get_chat_manager()._auto_save_task()) @@ -159,6 +143,7 @@ class MainSystem: try: tasks = [ get_emoji_manager().start_periodic_check_register(), + start_dream_scheduler(), self.app.run(), self.server.run(), ] From 97c872f4f2cff353bd7d2886597b4d0513160e73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=9B=A6?= <2584059816@qq.com> Date: Sun, 14 Dec 2025 19:47:07 +0800 Subject: [PATCH 5/7] =?UTF-8?q?feat:=E5=A2=9E=E5=BC=BA=E5=8F=8D=E7=88=AC?= =?UTF-8?q?=E8=99=AB=E4=B8=AD=E9=97=B4=E4=BB=B6=EF=BC=8C=E5=AF=B9=E5=8F=97?= =?UTF-8?q?=E4=BF=A1=E4=BB=BB=E4=BB=A3=E7=90=86=E6=8F=90=E4=BE=9B=E6=94=AF?= =?UTF-8?q?=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重构反爬虫逻辑,使用 deque 存储请求时间戳,以提升性能和内存管理效率。新增通过 WEBUI_TRUSTED_PROXIES 和 WEBUI_TRUST_XFF 配置受信任代理的支持,从而实现对 X-Forwarded-For 请求头的选择性信任。将可疑请求头的检测限制在特定请求头范围内,减少误判。更新 template.env,新增与代理相关的环境变量。 --- src/webui/anti_crawler.py | 250 +++++++++++++++++++++++--------------- template/template.env | 4 +- 2 files changed, 157 insertions(+), 97 deletions(-) diff --git a/src/webui/anti_crawler.py b/src/webui/anti_crawler.py index c8c3c318..cc1cb202 100644 --- a/src/webui/anti_crawler.py +++ b/src/webui/anti_crawler.py @@ -7,13 +7,11 @@ import os import time import ipaddress import re -from collections import defaultdict -from typing import Optional, Union -from functools import lru_cache +from collections import deque +from typing import Optional from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request -from starlette.responses import Response, PlainTextResponse -from fastapi import HTTPException +from starlette.responses import PlainTextResponse from src.common.logger import get_logger @@ -110,18 +108,15 @@ ASSET_SCANNER_HEADERS = { # - "x-real-ip" (反向代理标准头,已在_get_client_ip中使用) } -# 可疑的HTTP头值模式(用于检测扫描工具) -SUSPICIOUS_HEADER_PATTERNS = { - "shodan", - "censys", - "zoomeye", - "fofa", - "quake", - "scanner", - "probe", - "scan", - "recon", - "reconnaissance", +# 仅检查特定HTTP头中的可疑模式(收紧匹配范围) +# 只检查这些特定头,不检查所有头 +SCANNER_SPECIFIC_HEADERS = { + "x-scan", + "x-scanner", + "x-probe", + "x-originating-ip", + "x-remote-ip", + "x-remote-addr", } # 防爬虫模式配置 @@ -237,6 +232,12 @@ def _convert_wildcard_to_regex(wildcard_pattern: str) -> Optional[str]: ALLOWED_IPS = _parse_allowed_ips(os.getenv("WEBUI_ALLOWED_IPS", "")) +# 信任的代理IP配置(从环境变量读取,逗号分隔) +# 只有在信任的代理IP下才使用X-Forwarded-For头 +# 默认关闭(空),不信任任何代理 
+TRUSTED_PROXIES = _parse_allowed_ips(os.getenv("WEBUI_TRUSTED_PROXIES", "")) +TRUST_XFF = os.getenv("WEBUI_TRUST_XFF", "false").lower() == "true" + def _get_mode_config(mode: str) -> dict: """ @@ -320,14 +321,13 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): self.check_rate_limit = config["check_rate_limit"] self.block_on_detect = config["block_on_detect"] # 是否阻止检测到的恶意访问 - # 用于存储每个IP的请求时间戳 - self.request_times: dict[str, list[float]] = defaultdict(list) + # 用于存储每个IP的请求时间戳(使用deque提高性能) + self.request_times: dict[str, deque] = {} # 上次清理时间 self.last_cleanup = time.time() # 将关键词列表转换为集合以提高查找性能 self.crawler_keywords_set = set(CRAWLER_USER_AGENTS) self.scanner_keywords_set = set(ASSET_SCANNER_USER_AGENTS) - self.suspicious_patterns_set = set(SUSPICIOUS_HEADER_PATTERNS) def _is_crawler_user_agent(self, user_agent: Optional[str]) -> bool: """ @@ -354,31 +354,10 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): return False - def _is_asset_scanner_user_agent(self, user_agent: Optional[str]) -> bool: - """ - 检测是否为资产测绘工具 User-Agent - - Args: - user_agent: User-Agent 字符串 - - Returns: - 如果是资产测绘工具则返回 True - """ - if not user_agent: - return False - - user_agent_lower = user_agent.lower() - - # 检查是否包含资产测绘工具关键词 - for scanner_keyword in ASSET_SCANNER_USER_AGENTS: - if scanner_keyword in user_agent_lower: - return True - - return False def _is_asset_scanner_header(self, request: Request) -> bool: """ - 检测是否为资产测绘工具的HTTP头 + 检测是否为资产测绘工具的HTTP头(只检查特定头,收紧匹配) Args: request: 请求对象 @@ -386,7 +365,7 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): Returns: 如果检测到资产测绘工具头则返回 True """ - # 检查所有HTTP头 + # 只检查特定的扫描工具头,不检查所有头 for header_name, header_value in request.headers.items(): header_name_lower = header_name.lower() header_value_lower = header_value.lower() if header_value else "" @@ -404,10 +383,12 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): if header_value_lower: return True - # 使用集合查找提高性能(检查头值中是否包含可疑模式) - for pattern in self.suspicious_patterns_set: - if pattern in header_name_lower or pattern in header_value_lower: - return True + # 只检查特定头中的可疑模式(收紧匹配) + if header_name_lower in SCANNER_SPECIFIC_HEADERS: + # 检查头值中是否包含已知扫描工具名称 + for tool in self.scanner_keywords_set: + if tool in header_value_lower: + return True return False @@ -441,16 +422,18 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): detected_tool = tool break - # 检查HTTP头中的工具标识 + # 检查HTTP头中的工具标识(只检查特定头) if not detected_tool: for header_name, header_value in request.headers.items(): - header_value_lower = (header_value or "").lower() - for tool in self.scanner_keywords_set: - if tool in header_value_lower: - detected_tool = tool + header_name_lower = header_name.lower() + if header_name_lower in SCANNER_SPECIFIC_HEADERS: + header_value_lower = (header_value or "").lower() + for tool in self.scanner_keywords_set: + if tool in header_value_lower: + detected_tool = tool + break + if detected_tool: break - if detected_tool: - break return True, detected_tool or "unknown_scanner" @@ -470,11 +453,6 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): if self._is_ip_allowed(client_ip): return False - # 限制跟踪的IP数量,防止内存泄漏 - if self.max_tracked_ips > 0 and len(self.request_times) > self.max_tracked_ips: - # 清理最旧的记录 - self._cleanup_old_requests(time.time()) - current_time = time.time() # 定期清理过期的请求记录(每5分钟清理一次) @@ -482,15 +460,20 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): self._cleanup_old_requests(current_time) self.last_cleanup = current_time - # 获取该IP的请求时间列表 + # 限制跟踪的IP数量,防止内存泄漏 + if self.max_tracked_ips > 0 and len(self.request_times) > 
self.max_tracked_ips: + # 清理最旧的记录(删除最久未访问的IP) + self._cleanup_oldest_ips() + + # 获取或创建该IP的请求时间deque + if client_ip not in self.request_times: + self.request_times[client_ip] = deque(maxlen=self.rate_limit_max_requests * 2) + request_times = self.request_times[client_ip] - # 移除时间窗口外的请求记录 - request_times[:] = [ - req_time - for req_time in request_times - if current_time - req_time < self.rate_limit_window - ] + # 移除时间窗口外的请求记录(从左侧弹出过期记录) + while request_times and current_time - request_times[0] >= self.rate_limit_window: + request_times.popleft() # 检查是否超过限制 if len(request_times) >= self.rate_limit_max_requests: @@ -501,20 +484,84 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): return False def _cleanup_old_requests(self, current_time: float): - """清理过期的请求记录""" - for ip in list(self.request_times.keys()): - self.request_times[ip] = [ - req_time - for req_time in self.request_times[ip] - if current_time - req_time < self.rate_limit_window - ] - # 如果列表为空,删除该IP的记录 - if not self.request_times[ip]: + """清理过期的请求记录(只清理当前需要检查的IP,不全量遍历)""" + # 这个方法现在主要用于定期清理,实际清理在_check_rate_limit中按需进行 + # 清理最久未访问的IP记录 + if len(self.request_times) > self.max_tracked_ips * 0.8: + self._cleanup_oldest_ips() + + def _cleanup_oldest_ips(self): + """清理最久未访问的IP记录(避免全量遍历)""" + if not self.request_times: + return + + # 找到最久未访问的IP(deque为空或最旧时间戳) + oldest_ip = None + oldest_time = float('inf') + + # 只检查部分IP,不全量遍历 + check_count = min(100, len(self.request_times)) + checked = 0 + for ip, times in self.request_times.items(): + if checked >= check_count: + break + checked += 1 + if not times: + # 空deque,优先删除 del self.request_times[ip] + return + if times[0] < oldest_time: + oldest_time = times[0] + oldest_ip = ip + + # 删除最久未访问的IP + if oldest_ip: + del self.request_times[oldest_ip] + + def _is_trusted_proxy(self, ip: str) -> bool: + """ + 检查IP是否在信任的代理列表中 + + Args: + ip: IP地址字符串 + + Returns: + 如果是信任的代理则返回 True + """ + if not TRUSTED_PROXIES or ip == "unknown": + return False + + # 检查代理列表中的每个条目 + for trusted_entry in TRUSTED_PROXIES: + # 通配符模式(字符串,正则表达式) + if isinstance(trusted_entry, str): + try: + if re.match(trusted_entry, ip): + return True + except re.error: + continue + # CIDR格式(网络对象) + elif isinstance(trusted_entry, (ipaddress.IPv4Network, ipaddress.IPv6Network)): + try: + client_ip_obj = ipaddress.ip_address(ip) + if client_ip_obj in trusted_entry: + return True + except (ValueError, AttributeError): + continue + # 精确IP(地址对象) + elif isinstance(trusted_entry, (ipaddress.IPv4Address, ipaddress.IPv6Address)): + try: + client_ip_obj = ipaddress.ip_address(ip) + if client_ip_obj == trusted_entry: + return True + except (ValueError, AttributeError): + continue + + return False def _get_client_ip(self, request: Request) -> str: """ - 获取客户端真实IP地址(带基本验证) + 获取客户端真实IP地址(带基本验证和代理信任检查) Args: request: 请求对象 @@ -522,27 +569,38 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): Returns: 客户端IP地址 """ - # 优先从 X-Forwarded-For 获取(适用于反向代理) - forwarded_for = request.headers.get("X-Forwarded-For") - if forwarded_for: - # X-Forwarded-For 可能包含多个IP,取第一个 - ip = forwarded_for.split(",")[0].strip() - # 基本验证IP格式 - if self._validate_ip(ip): - return ip - - # 从 X-Real-IP 获取 - real_ip = request.headers.get("X-Real-IP") - if real_ip: - ip = real_ip.strip() - if self._validate_ip(ip): - return ip - - # 使用客户端IP + # 获取直接连接的客户端IP(用于验证代理) + direct_client_ip = None if request.client: - ip = request.client.host - if self._validate_ip(ip): - return ip + direct_client_ip = request.client.host + + # 检查是否信任X-Forwarded-For头 + use_xff = TRUST_XFF + if not use_xff and 
TRUSTED_PROXIES and direct_client_ip: + # 如果配置了信任的代理列表,检查直接连接的IP是否在信任列表中 + use_xff = self._is_trusted_proxy(direct_client_ip) + + # 如果信任代理,优先从 X-Forwarded-For 获取 + if use_xff: + forwarded_for = request.headers.get("X-Forwarded-For") + if forwarded_for: + # X-Forwarded-For 可能包含多个IP,取第一个 + ip = forwarded_for.split(",")[0].strip() + # 基本验证IP格式 + if self._validate_ip(ip): + return ip + + # 从 X-Real-IP 获取(如果信任代理) + if use_xff: + real_ip = request.headers.get("X-Real-IP") + if real_ip: + ip = real_ip.strip() + if self._validate_ip(ip): + return ip + + # 使用直接连接的客户端IP + if direct_client_ip and self._validate_ip(direct_client_ip): + return direct_client_ip return "unknown" diff --git a/template/template.env b/template/template.env index a08635fb..b08fecf0 100644 --- a/template/template.env +++ b/template/template.env @@ -12,4 +12,6 @@ WEBUI_PORT=8001 # WebUI 服务器端口 WEBUI_ANTI_CRAWLER_MODE=basic # 防爬虫模式: false(禁用) / strict(严格) / loose(宽松) / basic(基础-只记录不阻止) WEBUI_ALLOWED_IPS=127.0.0.1 # IP白名单(逗号分隔,支持精确IP、CIDR格式和通配符) # 示例: 127.0.0.1,192.168.1.0/24,172.17.0.0/16 - # 注意: 不要使用 *.*.*.* 或 *,这会导致防爬虫功能完全失效 \ No newline at end of file +WEBUI_TRUSTED_PROXIES= # 信任的代理IP列表(逗号分隔),只有来自这些IP的X-Forwarded-For才被信任 + # 示例: 127.0.0.1,192.168.1.1,172.17.0.1 +WEBUI_TRUST_XFF=false # 是否信任X-Forwarded-For头(默认false,需要配合TRUSTED_PROXIES使用) \ No newline at end of file From bccef9f1044b0f3777d57730b96ca66da9b9d90f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=9B=A6?= <2584059816@qq.com> Date: Sun, 14 Dec 2025 19:56:11 +0800 Subject: [PATCH 6/7] =?UTF-8?q?feat:=E5=A2=9E=E5=BC=BA=E5=8F=8D=E7=88=AC?= =?UTF-8?q?=E8=99=AB=E7=9A=84=20IP=20=E6=B8=85=E7=90=86=E6=9C=BA=E5=88=B6?= =?UTF-8?q?=E5=92=8C=E9=9D=99=E6=80=81=E8=B5=84=E6=BA=90=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 优化 IP 清理逻辑,完整扫描并清除最旧或空的 IP 记录,从而提升内存管理效果。更新静态资源的放行规则,改为基于路径前缀进行限制,并从允许的扩展名中移除 .json,以防止通过静态资源规则绕过 API。进一步明确 X-Forwarded-For 的信任逻辑,并更新相关环境变量的注释,提供更清晰的配置指引。 --- src/webui/anti_crawler.py | 60 ++++++++++++++++++++++++--------------- template/template.env | 3 +- 2 files changed, 39 insertions(+), 24 deletions(-) diff --git a/src/webui/anti_crawler.py b/src/webui/anti_crawler.py index cc1cb202..c82afa7c 100644 --- a/src/webui/anti_crawler.py +++ b/src/webui/anti_crawler.py @@ -465,9 +465,9 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): # 清理最旧的记录(删除最久未访问的IP) self._cleanup_oldest_ips() - # 获取或创建该IP的请求时间deque + # 获取或创建该IP的请求时间deque(不使用maxlen,避免限流变松) if client_ip not in self.request_times: - self.request_times[client_ip] = deque(maxlen=self.rate_limit_max_requests * 2) + self.request_times[client_ip] = deque() request_times = self.request_times[client_ip] @@ -491,31 +491,33 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): self._cleanup_oldest_ips() def _cleanup_oldest_ips(self): - """清理最久未访问的IP记录(避免全量遍历)""" + """清理最久未访问的IP记录(全量遍历找真正的oldest)""" if not self.request_times: return - # 找到最久未访问的IP(deque为空或最旧时间戳) + # 先收集空deque的IP(优先删除) + empty_ips = [] + # 找到最久未访问的IP(最旧时间戳) oldest_ip = None oldest_time = float('inf') - # 只检查部分IP,不全量遍历 - check_count = min(100, len(self.request_times)) - checked = 0 + # 全量遍历找真正的oldest(超限时性能可接受) for ip, times in self.request_times.items(): - if checked >= check_count: - break - checked += 1 if not times: - # 空deque,优先删除 - del self.request_times[ip] - return - if times[0] < oldest_time: - oldest_time = times[0] - oldest_ip = ip + # 空deque,记录待删除 + empty_ips.append(ip) + else: + # 找到最旧的时间戳 + if times[0] < oldest_time: + oldest_time = times[0] + oldest_ip = ip - # 删除最久未访问的IP 
- if oldest_ip: + # 先删除空deque的IP + for ip in empty_ips: + del self.request_times[ip] + + # 如果没有空deque可删除,且仍需要清理,删除最旧的一个IP + if not empty_ips and oldest_ip: del self.request_times[oldest_ip] def _is_trusted_proxy(self, ip: str) -> bool: @@ -575,9 +577,10 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): direct_client_ip = request.client.host # 检查是否信任X-Forwarded-For头 - use_xff = TRUST_XFF - if not use_xff and TRUSTED_PROXIES and direct_client_ip: - # 如果配置了信任的代理列表,检查直接连接的IP是否在信任列表中 + # TRUST_XFF 只表示"启用代理解析能力",但仍要求直连 IP 在 TRUSTED_PROXIES 中 + use_xff = False + if TRUST_XFF and TRUSTED_PROXIES and direct_client_ip: + # 只有在启用 TRUST_XFF 且直连 IP 在信任列表中时,才信任 XFF use_xff = self._is_trusted_proxy(direct_client_ip) # 如果信任代理,优先从 X-Forwarded-For 获取 @@ -684,8 +687,19 @@ class AntiCrawlerMiddleware(BaseHTTPMiddleware): return await call_next(request) # 允许访问静态资源(CSS、JS、图片等) - static_extensions = {".css", ".js", ".json", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", ".woff", ".woff2", ".ttf", ".eot"} - if any(request.url.path.endswith(ext) for ext in static_extensions): + # 注意:.json 已移除,避免 API 路径绕过防护 + # 静态资源只在特定前缀下放行(/static/、/assets/、/dist/) + static_extensions = {".css", ".js", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", ".woff", ".woff2", ".ttf", ".eot"} + static_prefixes = {"/static/", "/assets/", "/dist/"} + + # 检查是否是静态资源路径(特定前缀下的静态文件) + path = request.url.path + is_static_path = any(path.startswith(prefix) for prefix in static_prefixes) and any(path.endswith(ext) for ext in static_extensions) + + # 也允许根路径下的静态文件(如 /favicon.ico) + is_root_static = path.count("/") == 1 and any(path.endswith(ext) for ext in static_extensions) + + if is_static_path or is_root_static: return await call_next(request) # 获取客户端IP(只获取一次,避免重复调用) diff --git a/template/template.env b/template/template.env index b08fecf0..1d46dcf5 100644 --- a/template/template.env +++ b/template/template.env @@ -14,4 +14,5 @@ WEBUI_ALLOWED_IPS=127.0.0.1 # IP白名单(逗号分隔,支持精确IP、 # 示例: 127.0.0.1,192.168.1.0/24,172.17.0.0/16 WEBUI_TRUSTED_PROXIES= # 信任的代理IP列表(逗号分隔),只有来自这些IP的X-Forwarded-For才被信任 # 示例: 127.0.0.1,192.168.1.1,172.17.0.1 -WEBUI_TRUST_XFF=false # 是否信任X-Forwarded-For头(默认false,需要配合TRUSTED_PROXIES使用) \ No newline at end of file +WEBUI_TRUST_XFF=false # 是否启用X-Forwarded-For代理解析(默认false) + # 启用后,仍要求直连IP在TRUSTED_PROXIES中才会信任XFF头 \ No newline at end of file From c3c5bc337d42c992a9f37c3e488d17c05a44376b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=9B=A6?= <2584059816@qq.com> Date: Sun, 14 Dec 2025 21:00:18 +0800 Subject: [PATCH 7/7] Update webui_server.py --- src/webui/webui_server.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/src/webui/webui_server.py b/src/webui/webui_server.py index 45f6383f..8162642f 100644 --- a/src/webui/webui_server.py +++ b/src/webui/webui_server.py @@ -98,21 +98,6 @@ class WebUIServer: logger.warning("💡 请确认前端已正确构建") return - # robots.txt - 禁止搜索引擎索引 - @self.app.get("/robots.txt", include_in_schema=False) - async def robots_txt(): - """返回 robots.txt 禁止所有爬虫""" - from fastapi.responses import PlainTextResponse - content = """User-agent: * -Disallow: / - -# MaiBot Dashboard - 私有管理面板,禁止索引 -""" - return PlainTextResponse( - content=content, - headers={"X-Robots-Tag": "noindex, nofollow, noarchive"} - ) - # 处理 SPA 路由 - 注意:这个路由优先级最低 @self.app.get("/{full_path:path}", include_in_schema=False) async def serve_spa(full_path: str):
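
Usage note (illustrative sketch, not part of the patches above): the whitelist semantics introduced in PATCH 1 for WEBUI_ALLOWED_IPS (exact IPs, CIDR blocks, wildcard entries) can be exercised on their own. The snippet below mirrors the logic of _parse_allowed_ips, _convert_wildcard_to_regex and _is_ip_allowed in a self-contained form; the function names are local to the sketch, exact IPs are normalised to /32 networks for brevity, invalid entries are not handled, and the input IP is assumed to be syntactically valid, as _validate_ip guarantees before matching in the middleware.

    import ipaddress
    import re
    from typing import Optional

    def wildcard_to_regex(pattern: str) -> Optional[str]:
        # Convert e.g. "192.168.*.*" or "10.*" into an anchored regex; None if invalid.
        pattern = pattern.strip()
        if pattern == "*":
            return r".*"
        parts = pattern.split(".")
        if len(parts) > 4:
            return None
        out = []
        for part in parts:
            part = part.strip()
            if part == "*":
                out.append(r"\d+")
            elif part.isdigit() and 0 <= int(part) <= 255:
                out.append(re.escape(part))
            else:
                return None
        while len(out) < 4:  # short forms like "10.*" cover the remaining octets
            out.append(r"\d+")
        return r"^" + r"\.".join(out) + r"$"

    def parse_rules(raw: str) -> list:
        # Parse a WEBUI_ALLOWED_IPS-style comma-separated string into match rules.
        rules = []
        for entry in (e.strip() for e in raw.split(",") if e.strip()):
            if "*" in entry:
                regex = wildcard_to_regex(entry)
                if regex:
                    rules.append(regex)  # wildcard entries become regex strings
            else:
                # strict=False accepts both "1.2.3.4" (stored as /32) and "1.2.3.0/24"
                rules.append(ipaddress.ip_network(entry, strict=False))
        return rules

    def is_allowed(ip: str, rules: list) -> bool:
        # ip is assumed to be a valid address string (the middleware validates first).
        for rule in rules:
            if isinstance(rule, str):
                if re.match(rule, ip):
                    return True
            elif ipaddress.ip_address(ip) in rule:
                return True
        return False

    if __name__ == "__main__":
        rules = parse_rules("127.0.0.1, 192.168.1.0/24, 10.*.*.*")
        assert is_allowed("127.0.0.1", rules)
        assert is_allowed("192.168.1.42", rules)
        assert is_allowed("10.8.0.7", rules)
        assert not is_allowed("203.0.113.5", rules)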