MaiBot-Napcat-Adapter/main.py

285 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import asyncio
import sys
import json
import http
import websockets as Server
from src.logger import logger
from src.recv_handler.message_handler import message_handler
from src.recv_handler.meta_event_handler import meta_event_handler
from src.recv_handler.notice_handler import notice_handler
from src.recv_handler.message_sending import message_send_instance
from src.send_handler.nc_sending import nc_message_sender
from src.config import global_config
from src.mmc_com_layer import mmc_start_com, mmc_stop_com, router
from src.response_pool import put_response, check_timeout_response
message_queue = asyncio.Queue()
websocket_server = None # 保存WebSocket服务器实例以便关闭
async def message_recv(server_connection: Server.ServerConnection):
try:
await message_handler.set_server_connection(server_connection)
asyncio.create_task(notice_handler.set_server_connection(server_connection))
await nc_message_sender.set_server_connection(server_connection)
async for raw_message in server_connection:
logger.debug(f"{raw_message[:1500]}..." if (len(raw_message) > 1500) else raw_message)
decoded_raw_message: dict = json.loads(raw_message)
post_type = decoded_raw_message.get("post_type")
if post_type in ["meta_event", "message", "notice"]:
await message_queue.put(decoded_raw_message)
elif post_type is None:
await put_response(decoded_raw_message)
except asyncio.CancelledError:
logger.debug("message_recv 收到取消信号,正在关闭连接")
await server_connection.close()
raise
async def message_process():
while True:
message = await message_queue.get()
post_type = message.get("post_type")
if post_type == "message":
await message_handler.handle_raw_message(message)
elif post_type == "meta_event":
await meta_event_handler.handle_meta_event(message)
elif post_type == "notice":
await notice_handler.handle_notice(message)
else:
logger.warning(f"未知的post_type: {post_type}")
message_queue.task_done()
await asyncio.sleep(0.05)
async def main():
# 启动配置文件监控并注册napcat_server配置变更回调
from src.config import config_manager
# 保存napcat_server任务的引用用于重启
napcat_task = None
restart_event = asyncio.Event()
async def on_napcat_config_change(old_value, new_value):
"""当napcat_server配置变更时重启WebSocket服务器"""
nonlocal napcat_task
logger.warning(
f"NapCat配置已变更:\n"
f" 旧配置: {old_value.host}:{old_value.port}\n"
f" 新配置: {new_value.host}:{new_value.port}"
)
# 关闭当前WebSocket服务器
global websocket_server
if websocket_server:
try:
logger.info("正在关闭旧的WebSocket服务器...")
websocket_server.close()
await websocket_server.wait_closed()
logger.info("旧的WebSocket服务器已关闭")
except Exception as e:
logger.error(f"关闭旧WebSocket服务器失败: {e}")
# 取消旧任务
if napcat_task and not napcat_task.done():
napcat_task.cancel()
try:
await napcat_task
except asyncio.CancelledError:
pass
# 触发重启
restart_event.set()
config_manager.on_config_change("napcat_server", on_napcat_config_change)
# 启动文件监控
asyncio.create_task(config_manager.start_watch())
# WebSocket服务器重启循环
async def napcat_with_restart():
nonlocal napcat_task
while True:
restart_event.clear()
try:
await napcat_server()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"NapCat服务器异常: {e}")
break
# 等待重启信号
if not restart_event.is_set():
break
logger.info("正在重启WebSocket服务器...")
await asyncio.sleep(1) # 等待1秒后重启
message_send_instance.maibot_router = router
_ = await asyncio.gather(napcat_with_restart(), mmc_start_com(), message_process(), check_timeout_response())
def check_napcat_server_token(conn, request):
token = global_config.napcat_server.token
if not token or token.strip() == "":
return None
auth_header = request.headers.get("Authorization")
if auth_header != f"Bearer {token}":
return Server.Response(
status=http.HTTPStatus.UNAUTHORIZED,
headers=Server.Headers([("Content-Type", "text/plain")]),
body=b"Unauthorized\n"
)
return None
async def napcat_server():
global websocket_server
logger.info("正在启动 MaiBot-Napcat-Adapter...")
logger.debug(f"日志等级: {global_config.debug.level}")
logger.debug("日志文件: logs/adapter_*.log")
try:
async with Server.serve(
message_recv,
global_config.napcat_server.host,
global_config.napcat_server.port,
max_size=2**26,
process_request=check_napcat_server_token
) as server:
websocket_server = server
logger.success(
f"✅ Adapter 启动成功! 监听: ws://{global_config.napcat_server.host}:{global_config.napcat_server.port}"
)
try:
await server.serve_forever()
except asyncio.CancelledError:
logger.debug("napcat_server 收到取消信号")
raise
except OSError:
# 端口绑定失败时抛出异常让外层处理
raise
async def graceful_shutdown(silent: bool = False):
"""
优雅关闭adapter
Args:
silent: 静默模式,控制台不输出日志,但仍记录到文件
"""
global websocket_server
try:
if not silent:
logger.info("正在关闭adapter...")
else:
logger.debug("正在清理资源...")
# 先关闭WebSocket服务器
if websocket_server:
try:
logger.debug("正在关闭WebSocket服务器")
websocket_server.close()
await websocket_server.wait_closed()
logger.debug("WebSocket服务器已关闭")
except Exception as e:
logger.debug(f"关闭WebSocket服务器时出现错误: {e}")
# 关闭MMC连接
try:
await asyncio.wait_for(mmc_stop_com(), timeout=3)
except asyncio.TimeoutError:
logger.debug("关闭MMC连接超时")
except Exception as e:
logger.debug(f"关闭MMC连接时出现错误: {e}")
# 取消所有任务
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
if tasks:
logger.debug(f"正在取消 {len(tasks)} 个任务")
for task in tasks:
if not task.done():
task.cancel()
# 等待任务完成,记录异常到日志文件
if tasks:
try:
results = await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=3)
# 记录任务取消的详细信息到日志文件
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.debug(f"任务 {i+1} 清理时产生异常: {type(result).__name__}: {result}")
except asyncio.TimeoutError:
logger.debug("任务清理超时")
except Exception as e:
logger.debug(f"任务清理时出现错误: {e}")
if not silent:
logger.info("Adapter已成功关闭")
else:
logger.debug("资源清理完成")
except Exception as e:
logger.debug(f"graceful_shutdown异常: {e}", exc_info=True)
if __name__ == "__main__":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
logger.warning("收到中断信号,正在优雅关闭...")
try:
loop.run_until_complete(graceful_shutdown(silent=False))
except Exception:
pass
except OSError as e:
# 处理端口占用等网络错误
if e.errno == 10048 or "address already in use" in str(e).lower():
logger.error(f"❌ 端口 {global_config.napcat_server.port} 已被占用,请检查:")
logger.error(" 1. 是否有其他 MaiBot-Napcat-Adapter 实例正在运行")
logger.error(" 2. 修改 config.toml 中的 port 配置")
logger.error(f" 3. 使用命令查看占用进程: netstat -ano | findstr {global_config.napcat_server.port}")
else:
logger.error(f"❌ 网络错误: {str(e)}")
logger.debug("完整错误信息:", exc_info=True)
# 端口占用时静默清理(控制台不输出,但记录到日志文件)
try:
loop.run_until_complete(graceful_shutdown(silent=True))
except Exception as e:
logger.debug(f"清理资源时出现错误: {e}", exc_info=True)
sys.exit(1)
except Exception as e:
logger.error(f"❌ 主程序异常: {str(e)}")
logger.debug("详细错误信息:", exc_info=True)
try:
loop.run_until_complete(graceful_shutdown(silent=True))
except Exception as e:
logger.debug(f"清理资源时出现错误: {e}", exc_info=True)
sys.exit(1)
finally:
# 清理事件循环
try:
# 取消所有剩余任务
pending = asyncio.all_tasks(loop)
if pending:
logger.debug(f"finally块清理 {len(pending)} 个剩余任务")
for task in pending:
task.cancel()
# 给任务一点时间完成取消
try:
results = loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
# 记录清理结果到日志文件
for i, result in enumerate(results):
if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
logger.debug(f"剩余任务 {i+1} 清理异常: {type(result).__name__}: {result}")
except Exception as e:
logger.debug(f"清理剩余任务时出现错误: {e}")
except Exception as e:
logger.debug(f"finally块清理出现错误: {e}")
finally:
if loop and not loop.is_closed():
logger.debug("关闭事件循环")
loop.close()
sys.exit(0)