import time import heapq import math import json from typing import List, Optional from src.common.logger import get_logger logger = get_logger("normal_chat") class PrioritizedMessage: """带有优先级的消息对象""" def __init__(self, message_data: dict, interest_scores: List[float], is_vip: bool = False): self.message_data = message_data self.arrival_time = time.time() self.interest_scores = interest_scores self.is_vip = is_vip self.priority = self.calculate_priority() def calculate_priority(self, decay_rate: float = 0.01) -> float: """ 计算优先级分数。 优先级 = 兴趣分 * exp(-衰减率 * 消息年龄) """ age = time.time() - self.arrival_time decay_factor = math.exp(-decay_rate * age) return sum(self.interest_scores) + decay_factor def __lt__(self, other: "PrioritizedMessage") -> bool: """用于堆排序的比较函数,我们想要一个最大堆,所以用 >""" return self.priority > other.priority class PriorityManager: """ 管理消息队列,根据优先级选择消息进行处理。 """ def __init__(self, normal_queue_max_size: int = 5): self.vip_queue: List[PrioritizedMessage] = [] # VIP 消息队列 (最大堆) self.normal_queue: List[PrioritizedMessage] = [] # 普通消息队列 (最大堆) self.normal_queue_max_size = normal_queue_max_size def add_message(self, message_data: dict, interest_score: float = 0): """ 添加新消息到合适的队列中。 """ user_id = message_data.get("user_id") priority_info_raw = message_data.get("priority_info") priority_info = {} if isinstance(priority_info_raw, str): priority_info = json.loads(priority_info_raw) elif isinstance(priority_info_raw, dict): priority_info = priority_info_raw is_vip = priority_info.get("message_type") == "vip" message_priority = priority_info.get("message_priority", 0.0) p_message = PrioritizedMessage(message_data, [interest_score, message_priority], is_vip) if is_vip: heapq.heappush(self.vip_queue, p_message) logger.debug(f"消息来自VIP用户 {user_id}, 已添加到VIP队列. 当前VIP队列长度: {len(self.vip_queue)}") else: if len(self.normal_queue) >= self.normal_queue_max_size: # 如果队列已满,只在消息优先级高于最低优先级消息时才添加 if p_message.priority > self.normal_queue[0].priority: heapq.heapreplace(self.normal_queue, p_message) logger.debug(f"普通队列已满,但新消息优先级更高,已替换. 用户: {user_id}") else: logger.debug(f"普通队列已满且新消息优先级较低,已忽略. 用户: {user_id}") else: heapq.heappush(self.normal_queue, p_message) logger.debug( f"消息来自普通用户 {user_id}, 已添加到普通队列. 当前普通队列长度: {len(self.normal_queue)}" ) def get_highest_priority_message(self) -> Optional[dict]: """ 从VIP和普通队列中获取当前最高优先级的消息。 """ # 更新所有消息的优先级 for p_msg in self.vip_queue: p_msg.priority = p_msg.calculate_priority() for p_msg in self.normal_queue: p_msg.priority = p_msg.calculate_priority() # 重建堆 heapq.heapify(self.vip_queue) heapq.heapify(self.normal_queue) vip_msg = self.vip_queue[0] if self.vip_queue else None normal_msg = self.normal_queue[0] if self.normal_queue else None if vip_msg: return heapq.heappop(self.vip_queue).message_data elif normal_msg: return heapq.heappop(self.normal_queue).message_data else: return None def is_empty(self) -> bool: """检查所有队列是否为空""" return not self.vip_queue and not self.normal_queue def get_queue_status(self) -> str: """获取队列状态信息""" return f"VIP队列: {len(self.vip_queue)}, 普通队列: {len(self.normal_queue)}"