From 43e0c9a4ada95261f9fc75be48699e09d60c43af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A2=A8=E6=A2=93=E6=9F=92?= <1787882683@qq.com> Date: Tue, 18 Nov 2025 00:38:30 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20WebSocket=20?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=8E=A8=E9=80=81=E5=8A=9F=E8=83=BD=EF=BC=8C?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=AE=9E=E6=97=B6=E6=97=A5=E5=BF=97=E7=9B=91?= =?UTF-8?q?=E6=8E=A7=E5=92=8C=E5=8E=86=E5=8F=B2=E6=97=A5=E5=BF=97=E5=8A=A0?= =?UTF-8?q?=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot.py | 4 + src/common/logger.py | 102 +++++++++++++++++++++++++- src/webui/log_broadcaster.py | 0 src/webui/logs_routes.py | 0 src/webui/logs_ws.py | 138 +++++++++++++++++++++++++++++++++++ src/webui/manager.py | 14 ++++ 6 files changed, 257 insertions(+), 1 deletion(-) create mode 100644 src/webui/log_broadcaster.py create mode 100644 src/webui/logs_routes.py create mode 100644 src/webui/logs_ws.py diff --git a/bot.py b/bot.py index ba210462..38894b29 100644 --- a/bot.py +++ b/bot.py @@ -212,6 +212,10 @@ if __name__ == "__main__": # 创建事件循环 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) + + # 初始化 WebSocket 日志推送 + from src.common.logger import initialize_ws_handler + initialize_ws_handler(loop) try: # 执行初始化和任务调度 diff --git a/src/common/logger.py b/src/common/logger.py index 55833c34..23ccc490 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -19,6 +19,7 @@ PROJECT_ROOT = logger_file.parent.parent.parent.resolve() # 全局handler实例,避免重复创建 _file_handler = None _console_handler = None +_ws_handler = None def get_file_handler(): @@ -59,6 +60,35 @@ def get_console_handler(): return _console_handler +def get_ws_handler(): + """获取 WebSocket handler 单例""" + global _ws_handler + if _ws_handler is None: + _ws_handler = WebSocketLogHandler() + # WebSocket handler 只推送 INFO 及以上级别 + _ws_handler.setLevel(logging.INFO) + return _ws_handler + + +def initialize_ws_handler(loop): + """初始化 WebSocket handler 的事件循环 + + Args: + loop: asyncio 事件循环 + """ + handler = get_ws_handler() + handler.set_loop(loop) + + # 为 WebSocket handler 设置 JSON 格式化器(与文件格式相同) + handler.setFormatter(file_formatter) + + # 添加到根日志记录器 + root_logger = logging.getLogger() + if handler not in root_logger.handlers: + root_logger.addHandler(handler) + print("[日志系统] ✅ WebSocket 日志推送已启用") + + class TimestampedFileHandler(logging.Handler): """基于时间戳的文件处理器,简单的轮转份数限制""" @@ -145,12 +175,78 @@ class TimestampedFileHandler(logging.Handler): super().close() +class WebSocketLogHandler(logging.Handler): + """WebSocket 日志处理器 - 将日志实时推送到前端""" + + _log_counter = 0 # 类级别计数器,确保 ID 唯一性 + + def __init__(self, loop=None): + super().__init__() + self.loop = loop + self._initialized = False + + def set_loop(self, loop): + """设置事件循环""" + self.loop = loop + self._initialized = True + + def emit(self, record): + """发送日志到 WebSocket 客户端""" + if not self._initialized or self.loop is None: + return + + try: + # 获取格式化后的消息 + # 对于 structlog,formatted message 包含完整的日志信息 + formatted_msg = self.format(record) if self.formatter else record.getMessage() + + # 如果是 JSON 格式(文件格式化器),解析它 + message = formatted_msg + try: + import json + log_dict = json.loads(formatted_msg) + message = log_dict.get('event', formatted_msg) + except (json.JSONDecodeError, ValueError): + # 不是 JSON,直接使用消息 + message = formatted_msg + + # 生成唯一 ID: 时间戳毫秒 + 自增计数器 + WebSocketLogHandler._log_counter += 1 + log_id = f"{int(record.created * 1000)}_{WebSocketLogHandler._log_counter}" + + # 格式化日志数据 + log_data = { + "id": log_id, + "timestamp": datetime.fromtimestamp(record.created).strftime("%Y-%m-%d %H:%M:%S"), + "level": record.levelname, + "module": record.name, + "message": message, + } + + # 异步广播日志(不阻塞日志记录) + try: + import asyncio + from src.webui.logs_ws import broadcast_log + + asyncio.run_coroutine_threadsafe( + broadcast_log(log_data), + self.loop + ) + except Exception: + # WebSocket 推送失败不影响日志记录 + pass + + except Exception: + # 不要让 WebSocket 错误影响日志系统 + self.handleError(record) + + # 旧的轮转文件处理器已移除,现在使用基于时间戳的处理器 def close_handlers(): """安全关闭所有handler""" - global _file_handler, _console_handler + global _file_handler, _console_handler, _ws_handler if _file_handler: _file_handler.close() @@ -159,6 +255,10 @@ def close_handlers(): if _console_handler: _console_handler.close() _console_handler = None + + if _ws_handler: + _ws_handler.close() + _ws_handler = None def remove_duplicate_handlers(): # sourcery skip: for-append-to-extend, list-comprehension diff --git a/src/webui/log_broadcaster.py b/src/webui/log_broadcaster.py new file mode 100644 index 00000000..e69de29b diff --git a/src/webui/logs_routes.py b/src/webui/logs_routes.py new file mode 100644 index 00000000..e69de29b diff --git a/src/webui/logs_ws.py b/src/webui/logs_ws.py new file mode 100644 index 00000000..d8ef65aa --- /dev/null +++ b/src/webui/logs_ws.py @@ -0,0 +1,138 @@ +"""WebSocket 日志推送模块""" +from fastapi import APIRouter, WebSocket, WebSocketDisconnect +from typing import Set +import json +from pathlib import Path +from src.common.logger import get_logger + +logger = get_logger("webui.logs_ws") +router = APIRouter() + +# 全局 WebSocket 连接池 +active_connections: Set[WebSocket] = set() + + +def load_recent_logs(limit: int = 100) -> list[dict]: + """从日志文件中加载最近的日志 + + Args: + limit: 返回的最大日志条数 + + Returns: + 日志列表 + """ + logs = [] + log_dir = Path("logs") + + if not log_dir.exists(): + return logs + + # 获取所有日志文件,按修改时间排序 + log_files = sorted(log_dir.glob("app_*.log.jsonl"), key=lambda f: f.stat().st_mtime, reverse=True) + + # 用于生成唯一 ID 的计数器 + log_counter = 0 + + # 从最新的文件开始读取 + for log_file in log_files: + if len(logs) >= limit: + break + + try: + with open(log_file, "r", encoding="utf-8") as f: + lines = f.readlines() + # 从文件末尾开始读取 + for line in reversed(lines): + if len(logs) >= limit: + break + try: + log_entry = json.loads(line.strip()) + # 转换为前端期望的格式 + # 使用时间戳 + 计数器生成唯一 ID + timestamp_id = log_entry.get("timestamp", "0").replace("-", "").replace(" ", "").replace(":", "") + formatted_log = { + "id": f"{timestamp_id}_{log_counter}", + "timestamp": log_entry.get("timestamp", ""), + "level": log_entry.get("level", "INFO").upper(), + "module": log_entry.get("logger_name", ""), + "message": log_entry.get("event", ""), + } + logs.append(formatted_log) + log_counter += 1 + except (json.JSONDecodeError, KeyError): + continue + except Exception as e: + logger.error(f"读取日志文件失败 {log_file}: {e}") + continue + + # 反转列表,使其按时间顺序排列(旧到新) + return list(reversed(logs)) + + +@router.websocket("/ws/logs") +async def websocket_logs(websocket: WebSocket): + """WebSocket 日志推送端点 + + 客户端连接后会持续接收服务器端的日志消息 + """ + await websocket.accept() + active_connections.add(websocket) + logger.info(f"📡 WebSocket 客户端已连接,当前连接数: {len(active_connections)}") + + # 连接建立后,立即发送历史日志 + try: + recent_logs = load_recent_logs(limit=100) + logger.info(f"发送 {len(recent_logs)} 条历史日志到客户端") + + for log_entry in recent_logs: + await websocket.send_text(json.dumps(log_entry, ensure_ascii=False)) + except Exception as e: + logger.error(f"发送历史日志失败: {e}") + + try: + # 保持连接,等待客户端消息或断开 + while True: + # 接收客户端消息(用于心跳或控制指令) + data = await websocket.receive_text() + + # 可以处理客户端的控制消息,例如: + # - "ping" -> 心跳检测 + # - {"filter": "ERROR"} -> 设置日志级别过滤 + if data == "ping": + await websocket.send_text("pong") + + except WebSocketDisconnect: + active_connections.discard(websocket) + logger.info(f"📡 WebSocket 客户端已断开,当前连接数: {len(active_connections)}") + except Exception as e: + logger.error(f"❌ WebSocket 错误: {e}") + active_connections.discard(websocket) + + +async def broadcast_log(log_data: dict): + """广播日志到所有连接的 WebSocket 客户端 + + Args: + log_data: 日志数据字典 + """ + if not active_connections: + return + + # 格式化为 JSON + message = json.dumps(log_data, ensure_ascii=False) + + # 记录需要断开的连接 + disconnected = set() + + # 广播到所有客户端 + for connection in active_connections: + try: + await connection.send_text(message) + except Exception: + # 发送失败,标记为断开 + disconnected.add(connection) + + # 清理断开的连接 + if disconnected: + active_connections.difference_update(disconnected) + logger.debug(f"清理了 {len(disconnected)} 个断开的 WebSocket 连接") diff --git a/src/webui/manager.py b/src/webui/manager.py index 9a4999ff..3919df17 100644 --- a/src/webui/manager.py +++ b/src/webui/manager.py @@ -31,6 +31,14 @@ def setup_webui(mode: str = "production") -> bool: def setup_dev_mode() -> bool: """设置开发模式 - 仅启用 CORS,前端自行启动""" + from src.common.server import get_global_server + from .logs_ws import router as logs_router + + # 注册 WebSocket 日志路由(开发模式也需要) + server = get_global_server() + server.register_router(logs_router) + logger.info("✅ WebSocket 日志推送路由已注册") + logger.info("📝 WebUI 开发模式已启用") logger.info("🌐 请手动启动前端开发服务器: cd webui && npm run dev") logger.info("💡 前端将运行在 http://localhost:7999") @@ -42,6 +50,7 @@ def setup_production_mode() -> bool: try: from src.common.server import get_global_server from starlette.responses import FileResponse + from .logs_ws import router as logs_router import mimetypes # 确保正确的 MIME 类型映射 @@ -52,6 +61,11 @@ def setup_production_mode() -> bool: mimetypes.add_type('application/json', '.json') server = get_global_server() + + # 注册 WebSocket 日志路由 + server.register_router(logs_router) + logger.info("✅ WebSocket 日志推送路由已注册") + base_dir = Path(__file__).parent.parent.parent static_path = base_dir / "webui" / "dist"