From 619b5d82160a5d81ed904233a3da84d77cee52a1 Mon Sep 17 00:00:00 2001 From: UnCLAS-Prommer Date: Tue, 15 Apr 2025 01:05:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E8=A7=A3=E5=86=B3#6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 6 ++--- src/message_queue.py | 41 ++++++++++++++++++++++++++--- src/recv_handler.py | 22 +++++++++++++--- src/send_handler.py | 15 ++++++++--- src/utils.py | 62 +++++++++++++++++++++++++++++++++++++------- 5 files changed, 123 insertions(+), 23 deletions(-) diff --git a/main.py b/main.py index 252f8d6..30ae51c 100644 --- a/main.py +++ b/main.py @@ -7,7 +7,7 @@ from src.recv_handler import recv_handler from src.send_handler import send_handler from src.config import global_config from src.mmc_com_layer import mmc_start_com, mmc_stop_com, router -from src.message_queue import recv_queue, message_queue +from src.message_queue import message_queue, put_response, check_timeout_response async def message_recv(server_connection: Server.ServerConnection): @@ -24,7 +24,7 @@ async def message_recv(server_connection: Server.ServerConnection): elif post_type == "notice": await message_queue.put(decoded_raw_message) elif post_type is None: - await recv_queue.put(decoded_raw_message) + await put_response(decoded_raw_message) async def message_process(): @@ -45,7 +45,7 @@ async def message_process(): async def main(): recv_handler.maibot_router = router - _ = await asyncio.gather(napcat_server(), mmc_start_com(), message_process()) + _ = await asyncio.gather(napcat_server(), mmc_start_com(), message_process(), check_timeout_response()) async def napcat_server(): diff --git a/src/message_queue.py b/src/message_queue.py index 0b4aa3e..95d646c 100644 --- a/src/message_queue.py +++ b/src/message_queue.py @@ -1,10 +1,43 @@ import asyncio +import time +from typing import Dict +from .config import global_config +from .logger import logger -recv_queue = asyncio.Queue() +response_dict: Dict = {} +response_time_dict: Dict = {} message_queue = asyncio.Queue() -async def get_response(): - response = await recv_queue.get() - recv_queue.task_done() +async def get_response(request_id: str) -> dict: + retry_count = 0 + max_retries = 50 # 10秒超时 + while request_id not in response_dict: + retry_count += 1 + if retry_count >= max_retries: + raise TimeoutError(f"请求超时,未收到响应,request_id: {request_id}") + await asyncio.sleep(0.2) + response = response_dict.pop(request_id) + _ = response_time_dict.pop(request_id) return response + + +async def put_response(response: dict): + echo_id = response.get("echo") + now_time = time.time() + response_dict[echo_id] = response + response_time_dict[echo_id] = now_time + + +async def check_timeout_response() -> None: + while True: + cleaned_message_count: int = 0 + now_time = time.time() + for echo_id, response_time in list(response_time_dict.items()): + if now_time - response_time > global_config.napcat_heartbeat_interval: + cleaned_message_count += 1 + response_dict.pop(echo_id) + response_time_dict.pop(echo_id) + logger.warning(f"响应消息 {echo_id} 超时,已删除") + logger.info(f"已删除 {cleaned_message_count} 条超时响应消息") + await asyncio.sleep(global_config.napcat_heartbeat_interval) diff --git a/src/recv_handler.py b/src/recv_handler.py index d216f34..ea58486 100644 --- a/src/recv_handler.py +++ b/src/recv_handler.py @@ -3,8 +3,9 @@ from .config import global_config import time import asyncio import json -import websockets.asyncio.server as Server +import websockets as Server from typing import List, Tuple +import uuid from . import MetaEventType, RealMessageType, MessageType, NoticeType from maim_message import ( @@ -267,15 +268,24 @@ class RecvHandler: logger.warning("reply处理失败") case RealMessageType.forward: forward_message_id = sub_message.get("data").get("id") + request_uuid = str(uuid.uuid4()) payload = json.dumps( { "action": "get_forward_msg", "params": {"message_id": forward_message_id}, + "echo": request_uuid, } ) await self.server_connection.send(payload) # response = await self.server_connection.recv() - response: dict = await get_response() + try: + response: dict = await get_response(request_uuid) + except TimeoutError: + logger.error("获取转发消息超时") + return None + except Exception as e: + logger.error(f"获取转发消息失败: {str(e)}") + return None logger.debug( f"转发消息原始格式:{json.dumps(response)[:80]}..." if len(json.dumps(response)) > 80 @@ -290,6 +300,9 @@ class RecvHandler: case RealMessageType.node: logger.warning("不支持转发消息节点解析") pass + case _: + logger.warning(f"未知消息类型:{sub_message_type}") + pass return seg_message async def handle_text_message(self, raw_message: dict) -> Seg: @@ -366,13 +379,16 @@ class RecvHandler: else: return None - async def handle_reply_message(self, raw_message: dict) -> None: + async def handle_reply_message(self, raw_message: dict) -> Seg: """ 处理回复消息 """ message_id = raw_message.get("data").get("id") message_detail: dict = await get_message_detail(self.server_connection, message_id) + if not message_detail: + logger.warning("获取被引用的消息详情失败") + return None sender_info: dict = message_detail.get("sender") sender_nickname: str = sender_info.get("nickname") if not sender_nickname: diff --git a/src/send_handler.py b/src/send_handler.py index ad7c3bd..f030996 100644 --- a/src/send_handler.py +++ b/src/send_handler.py @@ -1,5 +1,6 @@ import json -import websockets.asyncio.server as Server +import websockets as Server +import uuid # from .config import global_config # 白名单机制不启用 @@ -146,9 +147,17 @@ class SendHandler: } async def send_message_to_napcat(self, action: str, params: dict) -> dict: - payload = json.dumps({"action": action, "params": params}) + request_uuid = str(uuid.uuid4()) + payload = json.dumps({"action": action, "params": params, "echo": request_uuid}) await self.server_connection.send(payload) - response = await get_response() + try: + response = await get_response(request_uuid) + except TimeoutError: + logger.error("发送消息超时,未收到响应") + return {"status": "error", "message": "timeout"} + except Exception as e: + logger.error(f"发送消息失败: {e}") + return {"status": "error", "message": str(e)} return response diff --git a/src/utils.py b/src/utils.py index b51bf3b..e3e47f5 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,6 +1,7 @@ import websockets.asyncio.server as Server import json import base64 +import uuid from .logger import logger from .message_queue import get_response @@ -33,9 +34,17 @@ async def get_group_info(websocket: Server.ServerConnection, group_id: int) -> d 返回值需要处理可能为空的情况 """ - payload = json.dumps({"action": "get_group_info", "params": {"group_id": group_id}}) + request_uuid = str(uuid.uuid4()) + payload = json.dumps({"action": "get_group_info", "params": {"group_id": group_id}, "echo": request_uuid}) await websocket.send(payload) - socket_response: dict = await get_response() + try: + socket_response: dict = await get_response(request_uuid) + except TimeoutError: + logger.error(f"获取群信息超时,群号: {group_id}") + return None + except Exception as e: + logger.error(f"获取群信息失败: {e}") + return None logger.debug(socket_response) return socket_response.get("data") @@ -46,14 +55,23 @@ async def get_member_info(websocket: Server.ServerConnection, group_id: int, use 返回值需要处理可能为空的情况 """ + request_uuid = str(uuid.uuid4()) payload = json.dumps( { "action": "get_group_member_info", "params": {"group_id": group_id, "user_id": user_id, "no_cache": True}, + "echo": request_uuid, } ) await websocket.send(payload) - socket_response: dict = await get_response() + try: + socket_response: dict = await get_response(request_uuid) + except TimeoutError: + logger.error(f"获取成员信息超时,群号: {group_id}, 用户ID: {user_id}") + return None + except Exception as e: + logger.error(f"获取成员信息失败: {e}") + return None logger.debug(socket_response) return socket_response.get("data") @@ -93,9 +111,17 @@ async def get_self_info(websocket: Server.ServerConnection) -> dict: Returns: data: dict: 返回的自身信息 """ - payload = json.dumps({"action": "get_login_info", "params": {}}) + request_uuid = str(uuid.uuid4()) + payload = json.dumps({"action": "get_login_info", "params": {}, "echo": request_uuid}) await websocket.send(payload) - response: dict = await get_response() + try: + response: dict = await get_response(request_uuid) + except TimeoutError: + logger.error("获取自身信息超时") + return None + except Exception as e: + logger.error(f"获取自身信息失败: {e}") + return None logger.debug(response) return response.get("data") @@ -121,24 +147,40 @@ async def get_stranger_info(websocket: Server.ServerConnection, user_id: int) -> Returns: dict: 返回的陌生人信息 """ - payload = json.dumps({"action": "get_stranger_info", "params": {"user_id": user_id}}) + request_uuid = str(uuid.uuid4()) + payload = json.dumps({"action": "get_stranger_info", "params": {"user_id": user_id}, "echo": request_uuid}) await websocket.send(payload) - response: dict = await get_response() + try: + response: dict = await get_response(request_uuid) + except TimeoutError: + logger.error(f"获取陌生人信息超时,用户ID: {user_id}") + return None + except Exception as e: + logger.error(f"获取陌生人信息失败: {e}") + return None logger.debug(response) return response.get("data") async def get_message_detail(websocket: Server.ServerConnection, message_id: str) -> dict: """ - 获取消息详情 + 获取消息详情,可能为空 Parameters: websocket: WebSocket连接对象 message_id: 消息ID Returns: dict: 返回的消息详情 """ - payload = json.dumps({"action": "get_msg", "params": {"message_id": message_id}}) + request_uuid = str(uuid.uuid4()) + payload = json.dumps({"action": "get_msg", "params": {"message_id": message_id}, "echo": request_uuid}) await websocket.send(payload) - response: dict = await get_response() + try: + response: dict = await get_response(request_uuid) + except TimeoutError: + logger.error(f"获取消息详情超时,消息ID: {message_id}") + return None + except Exception as e: + logger.error(f"获取消息详情失败: {e}") + return None logger.debug(response) return response.get("data")