285 lines
11 KiB
Python
285 lines
11 KiB
Python
import asyncio
|
||
import sys
|
||
import json
|
||
import http
|
||
import websockets as Server
|
||
from src.logger import logger
|
||
from src.recv_handler.message_handler import message_handler
|
||
from src.recv_handler.meta_event_handler import meta_event_handler
|
||
from src.recv_handler.notice_handler import notice_handler
|
||
from src.recv_handler.message_sending import message_send_instance
|
||
from src.send_handler.nc_sending import nc_message_sender
|
||
from src.config import global_config
|
||
from src.mmc_com_layer import mmc_start_com, mmc_stop_com, router
|
||
from src.response_pool import put_response, check_timeout_response
|
||
|
||
message_queue = asyncio.Queue()
|
||
websocket_server = None # 保存WebSocket服务器实例以便关闭
|
||
|
||
|
||
async def message_recv(server_connection: Server.ServerConnection):
|
||
try:
|
||
await message_handler.set_server_connection(server_connection)
|
||
asyncio.create_task(notice_handler.set_server_connection(server_connection))
|
||
await nc_message_sender.set_server_connection(server_connection)
|
||
async for raw_message in server_connection:
|
||
logger.debug(f"{raw_message[:1500]}..." if (len(raw_message) > 1500) else raw_message)
|
||
decoded_raw_message: dict = json.loads(raw_message)
|
||
post_type = decoded_raw_message.get("post_type")
|
||
if post_type in ["meta_event", "message", "notice"]:
|
||
await message_queue.put(decoded_raw_message)
|
||
elif post_type is None:
|
||
await put_response(decoded_raw_message)
|
||
except asyncio.CancelledError:
|
||
logger.debug("message_recv 收到取消信号,正在关闭连接")
|
||
await server_connection.close()
|
||
raise
|
||
|
||
|
||
async def message_process():
|
||
while True:
|
||
message = await message_queue.get()
|
||
post_type = message.get("post_type")
|
||
if post_type == "message":
|
||
await message_handler.handle_raw_message(message)
|
||
elif post_type == "meta_event":
|
||
await meta_event_handler.handle_meta_event(message)
|
||
elif post_type == "notice":
|
||
await notice_handler.handle_notice(message)
|
||
else:
|
||
logger.warning(f"未知的post_type: {post_type}")
|
||
message_queue.task_done()
|
||
await asyncio.sleep(0.05)
|
||
|
||
|
||
async def main():
|
||
# 启动配置文件监控并注册napcat_server配置变更回调
|
||
from src.config import config_manager
|
||
|
||
# 保存napcat_server任务的引用,用于重启
|
||
napcat_task = None
|
||
restart_event = asyncio.Event()
|
||
|
||
async def on_napcat_config_change(old_value, new_value):
|
||
"""当napcat_server配置变更时,重启WebSocket服务器"""
|
||
nonlocal napcat_task
|
||
|
||
logger.warning(
|
||
f"NapCat配置已变更:\n"
|
||
f" 旧配置: {old_value.host}:{old_value.port}\n"
|
||
f" 新配置: {new_value.host}:{new_value.port}"
|
||
)
|
||
|
||
# 关闭当前WebSocket服务器
|
||
global websocket_server
|
||
if websocket_server:
|
||
try:
|
||
logger.info("正在关闭旧的WebSocket服务器...")
|
||
websocket_server.close()
|
||
await websocket_server.wait_closed()
|
||
logger.info("旧的WebSocket服务器已关闭")
|
||
except Exception as e:
|
||
logger.error(f"关闭旧WebSocket服务器失败: {e}")
|
||
|
||
# 取消旧任务
|
||
if napcat_task and not napcat_task.done():
|
||
napcat_task.cancel()
|
||
try:
|
||
await napcat_task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
|
||
# 触发重启
|
||
restart_event.set()
|
||
|
||
config_manager.on_config_change("napcat_server", on_napcat_config_change)
|
||
|
||
# 启动文件监控
|
||
asyncio.create_task(config_manager.start_watch())
|
||
|
||
# WebSocket服务器重启循环
|
||
async def napcat_with_restart():
|
||
nonlocal napcat_task
|
||
while True:
|
||
restart_event.clear()
|
||
try:
|
||
await napcat_server()
|
||
except asyncio.CancelledError:
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"NapCat服务器异常: {e}")
|
||
break
|
||
|
||
# 等待重启信号
|
||
if not restart_event.is_set():
|
||
break
|
||
|
||
logger.info("正在重启WebSocket服务器...")
|
||
await asyncio.sleep(1) # 等待1秒后重启
|
||
|
||
message_send_instance.maibot_router = router
|
||
_ = await asyncio.gather(napcat_with_restart(), mmc_start_com(), message_process(), check_timeout_response())
|
||
|
||
def check_napcat_server_token(conn, request):
|
||
token = global_config.napcat_server.token
|
||
if not token or token.strip() == "":
|
||
return None
|
||
auth_header = request.headers.get("Authorization")
|
||
if auth_header != f"Bearer {token}":
|
||
return Server.Response(
|
||
status=http.HTTPStatus.UNAUTHORIZED,
|
||
headers=Server.Headers([("Content-Type", "text/plain")]),
|
||
body=b"Unauthorized\n"
|
||
)
|
||
return None
|
||
|
||
async def napcat_server():
|
||
global websocket_server
|
||
logger.info("正在启动 MaiBot-Napcat-Adapter...")
|
||
logger.debug(f"日志等级: {global_config.debug.level}")
|
||
logger.debug("日志文件: logs/adapter_*.log")
|
||
try:
|
||
async with Server.serve(
|
||
message_recv,
|
||
global_config.napcat_server.host,
|
||
global_config.napcat_server.port,
|
||
max_size=2**26,
|
||
process_request=check_napcat_server_token
|
||
) as server:
|
||
websocket_server = server
|
||
logger.success(
|
||
f"✅ Adapter 启动成功! 监听: ws://{global_config.napcat_server.host}:{global_config.napcat_server.port}"
|
||
)
|
||
try:
|
||
await server.serve_forever()
|
||
except asyncio.CancelledError:
|
||
logger.debug("napcat_server 收到取消信号")
|
||
raise
|
||
except OSError:
|
||
# 端口绑定失败时抛出异常让外层处理
|
||
raise
|
||
|
||
|
||
async def graceful_shutdown(silent: bool = False):
|
||
"""
|
||
优雅关闭adapter
|
||
Args:
|
||
silent: 静默模式,控制台不输出日志,但仍记录到文件
|
||
"""
|
||
global websocket_server
|
||
try:
|
||
if not silent:
|
||
logger.info("正在关闭adapter...")
|
||
else:
|
||
logger.debug("正在清理资源...")
|
||
|
||
# 先关闭WebSocket服务器
|
||
if websocket_server:
|
||
try:
|
||
logger.debug("正在关闭WebSocket服务器")
|
||
websocket_server.close()
|
||
await websocket_server.wait_closed()
|
||
logger.debug("WebSocket服务器已关闭")
|
||
except Exception as e:
|
||
logger.debug(f"关闭WebSocket服务器时出现错误: {e}")
|
||
|
||
# 关闭MMC连接
|
||
try:
|
||
await asyncio.wait_for(mmc_stop_com(), timeout=3)
|
||
except asyncio.TimeoutError:
|
||
logger.debug("关闭MMC连接超时")
|
||
except Exception as e:
|
||
logger.debug(f"关闭MMC连接时出现错误: {e}")
|
||
|
||
# 取消所有任务
|
||
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
|
||
if tasks:
|
||
logger.debug(f"正在取消 {len(tasks)} 个任务")
|
||
for task in tasks:
|
||
if not task.done():
|
||
task.cancel()
|
||
|
||
# 等待任务完成,记录异常到日志文件
|
||
if tasks:
|
||
try:
|
||
results = await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=3)
|
||
# 记录任务取消的详细信息到日志文件
|
||
for i, result in enumerate(results):
|
||
if isinstance(result, Exception):
|
||
logger.debug(f"任务 {i+1} 清理时产生异常: {type(result).__name__}: {result}")
|
||
except asyncio.TimeoutError:
|
||
logger.debug("任务清理超时")
|
||
except Exception as e:
|
||
logger.debug(f"任务清理时出现错误: {e}")
|
||
|
||
if not silent:
|
||
logger.info("Adapter已成功关闭")
|
||
else:
|
||
logger.debug("资源清理完成")
|
||
except Exception as e:
|
||
logger.debug(f"graceful_shutdown异常: {e}", exc_info=True)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
try:
|
||
loop.run_until_complete(main())
|
||
except KeyboardInterrupt:
|
||
logger.warning("收到中断信号,正在优雅关闭...")
|
||
try:
|
||
loop.run_until_complete(graceful_shutdown(silent=False))
|
||
except Exception:
|
||
pass
|
||
except OSError as e:
|
||
# 处理端口占用等网络错误
|
||
if e.errno == 10048 or "address already in use" in str(e).lower():
|
||
logger.error(f"❌ 端口 {global_config.napcat_server.port} 已被占用,请检查:")
|
||
logger.error(" 1. 是否有其他 MaiBot-Napcat-Adapter 实例正在运行")
|
||
logger.error(" 2. 修改 config.toml 中的 port 配置")
|
||
logger.error(f" 3. 使用命令查看占用进程: netstat -ano | findstr {global_config.napcat_server.port}")
|
||
else:
|
||
logger.error(f"❌ 网络错误: {str(e)}")
|
||
|
||
logger.debug("完整错误信息:", exc_info=True)
|
||
|
||
# 端口占用时静默清理(控制台不输出,但记录到日志文件)
|
||
try:
|
||
loop.run_until_complete(graceful_shutdown(silent=True))
|
||
except Exception as e:
|
||
logger.debug(f"清理资源时出现错误: {e}", exc_info=True)
|
||
sys.exit(1)
|
||
except Exception as e:
|
||
logger.error(f"❌ 主程序异常: {str(e)}")
|
||
logger.debug("详细错误信息:", exc_info=True)
|
||
try:
|
||
loop.run_until_complete(graceful_shutdown(silent=True))
|
||
except Exception as e:
|
||
logger.debug(f"清理资源时出现错误: {e}", exc_info=True)
|
||
sys.exit(1)
|
||
finally:
|
||
# 清理事件循环
|
||
try:
|
||
# 取消所有剩余任务
|
||
pending = asyncio.all_tasks(loop)
|
||
if pending:
|
||
logger.debug(f"finally块清理 {len(pending)} 个剩余任务")
|
||
for task in pending:
|
||
task.cancel()
|
||
# 给任务一点时间完成取消
|
||
try:
|
||
results = loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||
# 记录清理结果到日志文件
|
||
for i, result in enumerate(results):
|
||
if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
|
||
logger.debug(f"剩余任务 {i+1} 清理异常: {type(result).__name__}: {result}")
|
||
except Exception as e:
|
||
logger.debug(f"清理剩余任务时出现错误: {e}")
|
||
except Exception as e:
|
||
logger.debug(f"finally块清理出现错误: {e}")
|
||
finally:
|
||
if loop and not loop.is_closed():
|
||
logger.debug("关闭事件循环")
|
||
loop.close()
|
||
sys.exit(0)
|