197 lines
7.9 KiB
Python
197 lines
7.9 KiB
Python
import asyncio
|
|
import sys
|
|
import json
|
|
import http
|
|
import websockets as Server
|
|
from src.logger import logger
|
|
from src.recv_handler.message_handler import message_handler
|
|
from src.recv_handler.meta_event_handler import meta_event_handler
|
|
from src.recv_handler.notice_handler import notice_handler
|
|
from src.recv_handler.message_sending import message_send_instance
|
|
from src.send_handler.nc_sending import nc_message_sender
|
|
from src.config import global_config
|
|
from src.mmc_com_layer import mmc_start_com, mmc_stop_com, router
|
|
from src.response_pool import put_response, check_timeout_response
|
|
|
|
message_queue = asyncio.Queue()
|
|
|
|
|
|
async def message_recv(server_connection: Server.ServerConnection):
|
|
await message_handler.set_server_connection(server_connection)
|
|
asyncio.create_task(notice_handler.set_server_connection(server_connection))
|
|
await nc_message_sender.set_server_connection(server_connection)
|
|
async for raw_message in server_connection:
|
|
logger.debug(f"{raw_message[:1500]}..." if (len(raw_message) > 1500) else raw_message)
|
|
decoded_raw_message: dict = json.loads(raw_message)
|
|
post_type = decoded_raw_message.get("post_type")
|
|
if post_type in ["meta_event", "message", "notice"]:
|
|
await message_queue.put(decoded_raw_message)
|
|
elif post_type is None:
|
|
await put_response(decoded_raw_message)
|
|
|
|
|
|
async def message_process():
|
|
while True:
|
|
message = await message_queue.get()
|
|
post_type = message.get("post_type")
|
|
if post_type == "message":
|
|
await message_handler.handle_raw_message(message)
|
|
elif post_type == "meta_event":
|
|
await meta_event_handler.handle_meta_event(message)
|
|
elif post_type == "notice":
|
|
await notice_handler.handle_notice(message)
|
|
else:
|
|
logger.warning(f"未知的post_type: {post_type}")
|
|
message_queue.task_done()
|
|
await asyncio.sleep(0.05)
|
|
|
|
|
|
async def main():
|
|
message_send_instance.maibot_router = router
|
|
_ = await asyncio.gather(napcat_server(), mmc_start_com(), message_process(), check_timeout_response())
|
|
|
|
def check_napcat_server_token(conn, request):
|
|
token = global_config.napcat_server.token
|
|
if not token or token.strip() == "":
|
|
return None
|
|
auth_header = request.headers.get("Authorization")
|
|
if auth_header != f"Bearer {token}":
|
|
return Server.Response(
|
|
status=http.HTTPStatus.UNAUTHORIZED,
|
|
headers=Server.Headers([("Content-Type", "text/plain")]),
|
|
body=b"Unauthorized\n"
|
|
)
|
|
return None
|
|
|
|
async def napcat_server():
|
|
logger.info("正在启动 MaiBot-Napcat-Adapter...")
|
|
logger.debug(f"日志等级: {global_config.debug.level}")
|
|
logger.debug("日志文件: logs/adapter_*.log")
|
|
try:
|
|
async with Server.serve(
|
|
message_recv,
|
|
global_config.napcat_server.host,
|
|
global_config.napcat_server.port,
|
|
max_size=2**26,
|
|
process_request=check_napcat_server_token
|
|
) as server:
|
|
logger.success(
|
|
f"✅ Adapter 启动成功! 监听: ws://{global_config.napcat_server.host}:{global_config.napcat_server.port}"
|
|
)
|
|
await server.serve_forever()
|
|
except OSError:
|
|
# 端口绑定失败时抛出异常让外层处理
|
|
raise
|
|
|
|
|
|
async def graceful_shutdown(silent: bool = False):
|
|
"""
|
|
优雅关闭adapter
|
|
Args:
|
|
silent: 静默模式,控制台不输出日志,但仍记录到文件
|
|
"""
|
|
try:
|
|
if not silent:
|
|
logger.info("正在关闭adapter...")
|
|
else:
|
|
logger.debug("正在清理资源...")
|
|
|
|
# 先关闭MMC连接
|
|
try:
|
|
await asyncio.wait_for(mmc_stop_com(), timeout=3)
|
|
except asyncio.TimeoutError:
|
|
logger.debug("关闭MMC连接超时")
|
|
except Exception as e:
|
|
logger.debug(f"关闭MMC连接时出现错误: {e}")
|
|
|
|
# 取消所有任务
|
|
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
|
|
if tasks:
|
|
logger.debug(f"正在取消 {len(tasks)} 个任务")
|
|
for task in tasks:
|
|
if not task.done():
|
|
task.cancel()
|
|
|
|
# 等待任务完成,记录异常到日志文件
|
|
if tasks:
|
|
try:
|
|
results = await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=3)
|
|
# 记录任务取消的详细信息到日志文件
|
|
for i, result in enumerate(results):
|
|
if isinstance(result, Exception):
|
|
logger.debug(f"任务 {i+1} 清理时产生异常: {type(result).__name__}: {result}")
|
|
except asyncio.TimeoutError:
|
|
logger.debug("任务清理超时")
|
|
except Exception as e:
|
|
logger.debug(f"任务清理时出现错误: {e}")
|
|
|
|
if not silent:
|
|
logger.info("Adapter已成功关闭")
|
|
else:
|
|
logger.debug("资源清理完成")
|
|
except Exception as e:
|
|
logger.debug(f"graceful_shutdown异常: {e}", exc_info=True)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
loop.run_until_complete(main())
|
|
except KeyboardInterrupt:
|
|
logger.warning("收到中断信号,正在优雅关闭...")
|
|
try:
|
|
loop.run_until_complete(graceful_shutdown(silent=False))
|
|
except Exception:
|
|
pass
|
|
except OSError as e:
|
|
# 处理端口占用等网络错误
|
|
if e.errno == 10048 or "address already in use" in str(e).lower():
|
|
logger.error(f"❌ 端口 {global_config.napcat_server.port} 已被占用,请检查:")
|
|
logger.error(" 1. 是否有其他 MaiBot-Napcat-Adapter 实例正在运行")
|
|
logger.error(" 2. 修改 config.toml 中的 port 配置")
|
|
logger.error(f" 3. 使用命令查看占用进程: netstat -ano | findstr {global_config.napcat_server.port}")
|
|
logger.debug("完整错误信息:", exc_info=True)
|
|
else:
|
|
logger.error(f"❌ 网络错误: {str(e)}")
|
|
logger.debug("完整错误信息:", exc_info=True)
|
|
# 端口占用时静默清理(控制台不输出,但记录到日志文件)
|
|
try:
|
|
loop.run_until_complete(graceful_shutdown(silent=True))
|
|
except Exception as e:
|
|
logger.debug(f"清理资源时出现错误: {e}", exc_info=True)
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
logger.error(f"❌ 主程序异常: {str(e)}")
|
|
logger.debug("详细错误信息:", exc_info=True)
|
|
try:
|
|
loop.run_until_complete(graceful_shutdown(silent=True))
|
|
except Exception as e:
|
|
logger.debug(f"清理资源时出现错误: {e}", exc_info=True)
|
|
sys.exit(1)
|
|
finally:
|
|
# 清理事件循环
|
|
try:
|
|
# 取消所有剩余任务
|
|
pending = asyncio.all_tasks(loop)
|
|
if pending:
|
|
logger.debug(f"finally块清理 {len(pending)} 个剩余任务")
|
|
for task in pending:
|
|
task.cancel()
|
|
# 给任务一点时间完成取消
|
|
try:
|
|
results = loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
|
# 记录清理结果到日志文件
|
|
for i, result in enumerate(results):
|
|
if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
|
|
logger.debug(f"剩余任务 {i+1} 清理异常: {type(result).__name__}: {result}")
|
|
except Exception as e:
|
|
logger.debug(f"清理剩余任务时出现错误: {e}")
|
|
except Exception as e:
|
|
logger.debug(f"finally块清理出现错误: {e}")
|
|
finally:
|
|
if loop and not loop.is_closed():
|
|
logger.debug("关闭事件循环")
|
|
loop.close()
|
|
sys.exit(0)
|