diff --git a/src/webui/anti_crawler.py b/src/webui/anti_crawler.py
new file mode 100644
index 00000000..c8c3c318
--- /dev/null
+++ b/src/webui/anti_crawler.py
@@ -0,0 +1,701 @@
+"""
+WebUI anti-crawler module
+Provides crawler detection and blocking to keep the WebUI away from search
+engines and malicious crawlers
+"""
+
+import os
+import time
+import ipaddress
+import re
+from collections import defaultdict
+from typing import Optional
+from starlette.middleware.base import BaseHTTPMiddleware
+from starlette.requests import Request
+from starlette.responses import PlainTextResponse
+
+from src.common.logger import get_logger
+
+logger = get_logger("webui.anti_crawler")
+
+# Common crawler User-Agent markers (precise keywords to avoid false positives)
+CRAWLER_USER_AGENTS = {
+    # Search engine crawlers (exact matches)
+    "googlebot",
+    "bingbot",
+    "baiduspider",
+    "yandexbot",
+    "slurp",  # Yahoo
+    "duckduckbot",
+    "sogou",
+    "exabot",
+    "facebot",
+    "ia_archiver",  # Internet Archive
+    # Generic crawlers (overly broad keywords removed)
+    "crawler",
+    "spider",
+    "scraper",
+    "wget",  # kept: usually indicates an automated script
+    "scrapy",  # kept: a crawler framework
+    # Security scanners (unambiguous scanning tools)
+    "masscan",
+    "nmap",
+    "nikto",
+    "sqlmap",
+    # Note: the following overly broad keywords were removed to avoid false positives:
+    # - "bot" (would match GitHub-Robot and similar)
+    # - "curl" (legitimate tool)
+    # - "python-requests" (legitimate library)
+    # - "httpx" (legitimate library)
+    # - "aiohttp" (legitimate library)
+}
+
+# User-Agent markers of asset-mapping / reconnaissance tools
+ASSET_SCANNER_USER_AGENTS = {
+    # Well-known asset-mapping platforms
+    "shodan",
+    "censys",
+    "zoomeye",
+    "fofa",
+    "quake",
+    "hunter",
+    "binaryedge",
+    "onyphe",
+    "securitytrails",
+    "virustotal",
+    "passivetotal",
+    # Security scanners
+    "acunetix",
+    "appscan",
+    "burpsuite",
+    "nessus",
+    "openvas",
+    "qualys",
+    "rapid7",
+    "tenable",
+    "veracode",
+    "zap",
+    "awvs",  # Acunetix Web Vulnerability Scanner
+    "netsparker",
+    "skipfish",
+    "w3af",
+    "arachni",
+    # Other scanning tools
+    "masscan",
+    "zmap",
+    "nmap",
+    "whatweb",
+    "wpscan",
+    "joomscan",
+    "dnsenum",
+    "subfinder",
+    "amass",
+    "sublist3r",
+    "theharvester",
+}
+
+# HTTP headers commonly set by asset-mapping tools
+ASSET_SCANNER_HEADERS = {
+    # Custom headers used by known scanners
+    "x-scan": {"shodan", "censys", "zoomeye", "fofa"},
+    "x-scanner": {"nmap", "masscan", "zmap"},
+    "x-probe": {"masscan", "zmap"},
+    # Other suspicious headers (standard reverse-proxy headers removed)
+    "x-originating-ip": set(),
+    "x-remote-ip": set(),
+    "x-remote-addr": set(),
+    # Note: the following standard reverse-proxy headers were removed to avoid
+    # false positives:
+    # - "x-forwarded-proto" (standard reverse-proxy header)
+    # - "x-real-ip" (standard reverse-proxy header, already used in _get_client_ip)
+}
+
+# Suspicious header-name/value patterns (used to detect scanning tools)
+SUSPICIOUS_HEADER_PATTERNS = {
+    "shodan",
+    "censys",
+    "zoomeye",
+    "fofa",
+    "quake",
+    "scanner",
+    "probe",
+    "scan",
+    "recon",
+    "reconnaissance",
+}
+
+# Anti-crawler mode
+# false:  disabled
+# strict: strict mode (stricter detection, lower rate limit)
+# loose:  loose mode (more lenient detection, higher rate limit)
+# basic:  basic mode (log malicious access only; no blocking, no rate limit,
+#         no IP tracking)
+ANTI_CRAWLER_MODE = os.getenv("WEBUI_ANTI_CRAWLER_MODE", "basic").lower()
+
+# IP whitelist (read from the environment, comma-separated)
+# Supported formats:
+# - exact IPs:  127.0.0.1, 192.168.1.100
+# - CIDR:       192.168.1.0/24, 172.17.0.0/16 (useful for Docker networks)
+# - wildcards:  192.168.*.*, 10.*.*.*, *.*.*.* (matches everything)
+# - IPv6:       ::1, 2001:db8::/32
+def _parse_allowed_ips(ip_string: str) -> list:
+    """
+    Parse the IP whitelist string; supports exact IPs, CIDR, and wildcards.
+
+    Args:
+        ip_string: comma-separated IP string
+
+    Returns:
+        Whitelist entries, each one of:
+        - ipaddress.IPv4Network/IPv6Network (CIDR)
+        - ipaddress.IPv4Address/IPv6Address (exact IP)
+        - str (wildcard pattern, converted to a regular expression)
+    """
+    allowed = []
+    if not ip_string:
+        return allowed
+
+    for ip_entry in ip_string.split(","):
+        ip_entry = ip_entry.strip()  # drop surrounding whitespace
+        if not ip_entry:
+            continue
+
+        # Wildcard format (contains *)
+        if "*" in ip_entry:
+            pattern = _convert_wildcard_to_regex(ip_entry)
+            if pattern:
+                allowed.append(pattern)
+            else:
+                logger.warning(f"Invalid wildcard IP pattern, ignored: {ip_entry}")
+            continue
+
+        try:
+            # Parse as a CIDR network if the entry contains "/"
+            if "/" in ip_entry:
+                allowed.append(ipaddress.ip_network(ip_entry, strict=False))
+            else:
+                # Exact IP address
+                allowed.append(ipaddress.ip_address(ip_entry))
+        except (ValueError, AttributeError) as e:
+            logger.warning(f"Invalid IP whitelist entry, ignored: {ip_entry} ({e})")
+
+    return allowed
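+
+# Illustrative example (input value assumed, not a default): with
+# WEBUI_ALLOWED_IPS="127.0.0.1,192.168.1.0/24,10.*", _parse_allowed_ips
+# returns [IPv4Address('127.0.0.1'), IPv4Network('192.168.1.0/24'),
+# '^10\.\d+\.\d+\.\d+$'] -- the last entry is the regex string produced by
+# _convert_wildcard_to_regex below.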
+
+
+def _convert_wildcard_to_regex(wildcard_pattern: str) -> Optional[str]:
+    """
+    Convert a wildcard IP pattern into a regular expression.
+
+    Supported formats:
+    - 192.168.*.* or 192.168.*
+    - 10.*.*.* or 10.*
+    - *.*.*.* or *
+
+    Args:
+        wildcard_pattern: wildcard pattern string
+
+    Returns:
+        A regular-expression string, or None if the pattern is invalid
+    """
+    # Drop surrounding whitespace
+    pattern = wildcard_pattern.strip()
+
+    # A single * matches everything
+    if pattern == "*":
+        return r".*"
+
+    # IPv4 wildcard formats:
+    # 192.168.*.*, 192.168.*, 10.*.*.*, 10.*, etc.
+    parts = pattern.split(".")
+
+    if len(parts) > 4:
+        return None  # IPv4 has at most 4 octets
+
+    # Build the regular expression
+    regex_parts = []
+    for part in parts:
+        part = part.strip()
+        if part == "*":
+            regex_parts.append(r"\d+")  # match any number
+        elif part.isdigit():
+            # Validate the octet range (0-255)
+            num = int(part)
+            if 0 <= num <= 255:
+                regex_parts.append(re.escape(part))
+            else:
+                return None  # octet out of range
+        else:
+            return None  # invalid format
+
+    # Pad patterns shorter than 4 octets
+    while len(regex_parts) < 4:
+        regex_parts.append(r"\d+")
+
+    # Assemble the final regular expression
+    regex = r"^" + r"\.".join(regex_parts) + r"$"
+    return regex
+
+
+ALLOWED_IPS = _parse_allowed_ips(os.getenv("WEBUI_ALLOWED_IPS", ""))
+
+
+def _get_mode_config(mode: str) -> dict:
+    """
+    Resolve configuration parameters for a given mode.
+
+    Args:
+        mode: anti-crawler mode (false/strict/loose/basic)
+
+    Returns:
+        Configuration dict with all relevant parameters
+    """
+    mode = mode.lower()
+
+    if mode == "false":
+        return {
+            "enabled": False,
+            "rate_limit_window": 60,
+            "rate_limit_max_requests": 1000,  # effectively unlimited when disabled
+            "max_tracked_ips": 0,
+            "check_user_agent": False,
+            "check_asset_scanner": False,
+            "check_rate_limit": False,
+            "block_on_detect": False,  # never block
+        }
+    elif mode == "strict":
+        return {
+            "enabled": True,
+            "rate_limit_window": 60,
+            "rate_limit_max_requests": 15,  # strict: low request budget
+            "max_tracked_ips": 20000,
+            "check_user_agent": True,
+            "check_asset_scanner": True,
+            "check_rate_limit": True,
+            "block_on_detect": True,  # block malicious access
+        }
+    elif mode == "loose":
+        return {
+            "enabled": True,
+            "rate_limit_window": 60,
+            "rate_limit_max_requests": 60,  # loose: higher request budget
+            "max_tracked_ips": 5000,
+            "check_user_agent": True,
+            "check_asset_scanner": True,
+            "check_rate_limit": True,
+            "block_on_detect": True,  # block malicious access
+        }
+    else:  # basic (default)
+        return {
+            "enabled": True,
+            "rate_limit_window": 60,
+            "rate_limit_max_requests": 1000,  # no request budget
+            "max_tracked_ips": 0,  # no IP tracking
+            "check_user_agent": True,  # detect but do not block
+            "check_asset_scanner": True,  # detect but do not block
+            "check_rate_limit": False,  # no rate limiting
+            "block_on_detect": False,  # log only, never block
+        }
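+
+# Quick reference (restates the profiles above): strict allows 15 requests per
+# 60 s window and blocks on detection; loose allows 60 and blocks; basic and
+# false never block -- basic logs detections, false disables all checks.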
+
+
+class AntiCrawlerMiddleware(BaseHTTPMiddleware):
+    """Anti-crawler middleware"""
+
+    def __init__(self, app, mode: str = "basic"):
+        """
+        Initialize the anti-crawler middleware.
+
+        Args:
+            app: FastAPI application instance
+            mode: anti-crawler mode (false/strict/loose/basic)
+        """
+        super().__init__(app)
+        self.mode = mode.lower()
+        # Resolve configuration from the mode
+        config = _get_mode_config(self.mode)
+        self.enabled = config["enabled"]
+        self.rate_limit_window = config["rate_limit_window"]
+        self.rate_limit_max_requests = config["rate_limit_max_requests"]
+        self.max_tracked_ips = config["max_tracked_ips"]
+        self.check_user_agent = config["check_user_agent"]
+        self.check_asset_scanner = config["check_asset_scanner"]
+        self.check_rate_limit = config["check_rate_limit"]
+        self.block_on_detect = config["block_on_detect"]  # block detected malicious access?
+
+        # Request timestamps per IP
+        self.request_times: dict[str, list[float]] = defaultdict(list)
+        # Time of the last cleanup pass
+        self.last_cleanup = time.time()
+        # Convert the keyword lists to sets for fast lookups
+        self.crawler_keywords_set = set(CRAWLER_USER_AGENTS)
+        self.scanner_keywords_set = set(ASSET_SCANNER_USER_AGENTS)
+        self.suspicious_patterns_set = set(SUSPICIOUS_HEADER_PATTERNS)
+
+    def _is_crawler_user_agent(self, user_agent: Optional[str]) -> bool:
+        """
+        Check whether a User-Agent belongs to a crawler.
+
+        Args:
+            user_agent: User-Agent string
+
+        Returns:
+            True if it looks like a crawler
+        """
+        if not user_agent:
+            # Log requests without a User-Agent, but do not block them outright;
+            # the rate limiter will deal with abusive clients
+            logger.debug("Request is missing a User-Agent")
+            return False  # no longer block UA-less requests directly
+
+        user_agent_lower = user_agent.lower()
+
+        # Set membership for speed (check for crawler keywords)
+        for crawler_keyword in self.crawler_keywords_set:
+            if crawler_keyword in user_agent_lower:
+                return True
+
+        return False
+
+    def _is_asset_scanner_user_agent(self, user_agent: Optional[str]) -> bool:
+        """
+        Check whether a User-Agent belongs to an asset-mapping tool.
+
+        Args:
+            user_agent: User-Agent string
+
+        Returns:
+            True if it looks like an asset-mapping tool
+        """
+        if not user_agent:
+            return False
+
+        user_agent_lower = user_agent.lower()
+
+        # Check for asset-mapping tool keywords
+        for scanner_keyword in self.scanner_keywords_set:
+            if scanner_keyword in user_agent_lower:
+                return True
+
+        return False
+
+    def _is_asset_scanner_header(self, request: Request) -> bool:
+        """
+        Check the HTTP headers for asset-mapping tool markers.
+
+        Args:
+            request: the request object
+
+        Returns:
+            True if an asset-mapping header is detected
+        """
+        # Inspect every header
+        for header_name, header_value in request.headers.items():
+            header_name_lower = header_name.lower()
+            header_value_lower = header_value.lower() if header_value else ""
+
+            # Known scanner headers
+            if header_name_lower in ASSET_SCANNER_HEADERS:
+                # If the header maps to a set of tools, match against the value
+                expected_tools = ASSET_SCANNER_HEADERS[header_name_lower]
+                if expected_tools:
+                    for tool in expected_tools:
+                        if tool in header_value_lower:
+                            return True
+                else:
+                    # No specific tool set: the header's mere presence is suspicious
+                    if header_value_lower:
+                        return True
+
+            # Set membership for speed (check the header for suspicious patterns)
+            for pattern in self.suspicious_patterns_set:
+                if pattern in header_name_lower or pattern in header_value_lower:
+                    return True
+
+        return False
+
+    def _detect_asset_scanner(self, request: Request) -> tuple[bool, Optional[str]]:
+        """
+        Detect asset-mapping tools.
+
+        Args:
+            request: the request object
+
+        Returns:
+            (detected, name of the detected tool)
+        """
+        user_agent = request.headers.get("User-Agent")
+
+        # Check the User-Agent (set membership for speed)
+        if user_agent:
+            user_agent_lower = user_agent.lower()
+            for scanner_keyword in self.scanner_keywords_set:
+                if scanner_keyword in user_agent_lower:
+                    return True, scanner_keyword
+
+        # Check the HTTP headers
+        if self._is_asset_scanner_header(request):
+            # Try to extract a tool name from the User-Agent or the headers
+            detected_tool = None
+            if user_agent:
+                user_agent_lower = user_agent.lower()
+                for tool in self.scanner_keywords_set:
+                    if tool in user_agent_lower:
+                        detected_tool = tool
+                        break
+
+            # Look for tool markers in the header values
+            if not detected_tool:
+                for header_name, header_value in request.headers.items():
+                    header_value_lower = (header_value or "").lower()
+                    for tool in self.scanner_keywords_set:
+                        if tool in header_value_lower:
+                            detected_tool = tool
+                            break
+                    if detected_tool:
+                        break
+
+            return True, detected_tool or "unknown_scanner"
+
+        return False, None
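+
+    # Illustrative example (User-Agent value assumed): a request sent with
+    # "Mozilla/5.0 (compatible; CensysInspect/1.1)" matches because "censys"
+    # appears in the lowercased string, so _detect_asset_scanner returns
+    # (True, "censys").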
+
+    def _check_rate_limit(self, client_ip: str) -> bool:
+        """
+        Enforce the request rate limit.
+
+        Args:
+            client_ip: client IP address
+
+        Returns:
+            True if the limit is exceeded (the request should be blocked)
+        """
+        # Whitelisted IPs are never rate limited
+        if self._is_ip_allowed(client_ip):
+            return False
+
+        # Cap the number of tracked IPs to avoid unbounded memory growth
+        if self.max_tracked_ips > 0 and len(self.request_times) > self.max_tracked_ips:
+            # Evict stale records
+            self._cleanup_old_requests(time.time())
+
+        current_time = time.time()
+
+        # Periodically drop expired request records (every 5 minutes)
+        if current_time - self.last_cleanup > 300:
+            self._cleanup_old_requests(current_time)
+            self.last_cleanup = current_time
+
+        # Timestamps recorded for this IP
+        request_times = self.request_times[client_ip]
+
+        # Drop timestamps outside the rate-limit window
+        request_times[:] = [
+            req_time
+            for req_time in request_times
+            if current_time - req_time < self.rate_limit_window
+        ]
+
+        # Over the limit?
+        if len(request_times) >= self.rate_limit_max_requests:
+            return True
+
+        # Record the current request
+        request_times.append(current_time)
+        return False
+
+    def _cleanup_old_requests(self, current_time: float):
+        """Drop expired request records."""
+        for ip in list(self.request_times.keys()):
+            self.request_times[ip] = [
+                req_time
+                for req_time in self.request_times[ip]
+                if current_time - req_time < self.rate_limit_window
+            ]
+            # Remove the IP entirely once its list is empty
+            if not self.request_times[ip]:
+                del self.request_times[ip]
+
+    def _get_client_ip(self, request: Request) -> str:
+        """
+        Resolve the real client IP address (with basic validation).
+
+        Args:
+            request: the request object
+
+        Returns:
+            Client IP address
+        """
+        # Prefer X-Forwarded-For (set by reverse proxies)
+        forwarded_for = request.headers.get("X-Forwarded-For")
+        if forwarded_for:
+            # X-Forwarded-For may list several IPs; take the first
+            ip = forwarded_for.split(",")[0].strip()
+            # Basic format validation
+            if self._validate_ip(ip):
+                return ip
+
+        # Fall back to X-Real-IP
+        real_ip = request.headers.get("X-Real-IP")
+        if real_ip:
+            ip = real_ip.strip()
+            if self._validate_ip(ip):
+                return ip
+
+        # Fall back to the socket peer address
+        if request.client:
+            ip = request.client.host
+            if self._validate_ip(ip):
+                return ip
+
+        return "unknown"
+
+    def _validate_ip(self, ip: str) -> bool:
+        """
+        Validate an IP address string.
+
+        Args:
+            ip: IP address string
+
+        Returns:
+            True if the format is valid
+        """
+        try:
+            ipaddress.ip_address(ip)
+            return True
+        except (ValueError, AttributeError):
+            return False
+
+    def _is_ip_allowed(self, ip: str) -> bool:
+        """
+        Check whether an IP is whitelisted (exact IPs, CIDR, and wildcards).
+
+        Args:
+            ip: client IP address
+
+        Returns:
+            True if the IP is on the whitelist
+        """
+        if not ALLOWED_IPS or ip == "unknown":
+            return False
+
+        # Test every whitelist entry
+        for allowed_entry in ALLOWED_IPS:
+            # Wildcard pattern (string holding a regular expression)
+            if isinstance(allowed_entry, str):
+                try:
+                    if re.match(allowed_entry, ip):
+                        return True
+                except re.error:
+                    # Broken regular expression, skip
+                    continue
+            # CIDR (network object)
+            elif isinstance(allowed_entry, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
+                try:
+                    client_ip_obj = ipaddress.ip_address(ip)
+                    if client_ip_obj in allowed_entry:
+                        return True
+                except (ValueError, AttributeError):
+                    # Invalid IP format, skip
+                    continue
+            # Exact IP (address object)
+            elif isinstance(allowed_entry, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
+                try:
+                    client_ip_obj = ipaddress.ip_address(ip)
+                    if client_ip_obj == allowed_entry:
+                        return True
+                except (ValueError, AttributeError):
+                    # Invalid IP format, skip
+                    continue
+
+        return False
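+
+    # Illustrative sliding-window walkthrough (numbers from the strict
+    # profile): with rate_limit_max_requests=15 and rate_limit_window=60, the
+    # 16th request from one non-whitelisted IP inside 60 s makes
+    # _check_rate_limit return True, and dispatch below answers with HTTP 429.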
+
+    async def dispatch(self, request: Request, call_next):
+        """
+        Handle a request.
+
+        Args:
+            request: the request object
+            call_next: the next middleware or route handler
+
+        Returns:
+            Response object
+        """
+        # Pass through immediately when disabled
+        if not self.enabled:
+            return await call_next(request)
+
+        # Allow robots.txt (served by a dedicated route)
+        if request.url.path == "/robots.txt":
+            return await call_next(request)
+
+        # Allow static assets (CSS, JS, images, ...)
+        static_extensions = {".css", ".js", ".json", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", ".woff", ".woff2", ".ttf", ".eot"}
+        if any(request.url.path.endswith(ext) for ext in static_extensions):
+            return await call_next(request)
+
+        # Resolve the client IP once (avoid repeated lookups)
+        client_ip = self._get_client_ip(request)
+
+        # Whitelist check first: whitelisted IPs pass straight through
+        if self._is_ip_allowed(client_ip):
+            return await call_next(request)
+
+        # Fetch the User-Agent
+        user_agent = request.headers.get("User-Agent")
+
+        # Check for asset-mapping tools first (they are the bigger threat)
+        if self.check_asset_scanner:
+            is_scanner, scanner_name = self._detect_asset_scanner(request)
+            if is_scanner:
+                logger.warning(
+                    f"🚫 Asset-mapping tool detected - IP: {client_ip}, tool: {scanner_name}, "
+                    f"User-Agent: {user_agent}, Path: {request.url.path}"
+                )
+                # Block only if the mode says so
+                if self.block_on_detect:
+                    return PlainTextResponse(
+                        "Access Denied: Asset scanning tools are not allowed",
+                        status_code=403,
+                    )
+
+        # Check for crawler User-Agents
+        if self.check_user_agent and self._is_crawler_user_agent(user_agent):
+            logger.warning(
+                f"🚫 Crawler detected - IP: {client_ip}, User-Agent: {user_agent}, Path: {request.url.path}"
+            )
+            # Block only if the mode says so
+            if self.block_on_detect:
+                return PlainTextResponse(
+                    "Access Denied: Crawlers are not allowed",
+                    status_code=403,
+                )
+
+        # Enforce the rate limit
+        if self.check_rate_limit and self._check_rate_limit(client_ip):
+            logger.warning(
+                f"🚫 Rate limit exceeded - IP: {client_ip}, User-Agent: {user_agent}, Path: {request.url.path}"
+            )
+            return PlainTextResponse(
+                "Too Many Requests: Rate limit exceeded",
+                status_code=429,
+            )
+
+        # Normal request, continue
+        return await call_next(request)
+
+
+def create_robots_txt_response() -> PlainTextResponse:
+    """
+    Build the robots.txt response.
+
+    Returns:
+        robots.txt response object
+    """
+    robots_content = """User-agent: *
+Disallow: /
+
+# All crawlers are disallowed
+"""
+    return PlainTextResponse(
+        content=robots_content,
+        media_type="text/plain",
+        headers={"Cache-Control": "public, max-age=86400"},  # cache for 24 hours
+    )
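+
+
+# Minimal wiring sketch (illustrative; mirrors the webui_server.py change
+# below, using this module's public names):
+#
+#     from fastapi import FastAPI
+#     app = FastAPI()
+#     app.add_middleware(AntiCrawlerMiddleware,
+#                        mode=os.getenv("WEBUI_ANTI_CRAWLER_MODE", "basic"))
+#
+#     @app.get("/robots.txt", include_in_schema=False)
+#     async def robots_txt():
+#         return create_robots_txt_response()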
diff --git a/src/webui/webui_server.py b/src/webui/webui_server.py
index ac95e80c..afe17fd8 100644
--- a/src/webui/webui_server.py
+++ b/src/webui/webui_server.py
@@ -21,12 +21,18 @@ class WebUIServer:
         self.app = FastAPI(title="MaiBot WebUI")
         self._server = None
 
+        # Set up the anti-crawler middleware (registered before CORS; see the
+        # ordering note in _setup_anti_crawler)
+        self._setup_anti_crawler()
+
         # 显示 Access Token
         self._show_access_token()
 
         # 重要:先注册 API 路由,再设置静态文件
         self._register_api_routes()
         self._setup_static_files()
+
+        # Register the robots.txt route
+        self._setup_robots_txt()
 
     def _show_access_token(self):
         """显示 WebUI Access Token"""
@@ -82,6 +88,46 @@ class WebUIServer:
 
         logger.info(f"✅ WebUI 静态文件服务已配置: {static_path}")
 
+    def _setup_anti_crawler(self):
+        """Set up the anti-crawler middleware"""
+        try:
+            from src.webui.anti_crawler import AntiCrawlerMiddleware
+
+            # Read the anti-crawler mode from the environment (false/strict/loose/basic)
+            anti_crawler_mode = os.getenv("WEBUI_ANTI_CRAWLER_MODE", "basic").lower()
+
+            # Ordering note: Starlette middleware runs in reverse registration
+            # order -- the middleware registered last is outermost and sees
+            # each request first.
+            self.app.add_middleware(
+                AntiCrawlerMiddleware,
+                mode=anti_crawler_mode
+            )
+
+            mode_descriptions = {
+                "false": "disabled",
+                "strict": "strict mode",
+                "loose": "loose mode",
+                "basic": "basic mode"
+            }
+            mode_desc = mode_descriptions.get(anti_crawler_mode, "basic mode")
+            logger.info(f"🛡️ Anti-crawler middleware configured: {mode_desc}")
+        except Exception as e:
+            logger.error(f"❌ Failed to configure the anti-crawler middleware: {e}", exc_info=True)
+
+    def _setup_robots_txt(self):
+        """Register the robots.txt route"""
+        try:
+            from src.webui.anti_crawler import create_robots_txt_response
+
+            @self.app.get("/robots.txt", include_in_schema=False)
+            async def robots_txt():
+                """Serve robots.txt, which disallows all crawlers"""
+                return create_robots_txt_response()
+
+            logger.debug("✅ robots.txt route registered")
+        except Exception as e:
+            logger.error(f"❌ Failed to register the robots.txt route: {e}", exc_info=True)
+
     def _register_api_routes(self):
         """注册所有 WebUI API 路由"""
         try:
diff --git a/template/template.env b/template/template.env
index b6dd0e5c..a08635fb 100644
--- a/template/template.env
+++ b/template/template.env
@@ -6,4 +6,10 @@ PORT=8000
 WEBUI_ENABLED=true
 WEBUI_MODE=production # 模式: development(开发) 或 production(生产)
 WEBUI_HOST=0.0.0.0 # WebUI 服务器监听地址
-WEBUI_PORT=8001 # WebUI 服务器端口
\ No newline at end of file
+WEBUI_PORT=8001 # WebUI server port
+
+# Anti-crawler settings
+WEBUI_ANTI_CRAWLER_MODE=basic # anti-crawler mode: false (disabled) / strict / loose / basic (log only, never blocks)
+WEBUI_ALLOWED_IPS=127.0.0.1 # IP whitelist (comma-separated; supports exact IPs, CIDR, and wildcards)
+                            # example: 127.0.0.1,192.168.1.0/24,172.17.0.0/16
+                            # note: never use *.*.*.* or a bare *, which whitelists everyone and disables the anti-crawler entirely
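+
+# Illustrative hardened example for a deployment behind a reverse proxy
+# (values are suggestions, not defaults):
+# WEBUI_ANTI_CRAWLER_MODE=strict
+# WEBUI_ALLOWED_IPS=127.0.0.1,172.17.0.0/16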