feat: add anti-crawler and asset-mapping detection to the WebUI, with multiple protection modes, an IP allowlist, and low resource overhead.

New features:
Anti-crawler detection: identifies 20+ crawlers and 30+ asset-mapping tools (Shodan, Censys, Nmap, etc.)
Multiple modes: basic (log only, default), strict, loose, false (disabled); a usage sketch follows this list
IP allowlist: exact IPs, CIDR ranges, and wildcards are supported; allowlisted IPs are exempt from all checks
Rate limiting: a per-IP sliding time window blocks high-frequency requests
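As a minimal usage sketch (not part of this commit; the standalone FastAPI app and the environment values are illustrative assumptions, while the import path, env var names, and middleware class are the ones introduced below), the middleware can be registered like this:

import os

# The allowlist is parsed when anti_crawler is imported, so set env vars first (example values)
os.environ.setdefault("WEBUI_ANTI_CRAWLER_MODE", "strict")
os.environ.setdefault("WEBUI_ALLOWED_IPS", "127.0.0.1,192.168.1.0/24")

from fastapi import FastAPI
from src.webui.anti_crawler import AntiCrawlerMiddleware

app = FastAPI()
# Same registration call that WebUIServer._setup_anti_crawler performs in this PR
app.add_middleware(AntiCrawlerMiddleware, mode=os.getenv("WEBUI_ANTI_CRAWLER_MODE", "basic"))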
pull/1439/head
陈曦 2025-12-14 19:21:02 +08:00
parent fa4555197d
commit 32af5ae29a
3 changed files with 754 additions and 1 deletion


@@ -0,0 +1,701 @@
"""
WebUI 防爬虫模块
提供爬虫检测和阻止功能，保护 WebUI 不被搜索引擎和恶意爬虫访问
"""
import os
import time
import ipaddress
import re
from collections import defaultdict
from typing import Optional, Union
from functools import lru_cache
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, PlainTextResponse
from fastapi import HTTPException
from src.common.logger import get_logger
logger = get_logger("webui.anti_crawler")
# 常见爬虫 User-Agent 列表(使用更精确的关键词,避免误报)
CRAWLER_USER_AGENTS = {
# 搜索引擎爬虫(精确匹配)
"googlebot",
"bingbot",
"baiduspider",
"yandexbot",
"slurp", # Yahoo
"duckduckbot",
"sogou",
"exabot",
"facebot",
"ia_archiver", # Internet Archive
# 通用爬虫(移除过于宽泛的关键词)
"crawler",
"spider",
"scraper",
"wget", # 保留wget因为通常用于自动化脚本
"scrapy", # 保留scrapy因为这是爬虫框架
# 安全扫描工具(这些是明确的扫描工具)
"masscan",
"nmap",
"nikto",
"sqlmap",
# 注意:移除了以下过于宽泛的关键词以避免误报:
# - "bot" (会误匹配GitHub-Robot等)
# - "curl" (正常工具)
# - "python-requests" (正常库)
# - "httpx" (正常库)
# - "aiohttp" (正常库)
}
# 资产测绘工具 User-Agent 标识
ASSET_SCANNER_USER_AGENTS = {
# 知名资产测绘平台
"shodan",
"censys",
"zoomeye",
"fofa",
"quake",
"hunter",
"binaryedge",
"onyphe",
"securitytrails",
"virustotal",
"passivetotal",
# 安全扫描工具
"acunetix",
"appscan",
"burpsuite",
"nessus",
"openvas",
"qualys",
"rapid7",
"tenable",
"veracode",
"zap",
"awvs", # Acunetix Web Vulnerability Scanner
"netsparker",
"skipfish",
"w3af",
"arachni",
# 其他扫描工具
"masscan",
"zmap",
"nmap",
"whatweb",
"wpscan",
"joomscan",
"dnsenum",
"subfinder",
"amass",
"sublist3r",
"theharvester",
}
# 资产测绘工具常用的HTTP头标识
ASSET_SCANNER_HEADERS = {
# 常见的扫描工具自定义头
"x-scan": {"shodan", "censys", "zoomeye", "fofa"},
"x-scanner": {"nmap", "masscan", "zmap"},
"x-probe": {"masscan", "zmap"},
# 其他可疑头(移除反向代理标准头)
"x-originating-ip": set(),
"x-remote-ip": set(),
"x-remote-addr": set(),
# 注意:移除了以下反向代理标准头以避免误报:
# - "x-forwarded-proto" (反向代理标准头)
# - "x-real-ip" (反向代理标准头已在_get_client_ip中使用)
}
# 可疑的HTTP头值模式，用于检测扫描工具
SUSPICIOUS_HEADER_PATTERNS = {
"shodan",
"censys",
"zoomeye",
"fofa",
"quake",
"scanner",
"probe",
"scan",
"recon",
"reconnaissance",
}
# 防爬虫模式配置
# false: 禁用
# strict: 严格模式(更严格的检测,更低的频率限制)
# loose: 宽松模式(较宽松的检测,较高的频率限制)
# basic: 基础模式（只记录恶意访问，不阻止，不限制请求数，不跟踪IP）
ANTI_CRAWLER_MODE = os.getenv("WEBUI_ANTI_CRAWLER_MODE", "basic").lower()
# IP白名单配置（从环境变量读取，逗号分隔）
# 支持格式:
# - 精确IP: 127.0.0.1, 192.168.1.100
# - CIDR格式: 192.168.1.0/24, 172.17.0.0/16 (适用于Docker网络)
# - 通配符: 192.168.*.*, 10.*.*.*, *.*.*.* (匹配所有)
# - IPv6: ::1, 2001:db8::/32
def _parse_allowed_ips(ip_string: str) -> list:
"""
解析IP白名单字符串，支持精确IP、CIDR格式和通配符
Args:
ip_string: 逗号分隔的IP字符串
Returns:
IP白名单列表，每个元素可能是:
- ipaddress.IPv4Network/IPv6Network对象（CIDR格式）
- ipaddress.IPv4Address/IPv6Address对象（精确IP）
- str：通配符模式（已转换为正则表达式）
"""
allowed = []
if not ip_string:
return allowed
for ip_entry in ip_string.split(","):
ip_entry = ip_entry.strip() # 去除空格
if not ip_entry:
continue
# 检查通配符格式（包含*）
if "*" in ip_entry:
# 处理通配符
pattern = _convert_wildcard_to_regex(ip_entry)
if pattern:
allowed.append(pattern)
else:
logger.warning(f"无效的通配符IP格式已忽略: {ip_entry}")
continue
try:
# 尝试解析为CIDR格式（包含/）
if "/" in ip_entry:
allowed.append(ipaddress.ip_network(ip_entry, strict=False))
else:
# 精确IP地址
allowed.append(ipaddress.ip_address(ip_entry))
except (ValueError, AttributeError) as e:
logger.warning(f"无效的IP白名单条目已忽略: {ip_entry} ({e})")
return allowed
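# 说明性示例（根据上面的实现推断，非本文件原有内容）：
#   _parse_allowed_ips("127.0.0.1,192.168.1.0/24,10.*.*.*")
#   -> [IPv4Address('127.0.0.1'), IPv4Network('192.168.1.0/24'), '^10\.\d+\.\d+\.\d+$']
#   （通配符条目被转换为正则表达式字符串，顺序与输入一致）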
def _convert_wildcard_to_regex(wildcard_pattern: str) -> Optional[str]:
"""
将通配符IP模式转换为正则表达式
支持的格式:
- 192.168.*.* 或 192.168.*
- 10.*.*.* 或 10.*
- *.*.*.* 或 *
Args:
wildcard_pattern: 通配符模式字符串
Returns:
正则表达式字符串，如果格式无效则返回None
"""
# 去除空格
pattern = wildcard_pattern.strip()
# 处理单个*(匹配所有)
if pattern == "*":
return r".*"
# 处理IPv4通配符格式
# 支持192.168.*.*, 192.168.*, 10.*.*.*, 10.* 等
parts = pattern.split(".")
if len(parts) > 4:
return None # IPv4最多4段
# 构建正则表达式
regex_parts = []
for part in parts:
part = part.strip()
if part == "*":
regex_parts.append(r"\d+") # 匹配任意数字
elif part.isdigit():
# 验证数字范围0-255
num = int(part)
if 0 <= num <= 255:
regex_parts.append(re.escape(part))
else:
return None # 无效的数字
else:
return None # 无效的格式
# 如果部分少于4段，用 \d+ 补齐剩余段
while len(regex_parts) < 4:
regex_parts.append(r"\d+")
# 组合成正则表达式
regex = r"^" + r"\.".join(regex_parts) + r"$"
return regex
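# 说明性示例（根据上面的实现推断）：
#   _convert_wildcard_to_regex("192.168.*.*")   -> r"^192\.168\.\d+\.\d+$"
#   _convert_wildcard_to_regex("10.*")          -> r"^10\.\d+\.\d+\.\d+$"  （不足4段自动补齐）
#   _convert_wildcard_to_regex("192.168.1.999") -> None  （999超出0-255范围）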
ALLOWED_IPS = _parse_allowed_ips(os.getenv("WEBUI_ALLOWED_IPS", ""))
def _get_mode_config(mode: str) -> dict:
"""
根据模式获取配置参数
Args:
mode: 防爬虫模式 (false/strict/loose/basic)
Returns:
配置字典，包含所有相关参数
"""
mode = mode.lower()
if mode == "false":
return {
"enabled": False,
"rate_limit_window": 60,
"rate_limit_max_requests": 1000, # 禁用时设置很高的值
"max_tracked_ips": 0,
"check_user_agent": False,
"check_asset_scanner": False,
"check_rate_limit": False,
"block_on_detect": False, # 不阻止
}
elif mode == "strict":
return {
"enabled": True,
"rate_limit_window": 60,
"rate_limit_max_requests": 15, # 严格模式:更低的请求数
"max_tracked_ips": 20000,
"check_user_agent": True,
"check_asset_scanner": True,
"check_rate_limit": True,
"block_on_detect": True, # 阻止恶意访问
}
elif mode == "loose":
return {
"enabled": True,
"rate_limit_window": 60,
"rate_limit_max_requests": 60, # 宽松模式:更高的请求数
"max_tracked_ips": 5000,
"check_user_agent": True,
"check_asset_scanner": True,
"check_rate_limit": True,
"block_on_detect": True, # 阻止恶意访问
}
else: # basic (默认模式)
return {
"enabled": True,
"rate_limit_window": 60,
"rate_limit_max_requests": 1000, # 不限制请求数
"max_tracked_ips": 0, # 不跟踪IP
"check_user_agent": True, # 检测但不阻止
"check_asset_scanner": True, # 检测但不阻止
"check_rate_limit": False, # 不限制请求频率
"block_on_detect": False, # 只记录,不阻止
}
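# 各模式行为速览（由上面的配置推断，仅作说明）：
#   strict: 60秒窗口内最多15次请求，检测到爬虫/扫描器时返回403
#   loose : 60秒窗口内最多60次请求，检测到同样阻止
#   basic : 仅记录日志，不限频、不阻止（默认）
#   false : 完全禁用所有检测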
class AntiCrawlerMiddleware(BaseHTTPMiddleware):
"""防爬虫中间件"""
def __init__(self, app, mode: str = "standard"):
"""
初始化防爬虫中间件
Args:
app: FastAPI 应用实例
mode: 防爬虫模式 (false/strict/loose/basic)
"""
super().__init__(app)
self.mode = mode.lower()
# 根据模式获取配置
config = _get_mode_config(self.mode)
self.enabled = config["enabled"]
self.rate_limit_window = config["rate_limit_window"]
self.rate_limit_max_requests = config["rate_limit_max_requests"]
self.max_tracked_ips = config["max_tracked_ips"]
self.check_user_agent = config["check_user_agent"]
self.check_asset_scanner = config["check_asset_scanner"]
self.check_rate_limit = config["check_rate_limit"]
self.block_on_detect = config["block_on_detect"] # 是否阻止检测到的恶意访问
# 用于存储每个IP的请求时间戳
self.request_times: dict[str, list[float]] = defaultdict(list)
# 上次清理时间
self.last_cleanup = time.time()
# 将关键词列表转换为集合以提高查找性能
self.crawler_keywords_set = set(CRAWLER_USER_AGENTS)
self.scanner_keywords_set = set(ASSET_SCANNER_USER_AGENTS)
self.suspicious_patterns_set = set(SUSPICIOUS_HEADER_PATTERNS)
def _is_crawler_user_agent(self, user_agent: Optional[str]) -> bool:
"""
检测是否为爬虫 User-Agent
Args:
user_agent: User-Agent 字符串
Returns:
如果是爬虫则返回 True
"""
if not user_agent:
# 没有 User-Agent 的请求：记录日志但不直接阻止
# 改为只记录,让频率限制来处理
logger.debug("请求缺少User-Agent")
return False # 不再直接阻止无User-Agent的请求
user_agent_lower = user_agent.lower()
# 使用集合查找提高性能(检查是否包含爬虫关键词)
for crawler_keyword in self.crawler_keywords_set:
if crawler_keyword in user_agent_lower:
return True
return False
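# 说明性示例：User-Agent "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
# 含有关键词 "googlebot"，会被判定为爬虫；缺少 User-Agent 的请求则只记录日志、不判定为爬虫。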
def _is_asset_scanner_user_agent(self, user_agent: Optional[str]) -> bool:
"""
检测是否为资产测绘工具 User-Agent
Args:
user_agent: User-Agent 字符串
Returns:
如果是资产测绘工具则返回 True
"""
if not user_agent:
return False
user_agent_lower = user_agent.lower()
# 检查是否包含资产测绘工具关键词
for scanner_keyword in ASSET_SCANNER_USER_AGENTS:
if scanner_keyword in user_agent_lower:
return True
return False
def _is_asset_scanner_header(self, request: Request) -> bool:
"""
检测是否为资产测绘工具的HTTP头
Args:
request: 请求对象
Returns:
如果检测到资产测绘工具头则返回 True
"""
# 检查所有HTTP头
for header_name, header_value in request.headers.items():
header_name_lower = header_name.lower()
header_value_lower = header_value.lower() if header_value else ""
# 检查已知的扫描工具头
if header_name_lower in ASSET_SCANNER_HEADERS:
# 如果该头有特定的工具集合,检查值是否匹配
expected_tools = ASSET_SCANNER_HEADERS[header_name_lower]
if expected_tools:
for tool in expected_tools:
if tool in header_value_lower:
return True
else:
# 如果没有特定工具集合,只要存在该头就视为可疑
if header_value_lower:
return True
# 使用集合查找提高性能(检查头值中是否包含可疑模式)
for pattern in self.suspicious_patterns_set:
if pattern in header_name_lower or pattern in header_value_lower:
return True
return False
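# 说明性示例：请求头 "X-Scan: shodan" 或 "X-Scanner: nmap" 会命中上面的已知扫描头集合；
# 任意请求头的名称或取值中出现 "censys"、"zoomeye" 等可疑模式时同样返回 True。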
def _detect_asset_scanner(self, request: Request) -> tuple[bool, Optional[str]]:
"""
检测资产测绘工具
Args:
request: 请求对象
Returns:
(是否检测到, 检测到的工具名称)
"""
user_agent = request.headers.get("User-Agent")
# 检查 User-Agent（使用集合查找提高性能）
if user_agent:
user_agent_lower = user_agent.lower()
for scanner_keyword in self.scanner_keywords_set:
if scanner_keyword in user_agent_lower:
return True, scanner_keyword
# 检查HTTP头
if self._is_asset_scanner_header(request):
# 尝试从User-Agent或头中提取工具名称
detected_tool = None
if user_agent:
user_agent_lower = user_agent.lower()
for tool in self.scanner_keywords_set:
if tool in user_agent_lower:
detected_tool = tool
break
# 检查HTTP头中的工具标识
if not detected_tool:
for header_name, header_value in request.headers.items():
header_value_lower = (header_value or "").lower()
for tool in self.scanner_keywords_set:
if tool in header_value_lower:
detected_tool = tool
break
if detected_tool:
break
return True, detected_tool or "unknown_scanner"
return False, None
def _check_rate_limit(self, client_ip: str) -> bool:
"""
检查请求频率限制
Args:
client_ip: 客户端IP地址
Returns:
如果超过限制则返回 True（需要阻止）
"""
# 检查IP白名单
if self._is_ip_allowed(client_ip):
return False
# 限制跟踪的IP数量，防止内存泄漏
if self.max_tracked_ips > 0 and len(self.request_times) > self.max_tracked_ips:
# 清理最旧的记录
self._cleanup_old_requests(time.time())
current_time = time.time()
# 定期清理过期的请求记录（每5分钟清理一次）
if current_time - self.last_cleanup > 300:
self._cleanup_old_requests(current_time)
self.last_cleanup = current_time
# 获取该IP的请求时间列表
request_times = self.request_times[client_ip]
# 移除时间窗口外的请求记录
request_times[:] = [
req_time
for req_time in request_times
if current_time - req_time < self.rate_limit_window
]
# 检查是否超过限制
if len(request_times) >= self.rate_limit_max_requests:
return True
# 记录当前请求时间
request_times.append(current_time)
return False
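# 说明性示例：strict 模式下（窗口60秒、上限15次），同一IP在60秒内的第16次请求
# 会使本方法返回 True，dispatch 随后返回 429；白名单IP始终返回 False。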
def _cleanup_old_requests(self, current_time: float):
"""清理过期的请求记录"""
for ip in list(self.request_times.keys()):
self.request_times[ip] = [
req_time
for req_time in self.request_times[ip]
if current_time - req_time < self.rate_limit_window
]
# 如果列表为空，删除该IP的记录
if not self.request_times[ip]:
del self.request_times[ip]
def _get_client_ip(self, request: Request) -> str:
"""
获取客户端真实IP地址（带基本验证）
Args:
request: 请求对象
Returns:
客户端IP地址
"""
# 优先从 X-Forwarded-For 获取(适用于反向代理)
forwarded_for = request.headers.get("X-Forwarded-For")
if forwarded_for:
# X-Forwarded-For 可能包含多个IP，取第一个
ip = forwarded_for.split(",")[0].strip()
# 基本验证IP格式
if self._validate_ip(ip):
return ip
# 从 X-Real-IP 获取
real_ip = request.headers.get("X-Real-IP")
if real_ip:
ip = real_ip.strip()
if self._validate_ip(ip):
return ip
# 使用客户端IP
if request.client:
ip = request.client.host
if self._validate_ip(ip):
return ip
return "unknown"
def _validate_ip(self, ip: str) -> bool:
"""
验证IP地址格式
Args:
ip: IP地址字符串
Returns:
如果格式有效则返回 True
"""
try:
ipaddress.ip_address(ip)
return True
except (ValueError, AttributeError):
return False
def _is_ip_allowed(self, ip: str) -> bool:
"""
检查IP是否在白名单中，支持精确IP、CIDR格式和通配符
Args:
ip: 客户端IP地址
Returns:
如果IP在白名单中则返回 True
"""
if not ALLOWED_IPS or ip == "unknown":
return False
# 检查白名单中的每个条目
for allowed_entry in ALLOWED_IPS:
# 通配符模式(字符串,正则表达式)
if isinstance(allowed_entry, str):
try:
if re.match(allowed_entry, ip):
return True
except re.error:
# 正则表达式错误,跳过
continue
# CIDR格式网络对象
elif isinstance(allowed_entry, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
try:
client_ip_obj = ipaddress.ip_address(ip)
if client_ip_obj in allowed_entry:
return True
except (ValueError, AttributeError):
# IP格式无效，跳过
continue
# 精确IP地址对象
elif isinstance(allowed_entry, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
try:
client_ip_obj = ipaddress.ip_address(ip)
if client_ip_obj == allowed_entry:
return True
except (ValueError, AttributeError):
# IP格式无效，跳过
continue
return False
async def dispatch(self, request: Request, call_next):
"""
处理请求
Args:
request: 请求对象
call_next: 下一个中间件或路由处理函数
Returns:
响应对象
"""
# 如果未启用,直接通过
if not self.enabled:
return await call_next(request)
# 允许访问 robots.txt（由专门的路由处理）
if request.url.path == "/robots.txt":
return await call_next(request)
# 允许访问静态资源（CSS、JS、图片等）
static_extensions = {".css", ".js", ".json", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", ".woff", ".woff2", ".ttf", ".eot"}
if any(request.url.path.endswith(ext) for ext in static_extensions):
return await call_next(request)
# 获取客户端IP（只获取一次，避免重复调用）
client_ip = self._get_client_ip(request)
# 检查IP白名单（优先检查），白名单IP直接通过
if self._is_ip_allowed(client_ip):
return await call_next(request)
# 获取 User-Agent
user_agent = request.headers.get("User-Agent")
# 检测资产测绘工具(优先检测,因为更危险)
if self.check_asset_scanner:
is_scanner, scanner_name = self._detect_asset_scanner(request)
if is_scanner:
logger.warning(
f"🚫 检测到资产测绘工具请求 - IP: {client_ip}, 工具: {scanner_name}, "
f"User-Agent: {user_agent}, Path: {request.url.path}"
)
# 根据配置决定是否阻止
if self.block_on_detect:
return PlainTextResponse(
"Access Denied: Asset scanning tools are not allowed",
status_code=403,
)
# 检测爬虫 User-Agent
if self.check_user_agent and self._is_crawler_user_agent(user_agent):
logger.warning(
f"🚫 检测到爬虫请求 - IP: {client_ip}, User-Agent: {user_agent}, Path: {request.url.path}"
)
# 根据配置决定是否阻止
if self.block_on_detect:
return PlainTextResponse(
"Access Denied: Crawlers are not allowed",
status_code=403,
)
# 检查请求频率限制
if self.check_rate_limit and self._check_rate_limit(client_ip):
logger.warning(
f"🚫 请求频率过高 - IP: {client_ip}, User-Agent: {user_agent}, Path: {request.url.path}"
)
return PlainTextResponse(
"Too Many Requests: Rate limit exceeded",
status_code=429,
)
# 正常请求,继续处理
return await call_next(request)
def create_robots_txt_response() -> PlainTextResponse:
"""
创建 robots.txt 响应
Returns:
robots.txt 响应对象
"""
robots_content = """User-agent: *
Disallow: /
# 禁止所有爬虫访问
"""
return PlainTextResponse(
content=robots_content,
media_type="text/plain",
headers={"Cache-Control": "public, max-age=86400"}, # 缓存24小时
)


@@ -21,12 +21,18 @@ class WebUIServer:
self.app = FastAPI(title="MaiBot WebUI")
self._server = None
# 配置防爬虫中间件（需要在CORS之前注册）
self._setup_anti_crawler()
# 显示 Access Token
self._show_access_token()
# 重要:先注册 API 路由,再设置静态文件
self._register_api_routes()
self._setup_static_files()
# 注册robots.txt路由
self._setup_robots_txt()
def _show_access_token(self):
"""显示 WebUI Access Token"""
@@ -82,6 +88,46 @@ class WebUIServer:
logger.info(f"✅ WebUI 静态文件服务已配置: {static_path}")
def _setup_anti_crawler(self):
"""配置防爬虫中间件"""
try:
from src.webui.anti_crawler import AntiCrawlerMiddleware
# 从环境变量读取防爬虫模式（false/strict/loose/basic）
anti_crawler_mode = os.getenv("WEBUI_ANTI_CRAWLER_MODE", "basic").lower()
# 注意:中间件按注册顺序反向执行,所以先注册的中间件后执行
# 我们在CORS之前注册，因此按上述顺序，防爬虫检查会在CORS处理之后、业务路由之前执行
self.app.add_middleware(
AntiCrawlerMiddleware,
mode=anti_crawler_mode
)
mode_descriptions = {
"false": "已禁用",
"strict": "严格模式",
"loose": "宽松模式",
"basic": "基础模式"
}
mode_desc = mode_descriptions.get(anti_crawler_mode, "基础模式")
logger.info(f"🛡️ 防爬虫中间件已配置: {mode_desc}")
except Exception as e:
logger.error(f"❌ 配置防爬虫中间件失败: {e}", exc_info=True)
def _setup_robots_txt(self):
"""设置robots.txt路由"""
try:
from src.webui.anti_crawler import create_robots_txt_response
@self.app.get("/robots.txt", include_in_schema=False)
async def robots_txt():
"""返回robots.txt禁止所有爬虫"""
return create_robots_txt_response()
logger.debug("✅ robots.txt 路由已注册")
except Exception as e:
logger.error(f"❌ 注册robots.txt路由失败: {e}", exc_info=True)
def _register_api_routes(self):
"""注册所有 WebUI API 路由"""
try:


@@ -6,4 +6,10 @@ PORT=8000
WEBUI_ENABLED=true
WEBUI_MODE=production # 模式: development(开发) 或 production(生产)
WEBUI_HOST=0.0.0.0 # WebUI 服务器监听地址
WEBUI_PORT=8001 # WebUI 服务器端口
# 防爬虫配置
WEBUI_ANTI_CRAWLER_MODE=basic # 防爬虫模式: false(禁用) / strict(严格) / loose(宽松) / basic(基础-只记录不阻止)
WEBUI_ALLOWED_IPS=127.0.0.1 # IP白名单（逗号分隔），支持精确IP、CIDR格式和通配符
# 示例: 127.0.0.1,192.168.1.0/24,172.17.0.0/16
# 注意: 不要使用 *.*.*.* 或 *,这会导致防爬虫功能完全失效
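# 示例(仅供参考, 非本次提交内容): Docker 部署且仅允许内网访问时, 可使用更严格的配置, 例如:
# WEBUI_ANTI_CRAWLER_MODE=strict
# WEBUI_ALLOWED_IPS=127.0.0.1,172.17.0.0/16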