feat: 添加 WebSocket 日志推送功能,支持实时日志监控和历史日志加载

pull/1364/head
墨梓柒 2025-11-18 00:38:30 +08:00
parent b0eb77e68e
commit 43e0c9a4ad
No known key found for this signature in database
GPG Key ID: 4A65B9DBA35F7635
6 changed files with 257 additions and 1 deletions

4
bot.py
View File

@ -212,6 +212,10 @@ if __name__ == "__main__":
# 创建事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 初始化 WebSocket 日志推送
from src.common.logger import initialize_ws_handler
initialize_ws_handler(loop)
try:
# 执行初始化和任务调度

View File

@ -19,6 +19,7 @@ PROJECT_ROOT = logger_file.parent.parent.parent.resolve()
# 全局handler实例避免重复创建
_file_handler = None
_console_handler = None
_ws_handler = None
def get_file_handler():
@ -59,6 +60,35 @@ def get_console_handler():
return _console_handler
def get_ws_handler():
"""获取 WebSocket handler 单例"""
global _ws_handler
if _ws_handler is None:
_ws_handler = WebSocketLogHandler()
# WebSocket handler 只推送 INFO 及以上级别
_ws_handler.setLevel(logging.INFO)
return _ws_handler
def initialize_ws_handler(loop):
"""初始化 WebSocket handler 的事件循环
Args:
loop: asyncio 事件循环
"""
handler = get_ws_handler()
handler.set_loop(loop)
# 为 WebSocket handler 设置 JSON 格式化器(与文件格式相同)
handler.setFormatter(file_formatter)
# 添加到根日志记录器
root_logger = logging.getLogger()
if handler not in root_logger.handlers:
root_logger.addHandler(handler)
print("[日志系统] ✅ WebSocket 日志推送已启用")
class TimestampedFileHandler(logging.Handler):
"""基于时间戳的文件处理器,简单的轮转份数限制"""
@ -145,12 +175,78 @@ class TimestampedFileHandler(logging.Handler):
super().close()
class WebSocketLogHandler(logging.Handler):
"""WebSocket 日志处理器 - 将日志实时推送到前端"""
_log_counter = 0 # 类级别计数器,确保 ID 唯一性
def __init__(self, loop=None):
super().__init__()
self.loop = loop
self._initialized = False
def set_loop(self, loop):
"""设置事件循环"""
self.loop = loop
self._initialized = True
def emit(self, record):
"""发送日志到 WebSocket 客户端"""
if not self._initialized or self.loop is None:
return
try:
# 获取格式化后的消息
# 对于 structlog,formatted message 包含完整的日志信息
formatted_msg = self.format(record) if self.formatter else record.getMessage()
# 如果是 JSON 格式(文件格式化器),解析它
message = formatted_msg
try:
import json
log_dict = json.loads(formatted_msg)
message = log_dict.get('event', formatted_msg)
except (json.JSONDecodeError, ValueError):
# 不是 JSON,直接使用消息
message = formatted_msg
# 生成唯一 ID: 时间戳毫秒 + 自增计数器
WebSocketLogHandler._log_counter += 1
log_id = f"{int(record.created * 1000)}_{WebSocketLogHandler._log_counter}"
# 格式化日志数据
log_data = {
"id": log_id,
"timestamp": datetime.fromtimestamp(record.created).strftime("%Y-%m-%d %H:%M:%S"),
"level": record.levelname,
"module": record.name,
"message": message,
}
# 异步广播日志(不阻塞日志记录)
try:
import asyncio
from src.webui.logs_ws import broadcast_log
asyncio.run_coroutine_threadsafe(
broadcast_log(log_data),
self.loop
)
except Exception:
# WebSocket 推送失败不影响日志记录
pass
except Exception:
# 不要让 WebSocket 错误影响日志系统
self.handleError(record)
# 旧的轮转文件处理器已移除,现在使用基于时间戳的处理器
def close_handlers():
"""安全关闭所有handler"""
global _file_handler, _console_handler
global _file_handler, _console_handler, _ws_handler
if _file_handler:
_file_handler.close()
@ -159,6 +255,10 @@ def close_handlers():
if _console_handler:
_console_handler.close()
_console_handler = None
if _ws_handler:
_ws_handler.close()
_ws_handler = None
def remove_duplicate_handlers(): # sourcery skip: for-append-to-extend, list-comprehension

View File

View File

View File

