feat: 添加TTL清理不活跃stream

pull/1396/head
MySxan 2025-12-02 05:33:25 +00:00
parent a572ac5b0a
commit 940204c072
1 changed files with 24 additions and 0 deletions

View File

@ -23,6 +23,7 @@ logger = get_logger("chat_stream")
# LRU + TTL 配置 # LRU + TTL 配置
MAX_LAST_MESSAGES_CACHE = 5000 # 最大缓存消息数 MAX_LAST_MESSAGES_CACHE = 5000 # 最大缓存消息数
LAST_MESSAGE_TTL = 1800 # 消息缓存时间
class ChatMessageContext: class ChatMessageContext:
@ -145,6 +146,8 @@ class ChatManager:
# asyncio.create_task(self._initialize()) # asyncio.create_task(self._initialize())
# # 启动自动保存任务 # # 启动自动保存任务
# asyncio.create_task(self._auto_save_task()) # asyncio.create_task(self._auto_save_task())
# # 启动 TTL 清理任务
asyncio.create_task(self._cleanup_expired_messages())
async def _initialize(self): async def _initialize(self):
"""异步初始化""" """异步初始化"""
@ -164,6 +167,27 @@ class ChatManager:
except Exception as e: except Exception as e:
logger.error(f"聊天流自动保存失败: {str(e)}") logger.error(f"聊天流自动保存失败: {str(e)}")
async def _cleanup_expired_messages(self):
"""定期清理过期的 last_messages"""
while True:
await asyncio.sleep(300) # 每5分钟清理一次
try:
current_time = time.time()
expired_keys = []
for stream_id, timestamp in self.last_message_timestamps.items():
if current_time - timestamp > LAST_MESSAGE_TTL:
expired_keys.append(stream_id)
for key in expired_keys:
self.last_messages.pop(key, None)
self.last_message_timestamps.pop(key, None)
if expired_keys:
logger.info(f"清理了 {len(expired_keys)} 条过期的 last_messages")
except Exception as e:
logger.error(f"清理过期消息失败: {str(e)}")
def register_message(self, message: "MessageRecv"): def register_message(self, message: "MessageRecv"):
"""注册消息到聊天流""" """注册消息到聊天流"""
stream_id = self._generate_stream_id( stream_id = self._generate_stream_id(