diff --git a/src/plugins/group_nickname/nickname_manager.py b/src/plugins/group_nickname/nickname_manager.py index b54c92b3..ac158a66 100644 --- a/src/plugins/group_nickname/nickname_manager.py +++ b/src/plugins/group_nickname/nickname_manager.py @@ -19,7 +19,8 @@ from src.plugins.chat.message import MessageRecv from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat logger = get_logger("NicknameManager") -logger_helper = get_logger("AsyncLoopHelper") # 为辅助函数创建单独的 logger +logger_helper = get_logger("AsyncLoopHelper") # 为辅助函数创建单独的 logger + def run_async_loop(loop: asyncio.AbstractEventLoop, coro): """ @@ -44,7 +45,9 @@ def run_async_loop(loop: asyncio.AbstractEventLoop, coro): # 1. 取消所有剩余任务 all_tasks = asyncio.all_tasks(loop) current_task = asyncio.current_task(loop) - tasks_to_cancel = [task for task in all_tasks if task is not current_task] # 避免取消 run_until_complete 本身 + tasks_to_cancel = [ + task for task in all_tasks if task is not current_task + ] # 避免取消 run_until_complete 本身 if tasks_to_cancel: logger_helper.info(f"Cancelling {len(tasks_to_cancel)} outstanding tasks in loop {id(loop)}...") for task in tasks_to_cancel: @@ -61,17 +64,19 @@ def run_async_loop(loop: asyncio.AbstractEventLoop, coro): # 3. 关闭循环 (如果未关闭) if not loop.is_closed(): # 在关闭前再运行一次以处理挂起的关闭回调 - loop.run_until_complete(loop.shutdown_asyncgens()) # 关闭异步生成器 + loop.run_until_complete(loop.shutdown_asyncgens()) # 关闭异步生成器 loop.close() logger_helper.info(f"Asyncio loop {id(loop)} closed.") except Exception as close_err: logger_helper.error(f"Error during asyncio loop cleanup for loop {id(loop)}: {close_err}", exc_info=True) + class NicknameManager: """ 管理群组绰号分析、处理、存储和使用的单例类。 封装了 LLM 调用、后台处理线程和数据库交互。 """ + _instance = None _lock = threading.Lock() @@ -89,11 +94,11 @@ class NicknameManager: 初始化 NicknameManager。 使用锁和标志确保实际初始化只执行一次。 """ - if hasattr(self, '_initialized') and self._initialized: + if hasattr(self, "_initialized") and self._initialized: return with self._lock: - if hasattr(self, '_initialized') and self._initialized: + if hasattr(self, "_initialized") and self._initialized: return logger.info("正在初始化 NicknameManager 组件...") @@ -101,7 +106,7 @@ class NicknameManager: self.is_enabled = self.config.ENABLE_NICKNAME_MAPPING # 数据库处理器 - person_info_collection = getattr(db, 'person_info', None) + person_info_collection = getattr(db, "person_info", None) self.db_handler = NicknameDB(person_info_collection) if not self.db_handler.is_available(): logger.error("数据库处理器初始化失败,NicknameManager 功能受限。") @@ -136,9 +141,9 @@ class NicknameManager: self.queue_max_size = getattr(self.config, "NICKNAME_QUEUE_MAX_SIZE", 100) # 使用 asyncio.Queue self.nickname_queue: asyncio.Queue = asyncio.Queue(maxsize=self.queue_max_size) - self._stop_event = threading.Event() # stop_event 仍然使用 threading.Event,因为它是由另一个线程设置的 + self._stop_event = threading.Event() # stop_event 仍然使用 threading.Event,因为它是由另一个线程设置的 self._nickname_thread: Optional[threading.Thread] = None - self.sleep_interval = getattr(self.config, "NICKNAME_PROCESS_SLEEP_INTERVAL", 0.5) # 超时时间 + self.sleep_interval = getattr(self.config, "NICKNAME_PROCESS_SLEEP_INTERVAL", 0.5) # 超时时间 self._initialized = True logger.info("NicknameManager 初始化完成。") @@ -152,8 +157,8 @@ class NicknameManager: logger.info("正在启动绰号处理器线程...") self._stop_event.clear() self._nickname_thread = threading.Thread( - target=self._run_processor_in_thread, # 线程目标函数不变 - daemon=True + target=self._run_processor_in_thread, # 线程目标函数不变 + daemon=True, ) self._nickname_thread.start() logger.info(f"绰号处理器线程已启动 (ID: {self._nickname_thread.ident})") @@ -164,10 +169,10 @@ class NicknameManager: """停止后台处理线程。""" if self._nickname_thread and self._nickname_thread.is_alive(): logger.info("正在停止绰号处理器线程...") - self._stop_event.set() # 设置停止事件,_processing_loop 会检测到 + self._stop_event.set() # 设置停止事件,_processing_loop 会检测到 try: # 不需要清空 asyncio.Queue,让循环自然结束或被取消 - self._nickname_thread.join(timeout=10) # 等待线程结束 + self._nickname_thread.join(timeout=10) # 等待线程结束 if self._nickname_thread.is_alive(): logger.warning("绰号处理器线程在超时后仍未停止。") except Exception as e: @@ -209,8 +214,11 @@ class NicknameManager: # 格式化历史记录 chat_history_str = await build_readable_messages( messages=history_messages, - replace_bot_name=True, merge_messages=False, timestamp_mode="relative", - read_mark=0.0, truncate=False, + replace_bot_name=True, + merge_messages=False, + timestamp_mode="relative", + read_mark=0.0, + truncate=False, ) # 2. 获取 Bot 回复 bot_reply_str = " ".join(bot_reply) if bot_reply else "" @@ -218,7 +226,9 @@ class NicknameManager: group_id = str(current_chat_stream.group_info.group_id) platform = current_chat_stream.platform # 4. 构建用户 ID 到名称的映射 (user_name_map) - user_ids_in_history = {str(msg["user_info"]["user_id"]) for msg in history_messages if msg.get("user_info", {}).get("user_id")} + user_ids_in_history = { + str(msg["user_info"]["user_id"]) for msg in history_messages if msg.get("user_info", {}).get("user_id") + } user_name_map = {} if user_ids_in_history: try: @@ -231,14 +241,15 @@ class NicknameManager: user_name_map[user_id] = names_data[user_id] else: latest_nickname = next( - (m["user_info"].get("user_nickname") - for m in reversed(history_messages) - if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname")), + ( + m["user_info"].get("user_nickname") + for m in reversed(history_messages) + if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname") + ), None, ) user_name_map[user_id] = latest_nickname or f"未知({user_id})" - item = (chat_history_str, bot_reply_str, platform, group_id, user_name_map) await self._add_to_queue(item, platform, group_id) @@ -256,7 +267,11 @@ class NicknameManager: try: group_id = str(chat_stream.group_info.group_id) platform = chat_stream.platform - user_ids_in_context = {str(msg["user_info"]["user_id"]) for msg in message_list_before_now if msg.get("user_info", {}).get("user_id")} + user_ids_in_context = { + str(msg["user_info"]["user_id"]) + for msg in message_list_before_now + if msg.get("user_info", {}).get("user_id") + } if not user_ids_in_context: recent_speakers = chat_stream.get_recent_speakers(limit=5) @@ -283,7 +298,6 @@ class NicknameManager: logger.error(f"{log_prefix} 获取绰号注入时出错: {e}", exc_info=True) return "" - # 私有/内部方法 async def _add_to_queue(self, item: tuple, platform: str, group_id: str): @@ -291,15 +305,18 @@ class NicknameManager: try: # 使用 await put(),如果队列满则异步等待 await self.nickname_queue.put(item) - logger.debug(f"已将项目添加到平台 '{platform}' 群组 '{group_id}' 的绰号队列。当前大小: {self.nickname_queue.qsize()}") + logger.debug( + f"已将项目添加到平台 '{platform}' 群组 '{group_id}' 的绰号队列。当前大小: {self.nickname_queue.qsize()}" + ) except asyncio.QueueFull: # 理论上 await put() 不会直接抛 QueueFull,除非 maxsize=0 # 但保留以防万一或未来修改 - logger.warning(f"绰号队列已满 (最大={self.queue_max_size})。平台 '{platform}' 群组 '{group_id}' 的项目被丢弃。") + logger.warning( + f"绰号队列已满 (最大={self.queue_max_size})。平台 '{platform}' 群组 '{group_id}' 的项目被丢弃。" + ) except Exception as e: logger.error(f"将项目添加到绰号队列时出错: {e}", exc_info=True) - async def _analyze_and_update_nicknames(self, item: tuple): """处理单个队列项目:调用 LLM 分析并更新数据库。""" if not isinstance(item, tuple) or len(item) != 5: @@ -309,7 +326,7 @@ class NicknameManager: chat_history_str, bot_reply, platform, group_id, user_name_map = item # 使用 asyncio.get_running_loop().call_soon(threading.get_ident) 可能不准确,线程ID是同步概念 # 可以考虑移除线程ID日志或寻找异步安全的获取标识符的方式 - log_prefix = f"[{platform}:{group_id}]" # 简化日志前缀 + log_prefix = f"[{platform}:{group_id}]" # 简化日志前缀 logger.debug(f"{log_prefix} 开始处理绰号分析任务...") if not self.llm_mapper: @@ -340,7 +357,9 @@ class NicknameManager: try: person_id = person_info_manager.get_person_id(platform, user_id_str) if not person_id: - logger.error(f"{log_prefix} 无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id,跳过此用户。") + logger.error( + f"{log_prefix} 无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id,跳过此用户。" + ) continue self.db_handler.upsert_person(person_id, user_id_int, platform) self.db_handler.update_group_nickname_count(person_id, group_id, nickname) @@ -353,7 +372,6 @@ class NicknameManager: else: logger.debug(f"{log_prefix} LLM 未找到可靠的绰号映射或分析失败。") - async def _call_llm_for_analysis( self, chat_history_str: str, @@ -387,7 +405,7 @@ class NicknameManager: elif response_content.startswith("{") and response_content.endswith("}"): pass # 可能是纯 JSON else: - json_match = re.search(r'\{.*\}', response_content, re.DOTALL) + json_match = re.search(r"\{.*\}", response_content, re.DOTALL) if json_match: response_content = json_match.group(0) else: @@ -430,11 +448,10 @@ class NicknameManager: logger.error(f"绰号映射 LLM 调用或处理过程中发生意外错误: {e}", exc_info=True) return {"is_exist": False} - def _filter_llm_results(self, original_data: Dict[str, str], user_name_map: Dict[str, str]) -> Dict[str, str]: """过滤 LLM 返回的绰号映射结果。""" filtered_data = {} - bot_qq_str = str(self.config.BOT_QQ) if hasattr(self.config, 'BOT_QQ') else None + bot_qq_str = str(self.config.BOT_QQ) if hasattr(self.config, "BOT_QQ") else None for user_id, nickname in original_data.items(): if not isinstance(user_id, str): @@ -454,23 +471,22 @@ class NicknameManager: return filtered_data - # 线程相关 # 修改:使用 run_async_loop 辅助函数 def _run_processor_in_thread(self): """后台线程入口函数,使用辅助函数管理 asyncio 事件循环。""" - thread_id = threading.get_ident() # 获取线程ID用于日志 + thread_id = threading.get_ident() # 获取线程ID用于日志 logger.info(f"绰号处理器线程启动 (线程 ID: {thread_id})...") loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) # 为当前线程设置事件循环 + asyncio.set_event_loop(loop) # 为当前线程设置事件循环 logger.info(f"(线程 ID: {thread_id}) Asyncio 事件循环已创建并设置。") # 调用辅助函数来运行主处理协程并管理循环生命周期 run_async_loop(loop, self._processing_loop()) logger.info(f"绰号处理器线程结束 (线程 ID: {thread_id}).") - # 结束修改 + # 结束修改 # 修改:使用 asyncio.Queue 和 wait_for async def _processing_loop(self): @@ -478,18 +494,15 @@ class NicknameManager: # 移除线程ID日志,因为它在异步上下文中不一定准确 logger.info("绰号异步处理循环已启动。") - while not self._stop_event.is_set(): # 仍然检查同步的停止事件 + while not self._stop_event.is_set(): # 仍然检查同步的停止事件 try: # 使用 asyncio.wait_for 从异步队列获取项目,并设置超时 - item = await asyncio.wait_for( - self.nickname_queue.get(), - timeout=self.sleep_interval - ) + item = await asyncio.wait_for(self.nickname_queue.get(), timeout=self.sleep_interval) # 处理获取到的项目 (调用异步方法) await self._analyze_and_update_nicknames(item) - self.nickname_queue.task_done() # 标记任务完成 + self.nickname_queue.task_done() # 标记任务完成 except asyncio.TimeoutError: # 等待超时,相当于之前 queue.Empty,继续循环检查停止事件 @@ -497,7 +510,7 @@ class NicknameManager: except asyncio.CancelledError: # 协程被取消 (通常在 stop_processor 中发生) logger.info("绰号处理循环被取消。") - break # 退出循环 + break # 退出循环 except Exception as e: # 捕获处理单个项目时可能发生的其他异常 logger.error(f"绰号处理循环出错: {e}", exc_info=True) @@ -507,6 +520,7 @@ class NicknameManager: logger.info("绰号异步处理循环已结束。") # 可以在这里添加清理逻辑,比如确保队列为空或处理剩余项目 # 例如:await self.nickname_queue.join() # 等待所有任务完成 (如果需要) + # 结束修改