@ -0,0 +1,138 @@
"""WebSocket 日志推送模块"""
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from typing import Set
import json
from pathlib import Path
from src.common.logger import get_logger
logger = get_logger("webui.logs_ws")
router = APIRouter()
# 全局 WebSocket 连接池
active_connections: Set[WebSocket] = set()
def load_recent_logs(limit: int = 100) -> list[dict]:
"""从日志文件中加载最近的日志
Args:
limit: 返回的最大日志条数
Returns:
日志列表
"""
logs = []
log_dir = Path("logs")
if not log_dir.exists():
return logs
# 获取所有日志文件,按修改时间排序
log_files = sorted(log_dir.glob("app_*.log.jsonl"), key=lambda f: f.stat().st_mtime, reverse=True)
# 用于生成唯一 ID 的计数器
log_counter = 0
# 从最新的文件开始读取
for log_file in log_files:
if len(logs) >= limit:
break
try:
with open(log_file, "r", encoding="utf-8") as f:
lines = f.readlines()
# 从文件末尾开始读取
for line in reversed(lines):
if len(logs) >= limit:
break
try:
log_entry = json.loads(line.strip())
# 转换为前端期望的格式
# 使用时间戳 + 计数器生成唯一 ID
timestamp_id = log_entry.get("timestamp", "0").replace("-", "").replace(" ", "").replace(":", "")
formatted_log = {
"id": f"{timestamp_id}_{log_counter}",
"timestamp": log_entry.get("timestamp", ""),
"level": log_entry.get("level", "INFO").upper(),
"module": log_entry.get("logger_name", ""),
"message": log_entry.get("event", ""),
}
logs.append(formatted_log)
log_counter += 1
except (json.JSONDecodeError, KeyError):
continue
except Exception as e:
logger.error(f"读取日志文件失败 {log_file}: {e}")
continue
# 反转列表,使其按时间顺序排列(旧到新)
return list(reversed(logs))
@router.websocket("/ws/logs")
async def websocket_logs(websocket: WebSocket):
"""WebSocket 日志推送端点
客户端连接后会持续接收服务器端的日志消息
"""
await websocket.accept()
active_connections.add(websocket)
logger.info(f"📡 WebSocket 客户端已连接,当前连接数: {len(active_connections)}")
# 连接建立后,立即发送历史日志
try:
recent_logs = load_recent_logs(limit=100)
logger.info(f"发送 {len(recent_logs)} 条历史日志到客户端")
for log_entry in recent_logs:
await websocket.send_text(json.dumps(log_entry, ensure_ascii=False))
except Exception as e:
logger.error(f"发送历史日志失败: {e}")
try:
# 保持连接,等待客户端消息或断开
while True:
# 接收客户端消息(用于心跳或控制指令)
data = await websocket.receive_text()
# 可以处理客户端的控制消息,例如:
# - "ping" -> 心跳检测
# - {"filter": "ERROR"} -> 设置日志级别过滤
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
active_connections.discard(websocket)
logger.info(f"📡 WebSocket 客户端已断开,当前连接数: {len(active_connections)}")
except Exception as e:
logger.error(f"❌ WebSocket 错误: {e}")
active_connections.discard(websocket)
async def broadcast_log(log_data: dict):
"""广播日志到所有连接的 WebSocket 客户端
Args:
log_data: 日志数据字典
"""
if not active_connections:
return
# 格式化为 JSON
message = json.dumps(log_data, ensure_ascii=False)
# 记录需要断开的连接
disconnected = set()
# 广播到所有客户端
for connection in active_connections:
try:
await connection.send_text(message)
except Exception:
# 发送失败,标记为断开
disconnected.add(connection)
# 清理断开的连接
if disconnected:
active_connections.difference_update(disconnected)
logger.debug(f"清理了 {len(disconnected)} 个断开的 WebSocket 连接")

View File

@ -31,6 +31,14 @@ def setup_webui(mode: str = "production") -> bool:
def setup_dev_mode() -> bool:
"""设置开发模式 - 仅启用 CORS前端自行启动"""
from src.common.server import get_global_server
from .logs_ws import router as logs_router
# 注册 WebSocket 日志路由(开发模式也需要)
server = get_global_server()
server.register_router(logs_router)
logger.info("✅ WebSocket 日志推送路由已注册")
logger.info("📝 WebUI 开发模式已启用")
logger.info("🌐 请手动启动前端开发服务器: cd webui && npm run dev")
logger.info("💡 前端将运行在 http://localhost:7999")
@ -42,6 +50,7 @@ def setup_production_mode() -> bool:
try:
from src.common.server import get_global_server
from starlette.responses import FileResponse
from .logs_ws import router as logs_router
import mimetypes
# 确保正确的 MIME 类型映射
@ -52,6 +61,11 @@ def setup_production_mode() -> bool:
mimetypes.add_type('application/json', '.json')
server = get_global_server()
# 注册 WebSocket 日志路由
server.register_router(logs_router)
logger.info("✅ WebSocket 日志推送路由已注册")
base_dir = Path(__file__).parent.parent.parent
static_path = base_dir / "webui" / "dist"