Bakadax 2025-04-15 08:58:06 +08:00
commit d499a05e2c
5 changed files with 123 additions and 23 deletions

View File

@ -7,7 +7,7 @@ from src.recv_handler import recv_handler
from src.send_handler import send_handler from src.send_handler import send_handler
from src.config import global_config from src.config import global_config
from src.mmc_com_layer import mmc_start_com, mmc_stop_com, router from src.mmc_com_layer import mmc_start_com, mmc_stop_com, router
from src.message_queue import recv_queue, message_queue from src.message_queue import message_queue, put_response, check_timeout_response
async def message_recv(server_connection: Server.ServerConnection): async def message_recv(server_connection: Server.ServerConnection):
@ -24,7 +24,7 @@ async def message_recv(server_connection: Server.ServerConnection):
elif post_type == "notice": elif post_type == "notice":
await message_queue.put(decoded_raw_message) await message_queue.put(decoded_raw_message)
elif post_type is None: elif post_type is None:
await recv_queue.put(decoded_raw_message) await put_response(decoded_raw_message)
async def message_process(): async def message_process():
@ -45,7 +45,7 @@ async def message_process():
async def main(): async def main():
recv_handler.maibot_router = router recv_handler.maibot_router = router
_ = await asyncio.gather(napcat_server(), mmc_start_com(), message_process()) _ = await asyncio.gather(napcat_server(), mmc_start_com(), message_process(), check_timeout_response())
async def napcat_server(): async def napcat_server():

View File

@ -1,10 +1,43 @@
import asyncio import asyncio
import time
from typing import Dict
from .config import global_config
from .logger import logger
recv_queue = asyncio.Queue() response_dict: Dict = {}
response_time_dict: Dict = {}
message_queue = asyncio.Queue() message_queue = asyncio.Queue()
async def get_response(): async def get_response(request_id: str) -> dict:
response = await recv_queue.get() retry_count = 0
recv_queue.task_done() max_retries = 50 # 10秒超时
while request_id not in response_dict:
retry_count += 1
if retry_count >= max_retries:
raise TimeoutError(f"请求超时未收到响应request_id: {request_id}")
await asyncio.sleep(0.2)
response = response_dict.pop(request_id)
_ = response_time_dict.pop(request_id)
return response return response
async def put_response(response: dict):
echo_id = response.get("echo")
now_time = time.time()
response_dict[echo_id] = response
response_time_dict[echo_id] = now_time
async def check_timeout_response() -> None:
while True:
cleaned_message_count: int = 0
now_time = time.time()
for echo_id, response_time in list(response_time_dict.items()):
if now_time - response_time > global_config.napcat_heartbeat_interval:
cleaned_message_count += 1
response_dict.pop(echo_id)
response_time_dict.pop(echo_id)
logger.warning(f"响应消息 {echo_id} 超时,已删除")
logger.info(f"已删除 {cleaned_message_count} 条超时响应消息")
await asyncio.sleep(global_config.napcat_heartbeat_interval)

View File

