diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py index ac2d7a94..850a5033 100644 --- a/src/chat/message_receive/chat_stream.py +++ b/src/chat/message_receive/chat_stream.py @@ -23,6 +23,7 @@ logger = get_logger("chat_stream") # LRU + TTL 配置 MAX_LAST_MESSAGES_CACHE = 5000 # 最大缓存消息数 +LAST_MESSAGE_TTL = 1800 # 消息缓存时间 class ChatMessageContext: @@ -145,6 +146,8 @@ class ChatManager: # asyncio.create_task(self._initialize()) # # 启动自动保存任务 # asyncio.create_task(self._auto_save_task()) + # # 启动 TTL 清理任务 + asyncio.create_task(self._cleanup_expired_messages()) async def _initialize(self): """异步初始化""" @@ -164,6 +167,27 @@ class ChatManager: except Exception as e: logger.error(f"聊天流自动保存失败: {str(e)}") + async def _cleanup_expired_messages(self): + """定期清理过期的 last_messages""" + while True: + await asyncio.sleep(300) # 每5分钟清理一次 + try: + current_time = time.time() + expired_keys = [] + + for stream_id, timestamp in self.last_message_timestamps.items(): + if current_time - timestamp > LAST_MESSAGE_TTL: + expired_keys.append(stream_id) + + for key in expired_keys: + self.last_messages.pop(key, None) + self.last_message_timestamps.pop(key, None) + + if expired_keys: + logger.info(f"清理了 {len(expired_keys)} 条过期的 last_messages") + except Exception as e: + logger.error(f"清理过期消息失败: {str(e)}") + def register_message(self, message: "MessageRecv"): """注册消息到聊天流""" stream_id = self._generate_stream_id(