@ -3,8 +3,9 @@ from .config import global_config
import time import time
import asyncio import asyncio
import json import json
import websockets.asyncio.server as Server import websockets as Server
from typing import List, Tuple from typing import List, Tuple
import uuid
from . import MetaEventType, RealMessageType, MessageType, NoticeType from . import MetaEventType, RealMessageType, MessageType, NoticeType
from maim_message import ( from maim_message import (
@ -267,15 +268,24 @@ class RecvHandler:
logger.warning("reply处理失败") logger.warning("reply处理失败")
case RealMessageType.forward: case RealMessageType.forward:
forward_message_id = sub_message.get("data").get("id") forward_message_id = sub_message.get("data").get("id")
request_uuid = str(uuid.uuid4())
payload = json.dumps( payload = json.dumps(
{ {
"action": "get_forward_msg", "action": "get_forward_msg",
"params": {"message_id": forward_message_id}, "params": {"message_id": forward_message_id},
"echo": request_uuid,
} }
) )
await self.server_connection.send(payload) await self.server_connection.send(payload)
# response = await self.server_connection.recv() # response = await self.server_connection.recv()
response: dict = await get_response() try:
response: dict = await get_response(request_uuid)
except TimeoutError:
logger.error("获取转发消息超时")
return None
except Exception as e:
logger.error(f"获取转发消息失败: {str(e)}")
return None
logger.debug( logger.debug(
f"转发消息原始格式:{json.dumps(response)[:80]}..." f"转发消息原始格式:{json.dumps(response)[:80]}..."
if len(json.dumps(response)) > 80 if len(json.dumps(response)) > 80
@ -290,6 +300,9 @@ class RecvHandler:
case RealMessageType.node: case RealMessageType.node:
logger.warning("不支持转发消息节点解析") logger.warning("不支持转发消息节点解析")
pass pass
case _:
logger.warning(f"未知消息类型:{sub_message_type}")
pass
return seg_message return seg_message
async def handle_text_message(self, raw_message: dict) -> Seg: async def handle_text_message(self, raw_message: dict) -> Seg:
@ -366,13 +379,16 @@ class RecvHandler:
else: else:
return None return None
async def handle_reply_message(self, raw_message: dict) -> None: async def handle_reply_message(self, raw_message: dict) -> Seg:
""" """
处理回复消息 处理回复消息
""" """
message_id = raw_message.get("data").get("id") message_id = raw_message.get("data").get("id")
message_detail: dict = await get_message_detail(self.server_connection, message_id) message_detail: dict = await get_message_detail(self.server_connection, message_id)
if not message_detail:
logger.warning("获取被引用的消息详情失败")
return None
reply_message: str = message_detail.get("raw_message") reply_message: str = message_detail.get("raw_message")
sender_info: dict = message_detail.get("sender") sender_info: dict = message_detail.get("sender")
sender_nickname: str = sender_info.get("nickname") sender_nickname: str = sender_info.get("nickname")

View File

@ -1,5 +1,6 @@
import json import json
import websockets.asyncio.server as Server import websockets as Server
import uuid
# from .config import global_config # from .config import global_config
# 白名单机制不启用 # 白名单机制不启用
@ -146,9 +147,17 @@ class SendHandler:
} }
async def send_message_to_napcat(self, action: str, params: dict) -> dict: async def send_message_to_napcat(self, action: str, params: dict) -> dict:
payload = json.dumps({"action": action, "params": params}) request_uuid = str(uuid.uuid4())
payload = json.dumps({"action": action, "params": params, "echo": request_uuid})
await self.server_connection.send(payload) await self.server_connection.send(payload)
response = await get_response() try:
response = await get_response(request_uuid)
except TimeoutError:
logger.error("发送消息超时,未收到响应")
return {"status": "error", "message": "timeout"}
except Exception as e:
logger.error(f"发送消息失败: {e}")
return {"status": "error", "message": str(e)}
return response return response

View File

@ -1,6 +1,7 @@
import websockets.asyncio.server as Server import websockets.asyncio.server as Server
import json import json
import base64 import base64
import uuid
from .logger import logger from .logger import logger
from .message_queue import get_response from .message_queue import get_response
@ -33,9 +34,17 @@ async def get_group_info(websocket: Server.ServerConnection, group_id: int) -> d
返回值需要处理可能为空的情况 返回值需要处理可能为空的情况
""" """
payload = json.dumps({"action": "get_group_info", "params": {"group_id": group_id}}) request_uuid = str(uuid.uuid4())
payload = json.dumps({"action": "get_group_info", "params": {"group_id": group_id}, "echo": request_uuid})
await websocket.send(payload) await websocket.send(payload)
socket_response: dict = await get_response() try:
socket_response: dict = await get_response(request_uuid)
except TimeoutError:
logger.error(f"获取群信息超时,群号: {group_id}")
return None
except Exception as e:
logger.error(f"获取群信息失败: {e}")
return None
logger.debug(socket_response) logger.debug(socket_response)
return socket_response.get("data") return socket_response.get("data")
@ -46,14 +55,23 @@ async def get_member_info(websocket: Server.ServerConnection, group_id: int, use
返回值需要处理可能为空的情况 返回值需要处理可能为空的情况
""" """
request_uuid = str(uuid.uuid4())
payload = json.dumps( payload = json.dumps(
{ {
"action": "get_group_member_info", "action": "get_group_member_info",
"params": {"group_id": group_id, "user_id": user_id, "no_cache": True}, "params": {"group_id": group_id, "user_id": user_id, "no_cache": True},
"echo": request_uuid,
} }
) )
await websocket.send(payload) await websocket.send(payload)
socket_response: dict = await get_response() try:
socket_response: dict = await get_response(request_uuid)
except TimeoutError:
logger.error(f"获取成员信息超时,群号: {group_id}, 用户ID: {user_id}")
return None
except Exception as e:
logger.error(f"获取成员信息失败: {e}")
return None
logger.debug(socket_response) logger.debug(socket_response)
return socket_response.get("data") return socket_response.get("data")
@ -93,9 +111,17 @@ async def get_self_info(websocket: Server.ServerConnection) -> dict:
Returns: Returns:
data: dict: 返回的自身信息 data: dict: 返回的自身信息
""" """
payload = json.dumps({"action": "get_login_info", "params": {}}) request_uuid = str(uuid.uuid4())
payload = json.dumps({"action": "get_login_info", "params": {}, "echo": request_uuid})
await websocket.send(payload) await websocket.send(payload)
response: dict = await get_response() try:
response: dict = await get_response(request_uuid)
except TimeoutError:
logger.error("获取自身信息超时")
return None
except Exception as e:
logger.error(f"获取自身信息失败: {e}")
return None
logger.debug(response) logger.debug(response)
return response.get("data") return response.get("data")
@ -121,24 +147,40 @@ async def get_stranger_info(websocket: Server.ServerConnection, user_id: int) ->
Returns: Returns:
dict: 返回的陌生人信息 dict: 返回的陌生人信息
""" """
payload = json.dumps({"action": "get_stranger_info", "params": {"user_id": user_id}}) request_uuid = str(uuid.uuid4())
payload = json.dumps({"action": "get_stranger_info", "params": {"user_id": user_id}, "echo": request_uuid})
await websocket.send(payload) await websocket.send(payload)
response: dict = await get_response() try:
response: dict = await get_response(request_uuid)
except TimeoutError:
logger.error(f"获取陌生人信息超时用户ID: {user_id}")
return None
except Exception as e:
logger.error(f"获取陌生人信息失败: {e}")
return None
logger.debug(response) logger.debug(response)
return response.get("data") return response.get("data")
async def get_message_detail(websocket: Server.ServerConnection, message_id: str) -> dict: async def get_message_detail(websocket: Server.ServerConnection, message_id: str) -> dict:
""" """
获取消息详情 获取消息详情可能为空
Parameters: Parameters:
websocket: WebSocket连接对象 websocket: WebSocket连接对象
message_id: 消息ID message_id: 消息ID
Returns: Returns:
dict: 返回的消息详情 dict: 返回的消息详情
""" """
payload = json.dumps({"action": "get_msg", "params": {"message_id": message_id}}) request_uuid = str(uuid.uuid4())
payload = json.dumps({"action": "get_msg", "params": {"message_id": message_id}, "echo": request_uuid})
await websocket.send(payload) await websocket.send(payload)
response: dict = await get_response() try:
response: dict = await get_response(request_uuid)
except TimeoutError:
logger.error(f"获取消息详情超时消息ID: {message_id}")
return None
except Exception as e:
logger.error(f"获取消息详情失败: {e}")
return None
logger.debug(response) logger.debug(response)
return response.get("data") return response.get("data")