From 612d4b1a7e0e4375eed017eedf482bfeed754f47 Mon Sep 17 00:00:00 2001 From: Bakadax Date: Sat, 3 May 2025 14:03:13 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E9=83=A8=E5=88=86=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E4=BB=A5=E4=BD=BF=E7=94=A8class=E5=B0=81=E8=A3=85?= =?UTF-8?q?=EF=BC=8C=E8=B0=83=E6=88=90=E9=BB=98=E8=AE=A4=E4=BC=91=E7=9C=A0?= =?UTF-8?q?=E9=97=B4=E9=9A=94=E4=B8=BA1=E5=88=86=E9=92=9F=EF=BC=8C?= =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=BA=BF=E7=A8=8B=E5=90=AF=E5=8A=A8=E9=A1=BA?= =?UTF-8?q?=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot.py | 27 +- src/config/config.py | 2 +- src/plugins/group_nickname/nickname_db.py | 156 +++++ .../group_nickname/nickname_manager.py | 534 ++++++++++++++++++ src/plugins/group_nickname/nickname_mapper.py | 210 ++----- .../group_nickname/nickname_processor.py | 346 ------------ src/plugins/group_nickname/nickname_utils.py | 279 ++------- src/plugins/heartFC_chat/heartFC_chat.py | 7 +- .../heartFC_chat/heartflow_prompt_builder.py | 6 +- src/plugins/heartFC_chat/normal_chat.py | 4 +- template/bot_config_template.toml | 2 +- template/lpmm_config_template.toml | 4 +- 12 files changed, 825 insertions(+), 752 deletions(-) create mode 100644 src/plugins/group_nickname/nickname_db.py create mode 100644 src/plugins/group_nickname/nickname_manager.py delete mode 100644 src/plugins/group_nickname/nickname_processor.py diff --git a/bot.py b/bot.py index f324a432..c1b3a253 100644 --- a/bot.py +++ b/bot.py @@ -14,10 +14,7 @@ from src.common.logger_manager import get_logger from src.common.crash_logger import install_crash_handler from src.main import MainSystem from rich.traceback import install -from src.plugins.group_nickname.nickname_processor import ( - start_nickname_processor, - stop_nickname_processor, -) +from src.plugins.group_nickname.nickname_manager import nickname_manager import atexit install(extra_lines=3) @@ -226,6 +223,19 @@ def raw_main(): env_config = {key: os.getenv(key) for key in os.environ} scan_provider(env_config) + # 确保 NicknameManager 单例实例存在并已初始化 + # (单例模式下,导入时或第一次调用时会自动初始化) + _ = nickname_manager # 显式引用一次 + + # 启动 NicknameManager 的后台处理器线程 + logger.info("准备启动绰号处理管理器...") + nickname_manager.start_processor() # 调用实例的方法 + logger.info("已调用启动绰号处理管理器。") + + # 注册 NicknameManager 的停止方法到 atexit,确保程序退出时线程能被清理 + atexit.register(nickname_manager.stop_processor) # 注册实例的方法 + logger.info("已注册绰号处理管理器的退出处理程序。") + # 返回MainSystem实例 return MainSystem() @@ -235,15 +245,6 @@ if __name__ == "__main__": # 获取MainSystem实例 main_system = raw_main() - # 在这里启动绰号处理进程 - logger.info("准备启动绰号处理线程...") - start_nickname_processor() # <--- 添加启动调用 - logger.info("已调用启动绰号处理线程。") - - # 注册退出处理函数 (确保进程能被关闭) - atexit.register(stop_nickname_processor) # <--- 在这里注册停止函数 - logger.info("已注册绰号处理线程的退出处理程序。") - # 创建事件循环 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) diff --git a/src/config/config.py b/src/config/config.py index 3c90c201..312f3e95 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -279,7 +279,7 @@ class BotConfig: MAX_NICKNAMES_IN_PROMPT: int = 10 # Prompt 中最多注入的绰号数量 NICKNAME_PROBABILITY_SMOOTHING: int = 1 # 绰号加权随机选择的平滑因子 NICKNAME_QUEUE_MAX_SIZE: int = 100 # 绰号处理队列最大容量 - NICKNAME_PROCESS_SLEEP_INTERVAL: float = 5 # 绰号处理进程休眠间隔(秒) + NICKNAME_PROCESS_SLEEP_INTERVAL: float = 60 # 绰号处理进程休眠间隔(秒) # 模型配置 llm_reasoning: dict[str, str] = field(default_factory=lambda: {}) diff --git a/src/plugins/group_nickname/nickname_db.py b/src/plugins/group_nickname/nickname_db.py new file mode 100644 index 00000000..d0c6d975 --- /dev/null +++ b/src/plugins/group_nickname/nickname_db.py @@ -0,0 +1,156 @@ +from pymongo.collection import Collection +from pymongo.errors import OperationFailure, DuplicateKeyError +from src.common.logger_manager import get_logger +from typing import Optional + +logger = get_logger("nickname_db") + +class NicknameDB: + """ + 处理与群组绰号相关的数据库操作 (MongoDB)。 + 封装了对 'person_info' 集合的读写操作。 + """ + def __init__(self, person_info_collection: Optional[Collection]): + """ + 初始化 NicknameDB 处理器。 + + Args: + person_info_collection: MongoDB 'person_info' 集合对象。 + 如果为 None,则数据库操作将被禁用。 + """ + if person_info_collection is None: + logger.error("未提供 person_info 集合,NicknameDB 操作将被禁用。") + self.person_info_collection = None + else: + self.person_info_collection = person_info_collection + logger.info("NicknameDB 初始化成功。") + + def is_available(self) -> bool: + """检查数据库集合是否可用。""" + return self.person_info_collection is not None + + def upsert_person(self, person_id: str, user_id_int: int, platform: str): + """ + 确保数据库中存在指定 person_id 的文档 (Upsert)。 + 如果文档不存在,则使用提供的用户信息创建它。 + + Args: + person_id: 要查找或创建的 person_id。 + user_id_int: 用户的整数 ID。 + platform: 平台名称。 + + Returns: + UpdateResult 或 None: MongoDB 更新操作的结果,如果数据库不可用则返回 None。 + + Raises: + DuplicateKeyError: 如果发生重复键错误 (理论上不应由 upsert 触发)。 + Exception: 其他数据库操作错误。 + """ + if not self.is_available(): + logger.error("数据库集合不可用,无法执行 upsert_person。") + return None + try: + # 关键步骤:基于 person_id 执行 Upsert + result = self.person_info_collection.update_one( + {"person_id": person_id}, + { + "$setOnInsert": { + "person_id": person_id, + "user_id": user_id_int, + "platform": platform, + "group_nicknames": [], # 初始化 group_nicknames 数组 + } + }, + upsert=True, + ) + if result.upserted_id: + logger.debug(f"Upsert 创建了新的 person 文档: {person_id}") + return result + except DuplicateKeyError as dk_err: + # 这个错误理论上不应该再由 upsert 触发。 + logger.error( + f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。" + ) + raise # 将异常向上抛出 + except Exception as e: + logger.exception(f"对 person_id {person_id} 执行 Upsert 时失败: {e}") + raise # 将异常向上抛出 + + def update_group_nickname_count(self, person_id: str, group_id_str: str, nickname: str): + """ + 尝试更新 person_id 文档中特定群组的绰号计数,或添加新条目。 + 按顺序尝试:增加计数 -> 添加绰号 -> 添加群组。 + + Args: + person_id: 目标文档的 person_id。 + group_id_str: 目标群组的 ID (字符串)。 + nickname: 要更新或添加的绰号。 + """ + if not self.is_available(): + logger.error("数据库集合不可用,无法执行 update_group_nickname_count。") + return + + try: + # 3a. 尝试增加现有群组中现有绰号的计数 + result_inc = self.person_info_collection.update_one( + { + "person_id": person_id, + "group_nicknames": {"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}}, + }, + {"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}}, + array_filters=[ + {"group.group_id": group_id_str}, + {"nick.name": nickname}, + ], + ) + if result_inc.modified_count > 0: + # logger.debug(f"成功增加 person_id {person_id} 在群组 {group_id_str} 中绰号 '{nickname}' 的计数。") + return # 成功增加计数,操作完成 + + # 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组 + result_push_nick = self.person_info_collection.update_one( + { + "person_id": person_id, + "group_nicknames.group_id": group_id_str, # 检查群组是否存在 + }, + {"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}}, + array_filters=[{"group.group_id": group_id_str}], + ) + if result_push_nick.modified_count > 0: + logger.debug(f"成功为 person_id {person_id} 在现有群组 {group_id_str} 中添加新绰号 '{nickname}'。") + return # 成功添加绰号,操作完成 + + # 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号 + # 确保 group_nicknames 数组存在 (作为保险措施) + self.person_info_collection.update_one( + {"person_id": person_id, "group_nicknames": {"$exists": False}}, + {"$set": {"group_nicknames": []}}, + ) + # 推送新的群组对象到 group_nicknames 数组 + result_push_group = self.person_info_collection.update_one( + { + "person_id": person_id, + "group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在 + }, + { + "$push": { + "group_nicknames": { + "group_id": group_id_str, + "nicknames": [{"name": nickname, "count": 1}], + } + } + }, + ) + if result_push_group.modified_count > 0: + logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'。") + # else: + # logger.warning(f"尝试为 person_id {person_id} 添加新群组 {group_id_str} 失败,可能群组已存在但结构不符合预期。") + + except (OperationFailure, DuplicateKeyError) as db_err: + logger.exception( + f"数据库操作失败 ({type(db_err).__name__}): person_id {person_id}, 群组 {group_id_str}, 绰号 {nickname}. 错误: {db_err}" + ) + # 根据需要决定是否向上抛出 raise db_err + except Exception as e: + logger.exception(f"更新群组绰号计数时发生意外错误: person_id {person_id}, group {group_id_str}, nick {nickname}. Error: {e}") + # 根据需要决定是否向上抛出 raise e \ No newline at end of file diff --git a/src/plugins/group_nickname/nickname_manager.py b/src/plugins/group_nickname/nickname_manager.py new file mode 100644 index 00000000..3d47ad1d --- /dev/null +++ b/src/plugins/group_nickname/nickname_manager.py @@ -0,0 +1,534 @@ +import asyncio +import threading +import queue +import traceback +import time +import json +import re +from typing import Dict, Optional, List, Any + +from pymongo.errors import OperationFailure, DuplicateKeyError +from src.common.logger_manager import get_logger +from src.common.database import db +from src.config.config import global_config +from src.plugins.models.utils_model import LLMRequest +from .nickname_db import NicknameDB +from .nickname_mapper import _build_mapping_prompt +from .nickname_utils import select_nicknames_for_prompt, format_nickname_prompt_injection + +# 依赖于 person_info_manager 来生成 person_id +from ..person_info.person_info import person_info_manager +# 依赖于 relationship_manager 来获取用户名称和现有绰号 +from ..person_info.relationship_manager import relationship_manager + +# 导入消息和聊天流相关的类型和工具 +from src.plugins.chat.chat_stream import ChatStream +from src.plugins.chat.message import MessageRecv +from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat + +logger = get_logger("NicknameManager") + +class NicknameManager: + """ + 管理群组绰号分析、处理、存储和使用的单例类。 + 封装了 LLM 调用、后台处理线程和数据库交互。 + """ + _instance = None + _lock = threading.Lock() + + # Singleton Implementation + def __new__(cls, *args, **kwargs): + if not cls._instance: + with cls._lock: + # 再次检查,防止多线程并发创建实例 + if not cls._instance: + logger.info("正在创建 NicknameManager 单例实例...") + cls._instance = super(NicknameManager, cls).__new__(cls) + cls._instance._initialized = False # 添加初始化标志 + return cls._instance + + def __init__(self): + """ + 初始化 NicknameManager。 + 使用锁和标志确保实际初始化只执行一次。 + """ + if self._initialized: # 如果已初始化,直接返回 + return + + with self._lock: + # 再次检查初始化标志,防止重复初始化 + if self._initialized: + return + + logger.info("正在初始化 NicknameManager 组件...") + self.config = global_config + self.is_enabled = self.config.ENABLE_NICKNAME_MAPPING + + # 数据库处理器 + person_info_collection = getattr(db, 'person_info', None) + self.db_handler = NicknameDB(person_info_collection) + if not self.db_handler.is_available(): + logger.error("数据库处理器初始化失败,NicknameManager 功能受限。") + self.is_enabled = False # 如果数据库不可用,禁用功能 + + # LLM 映射器 + self.llm_mapper: Optional[LLMRequest] = None + if self.is_enabled: + try: + model_config = self.config.llm_nickname_mapping + if model_config and model_config.get("name"): + self.llm_mapper = LLMRequest( + model=model_config, + temperature=model_config.get("temp", 0.5), # 使用 get 获取并提供默认值 + max_tokens=model_config.get("max_tokens", 256), # 使用 get 获取并提供默认值 + request_type="nickname_mapping", + ) + logger.info("绰号映射 LLM 映射器初始化成功。") + else: + logger.warning("绰号映射 LLM 配置无效或缺失 'name',功能禁用。") + self.is_enabled = False + except KeyError as ke: + logger.error(f"初始化绰号映射 LLM 时缺少配置项: {ke},功能禁用。", exc_info=True) + self.llm_mapper = None + self.is_enabled = False + except Exception as e: + logger.error(f"初始化绰号映射 LLM 映射器失败: {e},功能禁用。", exc_info=True) + self.llm_mapper = None + self.is_enabled = False + + # 队列和线程 + self.queue_max_size = getattr(self.config, "NICKNAME_QUEUE_MAX_SIZE", 100) + self.nickname_queue: queue.Queue = queue.Queue(maxsize=self.queue_max_size) + self._stop_event = threading.Event() + self._nickname_thread: Optional[threading.Thread] = None + self.sleep_interval = getattr(self.config, "NICKNAME_PROCESS_SLEEP_INTERVAL", 0.5) + + self._initialized = True # 标记为已初始化 + logger.info("NicknameManager 初始化完成。") + + # 公共方法 + + def start_processor(self): + """启动后台处理线程(如果已启用且未运行)。""" + if not self.is_enabled: + logger.info("绰号处理功能已禁用,处理器未启动。") + return + if self._nickname_thread is None or not self._nickname_thread.is_alive(): + logger.info("正在启动绰号处理器线程...") + self._stop_event.clear() # 清除停止事件标志 + self._nickname_thread = threading.Thread( + target=self._run_processor_in_thread, # 线程执行的入口函数 + daemon=True # 设置为守护线程,主程序退出时自动结束 + ) + self._nickname_thread.start() + logger.info(f"绰号处理器线程已启动 (ID: {self._nickname_thread.ident})") + else: + logger.warning("绰号处理器线程已在运行中。") + + def stop_processor(self): + """停止后台处理线程。""" + if self._nickname_thread and self._nickname_thread.is_alive(): + logger.info("正在停止绰号处理器线程...") + self._stop_event.set() # 设置停止事件标志 + try: + # 可选:尝试清空队列,避免丢失未处理的任务 + # while not self.nickname_queue.empty(): + # try: + # self.nickname_queue.get_nowait() + # self.nickname_queue.task_done() + # except queue.Empty: + # break + # logger.info("绰号处理队列已清空。") + + self._nickname_thread.join(timeout=10) # 等待线程结束,设置超时 + if self._nickname_thread.is_alive(): + logger.warning("绰号处理器线程在超时后仍未停止。") + except Exception as e: + logger.error(f"停止绰号处理器线程时出错: {e}", exc_info=True) + finally: + if self._nickname_thread and not self._nickname_thread.is_alive(): + logger.info("绰号处理器线程已成功停止。") + self._nickname_thread = None # 清理线程对象引用 + else: + logger.info("绰号处理器线程未在运行或已被清理。") + + async def trigger_nickname_analysis( + self, + anchor_message: MessageRecv, + bot_reply: List[str], + chat_stream: Optional[ChatStream] = None, + ): + """ + 准备数据并将其排队等待绰号分析(如果满足条件)。 + 取代了旧的 trigger_nickname_analysis_if_needed 函数。 + """ + if not self.is_enabled: + return # 功能禁用则直接返回 + + current_chat_stream = chat_stream or anchor_message.chat_stream + if not current_chat_stream or not current_chat_stream.group_info: + logger.debug("跳过绰号分析:非群聊或无效的聊天流。") + return + + log_prefix = f"[{current_chat_stream.stream_id}]" + try: + # 1. 获取历史记录 + history_limit = getattr(self.config, "NICKNAME_ANALYSIS_HISTORY_LIMIT", 30) + history_messages = get_raw_msg_before_timestamp_with_chat( + chat_id=current_chat_stream.stream_id, + timestamp=time.time(), + limit=history_limit, + ) + + # 格式化历史记录 + chat_history_str = await build_readable_messages( + messages=history_messages, + replace_bot_name=True, merge_messages=False, timestamp_mode="relative", + read_mark=0.0, truncate=False, + ) + + # 2. 获取 Bot 回复 + bot_reply_str = " ".join(bot_reply) if bot_reply else "" + + # 3. 获取群组和平台信息 + group_id = str(current_chat_stream.group_info.group_id) + platform = current_chat_stream.platform + + # 4. 构建用户 ID 到名称的映射 (user_name_map) + user_ids_in_history = {str(msg["user_info"]["user_id"]) for msg in history_messages if msg.get("user_info", {}).get("user_id")} + user_name_map = {} + if user_ids_in_history: + try: + # 使用 relationship_manager 批量获取名称 + names_data = await relationship_manager.get_person_names_batch(platform, list(user_ids_in_history)) + except Exception as e: + logger.error(f"{log_prefix} 批量获取 person_name 时出错: {e}", exc_info=True) + names_data = {} + + # 填充 user_name_map + for user_id in user_ids_in_history: + if user_id in names_data: + user_name_map[user_id] = names_data[user_id] + else: + # 回退查找历史记录中的 nickname + latest_nickname = next( + (m["user_info"].get("user_nickname") + for m in reversed(history_messages) + if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname")), + None, + ) + user_name_map[user_id] = latest_nickname or f"未知({user_id})" + + # 5. 添加到内部处理队列 + item = (chat_history_str, bot_reply_str, platform, group_id, user_name_map) + self._add_to_queue(item, platform, group_id) # 调用私有方法入队 + + except Exception as e: + logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True) + + + async def get_nickname_prompt_injection(self, chat_stream: ChatStream, message_list_before_now: List[Dict]) -> str: + """ + 获取并格式化用于 Prompt 注入的绰号信息字符串。 + 取代了旧的 get_nickname_injection_for_prompt 函数。 + """ + if not self.is_enabled or not chat_stream or not chat_stream.group_info: + return "" # 功能禁用或非群聊则返回空 + + log_prefix = f"[{chat_stream.stream_id}]" + try: + group_id = str(chat_stream.group_info.group_id) + platform = chat_stream.platform + + # 确定上下文中的用户 ID + user_ids_in_context = {str(msg["user_info"]["user_id"]) for msg in message_list_before_now if msg.get("user_info", {}).get("user_id")} + + # 如果消息列表为空,尝试获取最近发言者 + if not user_ids_in_context: + recent_speakers = chat_stream.get_recent_speakers(limit=5) + user_ids_in_context.update(str(speaker["user_id"]) for speaker in recent_speakers) + + if not user_ids_in_context: + logger.warning(f"{log_prefix} 未找到上下文用户用于绰号注入。") + return "" + + # 使用 relationship_manager 批量获取这些用户的群组绰号 + all_nicknames_data = await relationship_manager.get_users_group_nicknames( + platform, list(user_ids_in_context), group_id + ) + + if all_nicknames_data: + # 使用 nickname_utils 中的工具函数进行选择和格式化 + selected_nicknames = select_nicknames_for_prompt(all_nicknames_data) + injection_str = format_nickname_prompt_injection(selected_nicknames) + if injection_str: + logger.debug(f"{log_prefix} 生成的绰号 Prompt 注入:\n{injection_str}") + return injection_str + else: + return "" # 没有获取到绰号数据 + + except Exception as e: + logger.error(f"{log_prefix} 获取绰号注入时出错: {e}", exc_info=True) + return "" # 出错时返回空 + + + # 私有/内部方法 + + def _add_to_queue(self, item: tuple, platform: str, group_id: str): + """将项目添加到内部处理队列。""" + try: + self.nickname_queue.put_nowait(item) + logger.debug(f"已将项目添加到平台 '{platform}' 群组 '{group_id}' 的绰号队列。当前大小: {self.nickname_queue.qsize()}") + except queue.Full: + logger.warning(f"绰号队列已满 (最大={self.queue_max_size})。平台 '{platform}' 群组 '{group_id}' 的项目被丢弃。") + except Exception as e: + logger.error(f"将项目添加到绰号队列时出错: {e}", exc_info=True) + + + async def _analyze_and_update_nicknames(self, item: tuple): + """处理单个队列项目:调用 LLM 分析并更新数据库。""" + if not isinstance(item, tuple) or len(item) != 5: + logger.warning(f"从队列接收到无效项目: {type(item)}") + return + + chat_history_str, bot_reply, platform, group_id, user_name_map = item + thread_id = threading.get_ident() + log_prefix = f"[线程 {thread_id}][{platform}:{group_id}]" + logger.debug(f"{log_prefix} 开始处理绰号分析任务...") + + if not self.llm_mapper: + logger.error(f"{log_prefix} LLM 映射器不可用,无法执行分析。") + return + if not self.db_handler.is_available(): + logger.error(f"{log_prefix} 数据库处理器不可用,无法更新计数。") + return + + # 1. 调用 LLM 分析 (逻辑从 nickname_mapper 移入) + analysis_result = await self._call_llm_for_analysis(chat_history_str, bot_reply, user_name_map) + + # 2. 如果分析成功且找到映射,则更新数据库 + if analysis_result.get("is_exist") and analysis_result.get("data"): + nickname_map_to_update = analysis_result["data"] + logger.info(f"{log_prefix} LLM 找到绰号映射,准备更新数据库: {nickname_map_to_update}") + + for user_id_str, nickname in nickname_map_to_update.items(): + # 基本验证 + if not user_id_str or not nickname: + logger.warning(f"{log_prefix} 跳过无效条目: user_id='{user_id_str}', nickname='{nickname}'") + continue + if not user_id_str.isdigit(): + logger.warning(f"{log_prefix} 无效的用户ID格式 (非纯数字): '{user_id_str}',跳过。") + continue + user_id_int = int(user_id_str) + # 结束验证 + + try: + # 步骤 1: 生成 person_id + person_id = person_info_manager.get_person_id(platform, user_id_str) + if not person_id: + logger.error(f"{log_prefix} 无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id,跳过此用户。") + continue + + # 步骤 2: 确保 Person 文档存在 (调用 DB Handler) + self.db_handler.upsert_person(person_id, user_id_int, platform) + + # 步骤 3: 更新群组绰号 (调用 DB Handler) + self.db_handler.update_group_nickname_count(person_id, group_id, nickname) + + except (OperationFailure, DuplicateKeyError) as db_err: # 捕获特定的数据库错误 + logger.exception( + f"{log_prefix} 数据库操作失败 ({type(db_err).__name__}): 用户 {user_id_str}, 绰号 {nickname}. 错误: {db_err}" + ) + except Exception as e: + logger.exception(f"{log_prefix} 处理用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误:{e}") + else: + logger.debug(f"{log_prefix} LLM 未找到可靠的绰号映射或分析失败。") + + + async def _call_llm_for_analysis( + self, + chat_history_str: str, + bot_reply: str, + user_name_map: Dict[str, str], + ) -> Dict[str, Any]: + """ + 内部方法:调用 LLM 分析聊天记录和 Bot 回复,提取可靠的 用户ID-绰号 映射。 + (逻辑从 analyze_chat_for_nicknames 移入) + """ + if not self.llm_mapper: # 再次检查 LLM 映射器 + logger.error("LLM 映射器未初始化,无法执行分析。") + return {"is_exist": False} + + prompt = _build_mapping_prompt(chat_history_str, bot_reply, user_name_map) + logger.debug(f"构建的绰号映射 Prompt:\n{prompt[:500]}...") # 截断日志输出 + + try: + # 调用 LLM + response_content, _, _ = await self.llm_mapper.generate_response(prompt) + logger.debug(f"LLM 原始响应 (绰号映射): {response_content}") + + if not response_content: + logger.warning("LLM 返回了空的绰号映射内容。") + return {"is_exist": False} + + # 清理可能的 Markdown 代码块标记 + response_content = response_content.strip() + markdown_code_regex = re.compile(r"^```(?:\w+)?\s*\n(.*?)\n\s*```$", re.DOTALL | re.IGNORECASE) + match = markdown_code_regex.match(response_content) + if match: + response_content = match.group(1).strip() + # 尝试直接解析 JSON,即使没有代码块标记 + elif response_content.startswith("{") and response_content.endswith("}"): + pass # 可能是纯 JSON + else: + # 尝试在文本中查找 JSON 对象 + json_match = re.search(r'\{.*\}', response_content, re.DOTALL) + if json_match: + response_content = json_match.group(0) + else: + logger.warning(f"LLM 响应似乎不包含有效的 JSON 对象。响应: {response_content}") + return {"is_exist": False} + + + # 解析 JSON + result = json.loads(response_content) + + # 结果验证和过滤 + if not isinstance(result, dict): + logger.warning(f"LLM 响应不是一个有效的 JSON 对象 (字典类型)。响应内容: {response_content}") + return {"is_exist": False} + + is_exist = result.get("is_exist") + + if is_exist is True: + original_data = result.get("data") + if isinstance(original_data, dict) and original_data: + logger.info(f"LLM 找到的原始绰号映射: {original_data}") + filtered_data = self._filter_llm_results(original_data, user_name_map) # 调用过滤函数 + if not filtered_data: + logger.info("所有找到的绰号映射都被过滤掉了。") + return {"is_exist": False} + else: + logger.info(f"过滤后的绰号映射: {filtered_data}") + return {"is_exist": True, "data": filtered_data} + else: + # is_exist 为 True 但 data 缺失、不是字典或为空 + logger.warning(f"LLM 响应格式错误: is_exist=True 但 data 无效。原始 data: {original_data}") + return {"is_exist": False} + elif is_exist is False: + logger.info("LLM 明确指示未找到可靠的绰号映射 (is_exist=False)。") + return {"is_exist": False} + else: # is_exist 不是 True 或 False (包括 None) + logger.warning(f"LLM 响应格式错误: 'is_exist' 的值 '{is_exist}' 无效。") + return {"is_exist": False} + + except json.JSONDecodeError as json_err: + logger.error(f"解析 LLM 响应 JSON 失败: {json_err}\n原始响应: {response_content}") + return {"is_exist": False} + except Exception as e: + logger.error(f"绰号映射 LLM 调用或处理过程中发生意外错误: {e}", exc_info=True) + return {"is_exist": False} + + def _filter_llm_results(self, original_data: Dict[str, str], user_name_map: Dict[str, str]) -> Dict[str, str]: + """过滤 LLM 返回的绰号映射结果。""" + filtered_data = {} + bot_qq_str = str(self.config.BOT_QQ) if hasattr(self.config, 'BOT_QQ') else None + + for user_id, nickname in original_data.items(): + # 过滤条件 1: user_id 必须是字符串 + if not isinstance(user_id, str): + logger.warning(f"过滤掉非字符串 user_id: {user_id}") + continue + # 过滤条件 2: 排除机器人自身 + if bot_qq_str and user_id == bot_qq_str: + logger.debug(f"过滤掉机器人自身的映射: ID {user_id}") + continue + # 过滤条件 3: 排除 nickname 为空或仅包含空白的情况 + if not nickname or nickname.isspace(): + logger.debug(f"过滤掉用户 {user_id} 的空绰号。") + continue + + # 过滤条件 4 (可选,根据 Prompt 效果决定是否保留): 排除 nickname 与已知名称相同的情况 + # person_name = user_name_map.get(user_id) + # if person_name and person_name == nickname: + # logger.debug(f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。") + # continue + + # 如果通过所有过滤条件,则保留 + filtered_data[user_id] = nickname.strip() # 保留时去除首尾空白 + + return filtered_data + + + # 线程相关 + def _run_processor_in_thread(self): + """后台线程的入口函数,负责创建和运行 asyncio 事件循环。""" + loop = None + thread_id = threading.get_ident() + logger.info(f"绰号处理器线程启动 (线程 ID: {thread_id})...") + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + logger.info(f"(线程 ID: {thread_id}) Asyncio 事件循环已创建并设置。") + # 运行主处理循环直到停止事件被设置 + loop.run_until_complete(self._processing_loop()) + except Exception as e: + logger.error(f"(线程 ID: {thread_id}) 运行绰号处理器线程时出错: {e}", exc_info=True) + finally: + # 确保循环被正确关闭 + if loop: + try: + if loop.is_running(): + logger.info(f"(线程 ID: {thread_id}) 正在停止 asyncio 循环...") + all_tasks = asyncio.all_tasks(loop) + if all_tasks: + logger.info(f"(线程 ID: {thread_id}) 正在取消 {len(all_tasks)} 个运行中的任务...") + for task in all_tasks: + task.cancel() + # 等待任务取消完成 + loop.run_until_complete(asyncio.gather(*all_tasks, return_exceptions=True)) + logger.info(f"(线程 ID: {thread_id}) 所有任务已取消。") + loop.stop() + logger.info(f"(线程 ID: {thread_id}) 循环已停止。") + if not loop.is_closed(): + loop.close() + logger.info(f"(线程 ID: {thread_id}) Asyncio 循环已关闭。") + except Exception as loop_close_err: + logger.error(f"(线程 ID: {thread_id}) 关闭循环时出错: {loop_close_err}", exc_info=True) + logger.info(f"绰号处理器线程结束 (线程 ID: {thread_id}).") + + + async def _processing_loop(self): + """后台线程中运行的异步处理循环。""" + thread_id = threading.get_ident() + logger.info(f"绰号处理循环已启动 (线程 ID: {thread_id})。") + + while not self._stop_event.is_set(): + try: + # 从队列中获取项目,设置超时以允许检查停止事件 + item = self.nickname_queue.get(block=True, timeout=self.sleep_interval) + + # 处理获取到的项目 + await self._analyze_and_update_nicknames(item) + + self.nickname_queue.task_done() # 标记任务完成 + + except queue.Empty: + # 超时,队列为空,继续循环检查停止事件 + continue + except asyncio.CancelledError: + logger.info(f"绰号处理循环被取消 (线程 ID: {thread_id})。") + break # 任务被取消,退出循环 + except Exception as e: + # 捕获处理单个项目时可能发生的其他异常 + logger.error(f"(线程 ID: {thread_id}) 绰号处理循环出错: {e}\n{traceback.format_exc()}") + # 可以在这里添加错误处理逻辑,例如将失败的任务放回队列或记录到错误日志 + # 短暂休眠避免快速连续失败 + await asyncio.sleep(5) + + logger.info(f"绰号处理循环已结束 (线程 ID: {thread_id})。") + + +# 在模块级别创建单例实例 +# 这使得其他模块可以通过 `from .nickname_manager import nickname_manager` 来导入和使用 +nickname_manager = NicknameManager() diff --git a/src/plugins/group_nickname/nickname_mapper.py b/src/plugins/group_nickname/nickname_mapper.py index 20753559..aa86ef14 100644 --- a/src/plugins/group_nickname/nickname_mapper.py +++ b/src/plugins/group_nickname/nickname_mapper.py @@ -1,44 +1,37 @@ -import re -import json -from typing import Dict, Any, Optional +# src/plugins/group_nickname/nickname_mapper.py +from typing import Dict from src.common.logger_manager import get_logger -from src.plugins.models.utils_model import LLMRequest -from src.config.config import global_config +# 这个文件现在只负责构建 Prompt,LLM 的初始化和调用移至 NicknameManager logger = get_logger("nickname_mapper") -llm_mapper: Optional[LLMRequest] = None -if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关 - try: - # 从全局配置获取模型设置 - model_config = global_config.llm_nickname_mapping - if not model_config or not model_config.get("name"): - logger.error("在全局配置中未找到有效的 'llm_nickname_mapping' 配置或缺少 'name' 字段。") - else: - llm_mapper = LLMRequest( # <-- LLM 初始化 - model=global_config.llm_nickname_mapping, - temperature=global_config.llm_nickname_mapping["temp"], - max_tokens=256, - request_type="nickname_mapping", - ) - logger.info("绰号映射 LLM 初始化成功 (使用全局配置)。") - - except Exception as e: - logger.error(f"使用全局配置初始化绰号映射 LLM 失败: {e}", exc_info=True) - llm_mapper = None - +# LLMRequest 实例和 analyze_chat_for_nicknames 函数已被移除 def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str: - """构建用于 LLM 绰号映射的 Prompt""" - # user_name_map 包含了 user_id 到 person_name (或 fallback nickname) 的映射 - user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items()]) - # print(f"\n\n\nKnown User Info for LLM:\n{user_list_str}\n\n\n\n") # Debugging print + """ + 构建用于 LLM 进行绰号映射分析的 Prompt。 + + Args: + chat_history_str: 格式化后的聊天历史记录字符串。 + bot_reply: Bot 的最新回复字符串。 + user_name_map: 用户 ID 到已知名称(person_name 或 fallback nickname)的映射。 + + Returns: + str: 构建好的 Prompt 字符串。 + """ + # 将 user_name_map 格式化为列表字符串 + user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items() if uid and name]) + if not user_list_str: + user_list_str = "无" # 如果映射为空,明确告知 + + # 核心 Prompt 内容 prompt = f""" -任务:分析以下聊天记录和你的最新回复,判断其中是否包含用户绰号,并确定绰号与用户 ID 之间是否存在明确的一一对应关系。 +任务:仔细分析以下聊天记录和“你的最新回复”,判断其中是否明确提到了某个用户的绰号,并且这个绰号可以清晰地与一个特定的用户 ID 对应起来。 已知用户信息(ID: 名称): {user_list_str} +*注意:名称后面带有"(你)"表示是你自己。* 聊天记录: --- @@ -48,139 +41,36 @@ def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: 你的最新回复: {bot_reply} -分析要求: -1. 识别聊天记录和你发言中出现的可能是用户绰号的词语。 -2. 判断这些绰号是否能明确地指向某个特定的用户 ID。一个绰号必须在上下文中清晰地与某个发言人或被提及的人关联起来。 -3. 如果能建立可靠的一一映射关系,请输出一个 JSON 对象,格式如下: - {{ - "is_exist": true, - "data": {{ - "用户A数字id": "绰号_A", - "用户B数字id": "绰号_B" +分析要求与输出格式: +1. 找出聊天记录和“你的最新回复”中可能是用户绰号的词语。 +2. 判断这些绰号是否在上下文中**清晰、无歧义**地指向了“已知用户信息”列表中的**某一个特定用户 ID**。必须是强关联,避免猜测。 +3. **不要**输出你自己(名称后带"(你)"的用户)的绰号映射。 + **不要**输出与用户已知名称完全相同的词语作为绰号。 + **不要**将在“你的最新回复”中你对他人使用的称呼或绰号进行映射(只分析聊天记录中他人对用户的称呼)。 + **不要**输出指代不明或过于通用的词语(如“大佬”、“兄弟”、“那个谁”等,除非上下文能非常明确地指向特定用户)。 +4. 如果找到了**至少一个**满足上述所有条件的**明确**的用户 ID 到绰号的映射关系,请输出 JSON 对象: + ```json + {{ + "is_exist": true, + "data": {{ + "用户A数字id": "绰号_A", + "用户B数字id": "绰号_B" + }} }} - }} - 其中 "data" 字段的键是用户的 ID (字符串形式),值是对应的绰号。只包含你能确认映射关系的绰号。 -4. 如果无法建立任何可靠的一一映射关系(例如,绰号指代不明、没有出现绰号、或无法确认绰号与用户的关联),请输出 JSON 对象: - {{ - "is_exist": false - }} -5. 在“已知用户信息”列表中,你的昵称后面可能包含"(你)",这表示是你自己,不需要输出你自身的绰号映射。请确保不要将你自己的ID和任何词语映射为绰号。 -6. 不要输出与用户名称相同的绰号,不要输出你发言中对他人的绰号映射。 -7. 请严格按照 JSON 格式输出,不要包含任何额外的解释或文本。 + ``` + - `"data"` 字段的键必须是用户的**数字 ID (字符串形式)**,值是对应的**绰号 (字符串形式)**。 + - 只包含你能**百分百确认**映射关系的条目。宁缺毋滥。 + 如果**无法找到任何一个**满足条件的明确映射关系,请输出 JSON 对象: + ```json + {{ + "is_exist": false + }} + ``` +5. 请**仅**输出 JSON 对象,不要包含任何额外的解释、注释或代码块标记之外的文本。 输出: """ + # logger.debug(f"构建的绰号映射 Prompt (部分):\n{prompt[:500]}...") # 可以在 NicknameManager 中记录 return prompt - -async def analyze_chat_for_nicknames( - chat_history_str: str, - bot_reply: str, - user_name_map: Dict[str, str], # 这个 map 包含了 user_id -> person_name 的信息 -) -> Dict[str, Any]: - """ - 调用 LLM 分析聊天记录和 Bot 回复,提取可靠的 用户ID-绰号 映射,并进行过滤。 - """ - if not global_config.ENABLE_NICKNAME_MAPPING: - logger.debug("绰号映射功能已禁用。") - return {"is_exist": False} - - if llm_mapper is None: - logger.error("绰号映射 LLM 未初始化。无法执行分析。") - return {"is_exist": False} - - prompt = _build_mapping_prompt(chat_history_str, bot_reply, user_name_map) - logger.debug(f"构建的绰号映射 Prompt:\n{prompt}") - - try: - # 调用 LLM - response_content, _, _ = await llm_mapper.generate_response(prompt) - logger.debug(f"LLM 原始响应 (绰号映射): {response_content}") - - if not response_content: - logger.warning("LLM 返回了空的绰号映射内容。") - return {"is_exist": False} - - # 清理可能的 Markdown 代码块标记 - response_content = response_content.strip() - markdown_code_regex = re.compile(r"^```(?:\w+)?\s*\n(.*?)\n\s*```$", re.DOTALL) - match = markdown_code_regex.match(response_content) - if match: - response_content = match.group(1).strip() - - # 解析 JSON - result = json.loads(response_content) # 可能抛出 json.JSONDecodeError - - # 检查 result 是否为字典 - if not isinstance(result, dict): - logger.warning(f"LLM 响应不是一个有效的 JSON 对象 (字典类型)。响应内容: {response_content}") - return {"is_exist": False} - - # 使用 get 获取 is_exist,避免 KeyError - is_exist = result.get("is_exist") # 如果 result 不是字典,下面 get 会在 except AttributeError 中捕获 - - if is_exist is True: - original_data = result.get("data") - if isinstance(original_data, dict) and original_data: # 确保 data 是非空字典 - logger.info(f"LLM 找到的原始绰号映射: {original_data}") - - # --- 开始过滤 --- - filtered_data = {} - bot_qq_str = str(global_config.BOT_QQ) - - for user_id, nickname in original_data.items(): - if not isinstance(user_id, str): - logger.warning(f"LLM 返回的 user_id '{user_id}' 不是字符串,跳过。") - continue - if user_id == bot_qq_str: - logger.debug(f"过滤掉机器人自身的映射: ID {user_id}") - continue - - # 有了改名工具后,该过滤器已不适合了,尝试通过修改 prompt 获得更好的结果 - # # 条件 2: 排除 nickname 与 person_name 相同的情况 - # person_name = user_name_map.get(user_id) # 从传入的映射中查找 person_name - # if person_name and person_name == nickname: - # logger.debug(f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。") - # continue - - # 如果通过所有过滤条件,则保留 - filtered_data[user_id] = nickname - - # 检查过滤后是否还有数据 - if not filtered_data: - logger.info("所有找到的绰号映射都被过滤掉了。") - return {"is_exist": False} - else: - logger.info(f"过滤后的绰号映射: {filtered_data}") - return {"is_exist": True, "data": filtered_data} - else: - # is_exist 为 True 但 data 缺失、不是字典或为空 - if "data" not in result: - logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 键缺失。") - elif not isinstance(original_data, dict): - logger.warning( - f"LLM 响应格式错误: is_exist 为 True 但 'data' 不是字典。 原始 data: {original_data}" - ) - else: # data 为空字典 - logger.debug("LLM 指示 is_exist=True 但 data 为空字典。视为 False 处理。") - return {"is_exist": False} - - elif is_exist is False: - logger.info("LLM 未找到可靠的绰号映射。") - return {"is_exist": False} - - elif is_exist is None: # 处理 is_exist 键存在但值为 null/None 的情况 - logger.warning("LLM 响应格式错误: 'is_exist' 键的值为 None。") - return {"is_exist": False} - - else: # 处理 is_exist 存在但值不是 True/False/None 的情况 - logger.warning(f"LLM 响应格式错误: 'is_exist' 的值 '{is_exist}' 不是预期的布尔值或 None。") - return {"is_exist": False} - - except json.JSONDecodeError as json_err: - logger.error(f"解析 LLM 响应 JSON 失败: {json_err}\n原始响应: {response_content}") - return {"is_exist": False} - except Exception as e: - # 捕获其他所有未预料到的异常 - logger.error(f"绰号映射 LLM 调用或处理过程中发生未预料的错误: {e}", exc_info=True) - return {"is_exist": False} +# analyze_chat_for_nicknames 函数已被移除,其逻辑移至 NicknameManager._call_llm_for_analysis diff --git a/src/plugins/group_nickname/nickname_processor.py b/src/plugins/group_nickname/nickname_processor.py deleted file mode 100644 index a73811bb..00000000 --- a/src/plugins/group_nickname/nickname_processor.py +++ /dev/null @@ -1,346 +0,0 @@ -import asyncio -import traceback -import threading -import queue -from typing import Dict, Optional -from pymongo.collection import Collection -from pymongo.errors import OperationFailure, DuplicateKeyError # 引入 DuplicateKeyError -from src.common.logger_manager import get_logger -from src.common.database import db # 使用全局 db -from .nickname_mapper import analyze_chat_for_nicknames -from src.config.config import global_config -from ..person_info.person_info import person_info_manager - -logger = get_logger("nickname_processor") - -_stop_event = threading.Event() - - -def _upsert_person(collection: Collection, person_id: str, user_id_int: int, platform: str): - """ - 确保数据库中存在指定 person_id 的文档 (Upsert)。 - 如果文档不存在,则使用提供的用户信息创建它。 - - Args: - collection: MongoDB 集合对象 (person_info)。 - person_id: 要查找或创建的 person_id。 - user_id_int: 用户的整数 ID。 - platform: 平台名称。 - - Returns: - UpdateResult: MongoDB 更新操作的结果。 - - Raises: - DuplicateKeyError: 如果发生重复键错误 (理论上不应由 upsert 触发)。 - Exception: 其他数据库操作错误。 - """ - try: - # 关键步骤:基于 person_id 执行 Upsert - # 如果文档不存在,它会被创建,并设置 $setOnInsert 中的字段。 - # 如果文档已存在,此操作不会修改任何内容。 - result = collection.update_one( - {"person_id": person_id}, - { - "$setOnInsert": { - "person_id": person_id, - "user_id": user_id_int, # 确保这里使用传入的 user_id_int - "platform": platform, - "group_nicknames": [], # 初始化 group_nicknames 数组 - } - }, - upsert=True, - ) - if result.upserted_id: - logger.debug(f"Upsert on person_id created new document: {person_id}") - # else: - # logger.debug(f"Upsert on person_id found existing document: {person_id}") - return result - except DuplicateKeyError as dk_err: - # 这个错误理论上不应该再由 upsert 触发。 - # 如果仍然出现,可能指示 person_id 生成逻辑问题或非常罕见的 MongoDB 内部情况。 - logger.error( - f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。" - ) - raise # 将异常向上抛出,让调用者处理 - except Exception as e: - logger.exception(f"对 person_id {person_id} 执行 Upsert 时失败: {e}") - raise # 将异常向上抛出 - - -def _update_group_nickname(collection: Collection, person_id: str, group_id_str: str, nickname: str): - """ - 尝试更新 person_id 文档中特定群组的绰号计数,或添加新条目。 - 按顺序尝试:增加计数 -> 添加绰号 -> 添加群组。 - - Args: - collection: MongoDB 集合对象 (person_info)。 - person_id: 目标文档的 person_id。 - group_id_str: 目标群组的 ID (字符串)。 - nickname: 要更新或添加的绰号。 - """ - # 3a. 尝试增加现有群组中现有绰号的计数 - result_inc = collection.update_one( - { - "person_id": person_id, - "group_nicknames": {"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}}, - }, - {"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}}, - array_filters=[ - {"group.group_id": group_id_str}, - {"nick.name": nickname}, - ], - ) - if result_inc.modified_count > 0: - # logger.debug(f"成功增加 person_id {person_id} 在群组 {group_id_str} 中绰号 '{nickname}' 的计数。") - return # 成功增加计数,操作完成 - - # 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组 - result_push_nick = collection.update_one( - { - "person_id": person_id, - "group_nicknames.group_id": group_id_str, # 检查群组是否存在 - }, - {"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}}, - array_filters=[{"group.group_id": group_id_str}], - ) - if result_push_nick.modified_count > 0: - logger.debug(f"成功为 person_id {person_id} 在现有群组 {group_id_str} 中添加新绰号 '{nickname}'。") - return # 成功添加绰号,操作完成 - - # 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号 - # 确保 group_nicknames 数组存在 (作为保险措施) - collection.update_one( - {"person_id": person_id, "group_nicknames": {"$exists": False}}, - {"$set": {"group_nicknames": []}}, - ) - # 推送新的群组对象到 group_nicknames 数组 - result_push_group = collection.update_one( - { - "person_id": person_id, - "group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在 - }, - { - "$push": { - "group_nicknames": { - "group_id": group_id_str, - "nicknames": [{"name": nickname, "count": 1}], - } - } - }, - ) - if result_push_group.modified_count > 0: - logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'。") - # else: - # 如果连添加群组也失败 (例如 group_id 已存在但之前的步骤都未匹配,理论上不太可能), - # 可能需要进一步的日志或错误处理,但这通常意味着数据状态异常。 - # logger.warning(f"尝试为 person_id {person_id} 添加新群组 {group_id_str} 失败,可能群组已存在但结构不符合预期。") - - -async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dict[str, str]): - """ - 更新数据库中用户的群组绰号计数 (使用全局 db)。 - 通过调用辅助函数来处理 person 文档的 upsert 和绰号更新。 - - Args: - platform (str): 平台名称 (e.g., 'qq')。 - group_id (str): 群组 ID。 - nickname_map (Dict[str, str]): 用户 ID (字符串) 到绰号的映射。 - """ - person_info_collection = db.person_info - if not nickname_map: - logger.debug("提供的用于更新的绰号映射为空。") - return - - logger.info(f"尝试更新平台 '{platform}' 群组 '{group_id}' 的绰号计数,映射为: {nickname_map}") - - for user_id_str, nickname in nickname_map.items(): - # --- 基本验证 --- - if not user_id_str or not nickname: - logger.warning(f"跳过无效条目: user_id='{user_id_str}', nickname='{nickname}'") - continue - group_id_str = str(group_id) - - # 使用 isdigit() 检查 user_id_str 是否为纯数字字符串 - if not user_id_str.isdigit(): - # isdigit() 会对空字符串返回 False,并且不识别负号、小数点等 - logger.warning(f"无效的用户ID格式 (非纯数字): '{user_id_str}',跳过。") - continue - - user_id_int = int(user_id_str) - - # --- 结束验证 --- - - try: - # --- 步骤 1: 生成 person_id --- - person_id = person_info_manager.get_person_id(platform, user_id_str) - if not person_id: - logger.error(f"无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id,跳过此用户。") - continue - - # --- 步骤 2: 确保 Person 文档存在 (调用辅助函数) --- - _upsert_person(person_info_collection, person_id, user_id_int, platform) - - # --- 步骤 3: 更新群组绰号 (调用辅助函数) --- - _update_group_nickname(person_info_collection, person_id, group_id_str, nickname) - - # --- 统一处理数据库操作可能抛出的异常 --- - except (OperationFailure, DuplicateKeyError) as db_err: # 捕获特定的数据库错误 - logger.exception( - f"数据库操作失败 ({type(db_err).__name__}): 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}. 错误: {db_err}" - ) - except Exception as e: - # 捕获其他所有可能的错误 (例如 person_id 生成、辅助函数内部未捕获的错误等) - logger.exception(f"处理用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误:{e}") - - -# --- 使用 queue.Queue --- -queue_max_size = getattr(global_config, "NICKNAME_QUEUE_MAX_SIZE", 100) -nickname_queue: queue.Queue = queue.Queue(maxsize=queue_max_size) - -_nickname_thread: Optional[threading.Thread] = None - - -# --- add_to_nickname_queue (保持不变,已包含 platform) --- -async def add_to_nickname_queue( - chat_history_str: str, bot_reply: str, platform: str, group_id: Optional[str], user_name_map: Dict[str, str] -): - """将需要分析的数据放入队列。""" - if not global_config or not global_config.ENABLE_NICKNAME_MAPPING: - return - if group_id is None: - logger.debug("私聊跳过绰号映射。") - return - try: - item = (chat_history_str, bot_reply, platform, str(group_id), user_name_map) - nickname_queue.put_nowait(item) - logger.debug( - f"已将项目添加到平台 '{platform}' 群组 '{group_id}' 的绰号队列。当前大小: {nickname_queue.qsize()}" - ) - except queue.Full: - logger.warning(f"无法将项目添加到绰号队列:队列已满 (maxsize={nickname_queue.maxsize})。") - except Exception as e: - logger.warning(f"无法将项目添加到绰号队列: {e}", exc_info=True) - - -# --- _nickname_processing_loop (保持不变,已包含 platform) --- -async def _nickname_processing_loop(q: queue.Queue, stop_event: threading.Event): - """独立线程中的主循环,处理队列任务 (使用全局 db 和 config)。""" - thread_id = threading.get_ident() - logger.info(f"绰号处理循环已启动 (线程 ID: {thread_id})。") - sleep_interval = getattr(global_config, "NICKNAME_PROCESS_SLEEP_INTERVAL", 0.5) - - while not stop_event.is_set(): - try: - item = q.get(block=True, timeout=sleep_interval) - - if isinstance(item, tuple) and len(item) == 5: - chat_history_str, bot_reply, platform, group_id, user_name_map = item - logger.debug(f"(线程 ID: {thread_id}) 正在处理平台 '{platform}' 群组 '{group_id}' 的绰号映射任务...") - analysis_result = await analyze_chat_for_nicknames(chat_history_str, bot_reply, user_name_map) - if analysis_result.get("is_exist") and analysis_result.get("data"): - await update_nickname_counts(platform, group_id, analysis_result["data"]) - else: - logger.warning(f"(线程 ID: {thread_id}) 从队列接收到意外的项目类型或长度: {type(item)}, 内容: {item}") - - q.task_done() - - except queue.Empty: - continue - except asyncio.CancelledError: - logger.info(f"绰号处理循环已取消 (线程 ID: {thread_id})。") - break - except Exception as e: - logger.error(f"(线程 ID: {thread_id}) 绰号处理循环出错: {e}\n{traceback.format_exc()}") - await asyncio.sleep(5) - - logger.info(f"绰号处理循环已结束 (线程 ID: {thread_id})。") - - -# --- _run_processor_thread (保持不变) --- -def _run_processor_thread(q: queue.Queue, stop_event: threading.Event): - """线程启动函数,运行异步循环。""" - loop = None - thread_id = threading.get_ident() - logger.info(f"Nickname processor thread starting (Thread ID: {thread_id})...") - try: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - logger.info(f"(Thread ID: {thread_id}) Asyncio event loop created and set.") - loop.run_until_complete(_nickname_processing_loop(q, stop_event)) - except Exception as e: - logger.error(f"(Thread ID: {thread_id}) Error running nickname processor thread: {e}", exc_info=True) - finally: - if loop: - try: - if loop.is_running(): - logger.info(f"(Thread ID: {thread_id}) Stopping the asyncio loop...") - all_tasks = asyncio.all_tasks(loop) - if all_tasks: - logger.info(f"(Thread ID: {thread_id}) Cancelling {len(all_tasks)} running tasks...") - for task in all_tasks: - task.cancel() - loop.run_until_complete(asyncio.gather(*all_tasks, return_exceptions=True)) - logger.info(f"(Thread ID: {thread_id}) All tasks cancelled.") - loop.stop() - logger.info(f"(Thread ID: {thread_id}) Loop stopped.") - if not loop.is_closed(): - loop.close() - logger.info(f"(Thread ID: {thread_id}) Asyncio loop closed.") - except Exception as loop_close_err: - logger.error(f"(Thread ID: {thread_id}) Error closing loop: {loop_close_err}", exc_info=True) - logger.info(f"Nickname processor thread finished (Thread ID: {thread_id}).") - - -# --- start_nickname_processor (保持不变) --- -def start_nickname_processor(): - """启动绰号映射处理线程。""" - global _nickname_thread - if not global_config or not global_config.ENABLE_NICKNAME_MAPPING: - logger.info("绰号映射功能已禁用或无法获取配置。处理器未启动。") - return - - if _nickname_thread is None or not _nickname_thread.is_alive(): - logger.info("正在启动绰号处理器线程...") - stop_event = get_stop_event() - stop_event.clear() - _nickname_thread = threading.Thread( - target=_run_processor_thread, args=(nickname_queue, stop_event), daemon=True - ) - _nickname_thread.start() - logger.info(f"绰号处理器线程已启动 (Thread ID: {_nickname_thread.ident})") - else: - logger.warning("绰号处理器线程已在运行中。") - - -# --- stop_nickname_processor (保持不变) --- -def stop_nickname_processor(): - """停止绰号映射处理线程。""" - global _nickname_thread - if _nickname_thread and _nickname_thread.is_alive(): - logger.info("正在停止绰号处理器线程...") - set_stop_event() - try: - _nickname_thread.join(timeout=10) - if _nickname_thread.is_alive(): - logger.warning("绰号处理器线程在 10 秒后未结束。") - except Exception as e: - logger.error(f"停止绰号处理器线程时出错: {e}", exc_info=True) - finally: - if _nickname_thread and not _nickname_thread.is_alive(): - logger.info("绰号处理器线程已成功停止。") - else: - logger.warning("停止绰号处理器线程:线程可能仍在运行或未正确清理。") - _nickname_thread = None - else: - logger.info("绰号处理器线程未在运行或已被清理。") - - -# --- Event 控制函数 (保持不变) --- -def get_stop_event() -> threading.Event: - """获取全局停止事件""" - return _stop_event - - -def set_stop_event(): - """设置全局停止事件,通知子线程退出""" - _stop_event.set() diff --git a/src/plugins/group_nickname/nickname_utils.py b/src/plugins/group_nickname/nickname_utils.py index 83931417..f0402b12 100644 --- a/src/plugins/group_nickname/nickname_utils.py +++ b/src/plugins/group_nickname/nickname_utils.py @@ -1,26 +1,21 @@ import random -import time from typing import List, Dict, Tuple, Optional from src.common.logger_manager import get_logger from src.config.config import global_config -from src.plugins.person_info.relationship_manager import relationship_manager -from src.plugins.chat.chat_stream import ChatStream -from src.plugins.chat.message import MessageRecv -from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat -from .nickname_processor import add_to_nickname_queue +# 这个文件现在只包含纯粹的工具函数,与状态和流程无关 -# 获取日志记录器,命名为 "绰号工具" logger = get_logger("nickname_utils") def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int]]]) -> List[Tuple[str, str, int]]: """ - 从给定的绰号信息中,根据映射次数加权随机选择最多 N 个绰号。 + 从给定的绰号信息中,根据映射次数加权随机选择最多 N 个绰号用于 Prompt。 Args: all_nicknames_info: 包含用户及其绰号信息的字典,格式为 { "用户名1": [{"绰号A": 次数}, {"绰号B": 次数}], ... } + 注意:这里的用户名是 person_name。 Returns: List[Tuple[str, str, int]]: 选中的绰号列表,每个元素为 (用户名, 绰号, 次数)。 @@ -29,17 +24,21 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int if not all_nicknames_info: return [] - candidates = [] + candidates = [] # 存储 (用户名, 绰号, 次数, 权重) + smoothing_factor = getattr(global_config, "NICKNAME_PROBABILITY_SMOOTHING", 1.0) # 平滑因子,避免权重为0 + for user_name, nicknames in all_nicknames_info.items(): - if nicknames: + if nicknames and isinstance(nicknames, list): for nickname_entry in nicknames: + # 确保条目是字典且只有一个键值对 if isinstance(nickname_entry, dict) and len(nickname_entry) == 1: nickname, count = list(nickname_entry.items())[0] - if isinstance(count, int) and count > 0: - weight = count + global_config.NICKNAME_PROBABILITY_SMOOTHING + # 确保次数是正整数 + if isinstance(count, int) and count > 0 and isinstance(nickname, str) and nickname: + weight = count + smoothing_factor # 计算权重 candidates.append((user_name, nickname, count, weight)) else: - logger.warning(f"用户 '{user_name}' 的绰号 '{nickname}' 次数无效: {count}。已跳过。") + logger.warning(f"用户 '{user_name}' 的绰号条目无效: {nickname_entry} (次数非正整数或绰号为空)。已跳过。") else: logger.warning(f"用户 '{user_name}' 的绰号条目格式无效: {nickname_entry}。已跳过。") @@ -47,23 +46,24 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int return [] # 确定需要选择的数量 - num_to_select = min(global_config.MAX_NICKNAMES_IN_PROMPT, len(candidates)) + max_nicknames = getattr(global_config, "MAX_NICKNAMES_IN_PROMPT", 5) + num_to_select = min(max_nicknames, len(candidates)) try: - # 调用新的辅助函数进行不重复加权抽样 + # 调用加权随机抽样(不重复) selected_candidates_with_weight = weighted_sample_without_replacement(candidates, num_to_select) # 如果抽样结果数量不足(例如权重问题导致提前退出),可以考虑是否需要补充 if len(selected_candidates_with_weight) < num_to_select: logger.debug( - f"加权随机选择后数量不足 ({len(selected_candidates_with_weight)}/{num_to_select}),补充选择次数最多的。" + f"加权随机选择后数量不足 ({len(selected_candidates_with_weight)}/{num_to_select}),尝试补充选择次数最多的。" ) # 筛选出未被选中的候选 selected_ids = set( (c[0], c[1]) for c in selected_candidates_with_weight - ) # 使用 (用户名, 绰号) 作为唯一标识 + ) # 使用 (用户名, 绰号) 作为唯一标识 remaining_candidates = [c for c in candidates if (c[0], c[1]) not in selected_ids] - remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 + remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 needed = num_to_select - len(selected_candidates_with_weight) selected_candidates_with_weight.extend(remaining_candidates[:needed]) @@ -71,14 +71,14 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int # 日志:记录加权随机选择时发生的错误,并回退到简单选择 logger.error(f"绰号加权随机选择时出错: {e}。将回退到选择次数最多的 Top N。", exc_info=True) # 出错时回退到选择次数最多的 N 个 - candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 - # 注意:这里需要选择包含权重的元组,或者调整后续处理 + candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 selected_candidates_with_weight = candidates[:num_to_select] # 格式化输出结果为 (用户名, 绰号, 次数),移除权重 result = [(user, nick, count) for user, nick, count, _weight in selected_candidates_with_weight] - result.sort(key=lambda x: x[2], reverse=True) # 按次数降序 + # 按次数降序排序最终结果 + result.sort(key=lambda x: x[2], reverse=True) logger.debug(f"为 Prompt 选择的绰号: {result}") return result @@ -95,13 +95,13 @@ def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, in str: 格式化后的字符串,如果列表为空则返回空字符串。 """ if not selected_nicknames: - # 如果没有选中的绰号,返回空字符串 return "" + # Prompt 注入部分的标题 prompt_lines = [ - "以下是聊天记录中一些成员在本群的绰号信息(按常用度排序),如果有需要提及对方,用你认为合适的方式提及:" - ] # 注入部分的标题 - grouped_by_user: Dict[str, List[str]] = {} # 用于按用户分组 + "以下是聊天记录中一些成员在本群的绰号信息(按常用度排序),供你参考:" + ] + grouped_by_user: Dict[str, List[str]] = {} # 用于按用户分组 # 按用户分组绰号 for user_name, nickname, _count in selected_nicknames: @@ -112,8 +112,9 @@ def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, in # 构建每个用户的绰号字符串 for user_name, nicknames in grouped_by_user.items(): - nicknames_str = "、".join(nicknames) # 使用中文顿号连接 - prompt_lines.append(f"- {user_name},ta被群友称为:{nicknames_str}") # 格式化输出 + nicknames_str = "、".join(nicknames) # 使用中文顿号连接 + # 格式化输出,例如: "- 张三,ta 可能被称为:“三儿”、“张哥”" + prompt_lines.append(f"- {user_name},ta 可能被称为:{nicknames_str}") # 如果只有标题行,返回空字符串,避免注入无意义的标题 if len(prompt_lines) > 1: @@ -123,213 +124,51 @@ def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, in return "" -async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_list_before_now: List[Dict]) -> str: - """ - 获取并格式化用于 Prompt 注入的绰号信息字符串。 - 这是一个封装函数,整合了获取、选择和格式化的逻辑。 - - Args: - chat_stream: 当前的 ChatStream 对象。 - message_list_before_now: 用于确定上下文中用户的消息列表。 - - Returns: - str: 格式化后的绰号信息字符串,如果无法获取或格式化则返回空字符串。 - """ - nickname_injection_str = "" - # 仅在群聊且功能开启时执行 - if global_config.ENABLE_NICKNAME_MAPPING and chat_stream and chat_stream.group_info: - try: - group_id = str(chat_stream.group_info.group_id) - user_ids_in_context = set() # 存储上下文中出现的用户ID - - # 从消息列表中提取用户ID - if message_list_before_now: - for msg in message_list_before_now: - sender_id = msg["user_info"].get("user_id") - if sender_id: - user_ids_in_context.add(str(sender_id)) - else: - # 如果消息列表为空,尝试获取最近发言者作为上下文用户 - recent_speakers = chat_stream.get_recent_speakers(limit=5) # 获取最近5个发言者 - for speaker in recent_speakers: - user_ids_in_context.add(str(speaker["user_id"])) - if not user_ids_in_context: - # 日志:记录未找到上下文用户 - logger.warning(f"[{chat_stream.stream_id}] 未找到消息或最近发言者用于绰号注入。") - - # 如果找到了上下文用户 - if user_ids_in_context: - platform = chat_stream.platform - # --- 调用批量获取群组绰号的方法 --- - # 使用 relationship_manager 从数据库获取数据 - all_nicknames_data = await relationship_manager.get_users_group_nicknames( - platform, list(user_ids_in_context), group_id - ) - - # 如果获取到了绰号数据 - if all_nicknames_data: - # 调用选择和格式化函数 - selected_nicknames = select_nicknames_for_prompt(all_nicknames_data) - nickname_injection_str = format_nickname_prompt_injection(selected_nicknames) - if nickname_injection_str: - # 日志:记录生成的用于 Prompt 的绰号信息 - logger.debug( - f"[{chat_stream.stream_id}] 已生成用于 Prompt 的绰号信息:\n{nickname_injection_str}" - ) - - except Exception as e: - # 日志:记录获取或格式化绰号信息时发生的错误 - logger.error(f"[{chat_stream.stream_id}] 获取或格式化 Prompt 绰号信息时出错: {e}", exc_info=True) - nickname_injection_str = "" # 出错时确保返回空字符串 - - # 返回最终生成的字符串(可能为空) - return nickname_injection_str - - -async def trigger_nickname_analysis_if_needed( - anchor_message: MessageRecv, - bot_reply: List[str], - chat_stream: Optional[ChatStream] = None, # 允许传入 chat_stream 或从 anchor_message 获取 -): - """ - 如果满足条件(群聊、功能开启),则准备数据并触发绰号分析任务。 - 将相关信息放入处理队列,由 nickname_processor 处理。 - - Args: - anchor_message: 触发回复的原始消息对象。 - bot_reply: Bot 生成的回复内容列表。 - chat_stream: 可选的 ChatStream 对象。 - """ - # 检查功能是否开启 - if not global_config.ENABLE_NICKNAME_MAPPING: - return # 如果功能禁用,直接返回 - - # 确定使用的 chat_stream - current_chat_stream = chat_stream or anchor_message.chat_stream - - # 检查是否是群聊且 chat_stream 有效 - if not current_chat_stream or not current_chat_stream.group_info: - # 日志:记录跳过分析的原因(非群聊或无效流) - logger.debug( - f"[{current_chat_stream.stream_id if current_chat_stream else '未知流'}] 跳过绰号分析:非群聊或无效聊天流。" - ) - return - - log_prefix = f"[{current_chat_stream.stream_id}]" # 用于日志的前缀 - - try: - # 1. 获取历史记录 - history_limit = 30 # 定义获取历史记录的数量限制 - history_messages = get_raw_msg_before_timestamp_with_chat( - chat_id=current_chat_stream.stream_id, - timestamp=time.time(), # 获取当前时间之前的记录 - limit=history_limit, - ) - - # 格式化历史记录为可读字符串 - chat_history_str = await build_readable_messages( - messages=history_messages, - replace_bot_name=True, # 替换机器人名称,以便 LLM 分析 - merge_messages=False, # 不合并消息,保留原始对话结构 - timestamp_mode="relative", # 使用相对时间戳 - read_mark=0.0, # 不需要已读标记 - truncate=False, # 获取完整内容进行分析 - ) - - # 2. 获取 Bot 回复字符串 - bot_reply_str = " ".join(bot_reply) if bot_reply else "" # 处理空回复列表 - - # 3. 获取群号和平台信息 - group_id = str(current_chat_stream.group_info.group_id) - platform = current_chat_stream.platform - - # 4. 构建用户 ID 到名称的映射 (user_name_map) - user_ids_in_history = set() # 存储历史记录中出现的用户ID - for msg in history_messages: - sender_id = msg["user_info"].get("user_id") - if sender_id: - user_ids_in_history.add(str(sender_id)) - - user_name_map = {} # 初始化映射字典 - if user_ids_in_history: - try: - # 批量从数据库获取这些用户的 person_name - names_data = await relationship_manager.get_person_names_batch(platform, list(user_ids_in_history)) - except Exception as e: - # 日志:记录获取 person_name 时发生的错误 - logger.error(f"{log_prefix} 批量获取 person_name 时出错: {e}", exc_info=True) - names_data = {} # 出错时使用空字典 - - # 填充 user_name_map - for user_id in user_ids_in_history: - if user_id in names_data: - # 如果数据库中有 person_name,则使用它 - user_name_map[user_id] = names_data[user_id] - else: - # 如果数据库中没有,则回退查找用户在历史记录中最近使用的 nickname - latest_nickname = next( - ( - m["user_info"].get("user_nickname") # 从 user_info 获取 nickname - for m in reversed(history_messages) # 从后往前找 - # 确保消息的用户ID匹配且 nickname 存在 - if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname") - ), - None, # 如果找不到,返回 None - ) - # 如果找到了 nickname 则使用,否则使用 "未知(ID)" - user_name_map[user_id] = latest_nickname or f"未知({user_id})" - - # 5. 将准备好的数据添加到绰号处理队列 - await add_to_nickname_queue(chat_history_str, bot_reply_str, platform, group_id, user_name_map) - # 日志:记录已成功触发分析任务 - logger.debug(f"{log_prefix} 已为群组 {group_id} 触发绰号分析任务。") - - except Exception as e: - # 日志:记录触发分析过程中发生的任何其他错误 - logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True) - - def weighted_sample_without_replacement( candidates: List[Tuple[str, str, int, float]], k: int ) -> List[Tuple[str, str, int, float]]: """ - 执行不重复的加权随机抽样。 + 执行不重复的加权随机抽样。使用 A-ExpJ 算法思想的简化实现。 Args: candidates: 候选列表,每个元素为 (用户名, 绰号, 次数, 权重)。 k: 需要选择的数量。 Returns: - List[Tuple[str, str, int, float]]: 选中的元素列表。 + List[Tuple[str, str, int, float]]: 选中的元素列表(包含权重)。 """ if k <= 0: return [] - if k >= len(candidates): - # 如果需要选择的数量大于或等于候选数量,直接返回所有候选 - return candidates[:] # 返回副本以避免修改原始列表 + n = len(candidates) + if k >= n: + return candidates[:] # 返回副本 - pool = candidates[:] # 创建候选列表的副本进行操作 - selected = [] - # 注意:原评论代码中计算 total_weight 但未使用,这里也省略。 - # random.choices 内部会处理权重的归一化。 - - for _ in range(min(k, len(pool))): # 确保迭代次数不超过池中剩余元素 - if not pool: # 如果池已空,提前结束 - break - - weights = [c[3] for c in pool] # 获取当前池中所有元素的权重 - # 检查权重是否有效 - if sum(weights) <= 0: - # 如果所有剩余权重无效,随机选择一个(或根据需要采取其他策略) - logger.warning("加权抽样池中剩余权重总和为0或负数,随机选择一个。") - chosen_index = random.randrange(len(pool)) - chosen = pool.pop(chosen_index) + # 计算每个元素的 key = U^(1/weight),其中 U 是 (0, 1) 之间的随机数 + # 为了数值稳定性,计算 log(key) = log(U) / weight + # log(U) 可以用 -Exponential(1) 来生成 + weighted_keys = [] + for i in range(n): + weight = candidates[i][3] + if weight <= 0: + # 处理权重为0或负数的情况,赋予一个极小的概率(或极大负数的log_key) + log_key = float('-inf') # 或者一个非常大的负数 + logger.warning(f"候选者 {candidates[i][:2]} 的权重为非正数 ({weight}),抽中概率极低。") else: - # 使用 random.choices 进行加权抽样,选择 1 个 - # random.choices 返回一个列表,所以取第一个元素 [0] - chosen = random.choices(pool, weights=weights, k=1)[0] - pool.remove(chosen) # 从池中移除选中的元素,实现不重复抽样 + log_u = -random.expovariate(1.0) # 生成 -Exponential(1) 随机数 + log_key = log_u / weight + weighted_keys.append((log_key, i)) # 存储 (log_key, 原始索引) - selected.append(chosen) + # 按 log_key 降序排序 (相当于按 key 升序排序) + weighted_keys.sort(key=lambda x: x[0], reverse=True) - return selected + # 选择 log_key 最大的 k 个元素的原始索引 + selected_indices = [index for _log_key, index in weighted_keys[:k]] + + # 根据选中的索引从原始 candidates 列表中获取元素 + selected_items = [candidates[i] for i in selected_indices] + + return selected_items + +# 移除旧的流程函数 +# get_nickname_injection_for_prompt 和 trigger_nickname_analysis_if_needed +# 的逻辑现在由 NicknameManager 处理 diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index 0e690cc2..12b720bf 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -28,9 +28,8 @@ from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from src.plugins.moods.moods import MoodManager from src.heart_flow.utils_chat import get_chat_type_and_target_info from rich.traceback import install -from src.plugins.group_nickname.nickname_utils import trigger_nickname_analysis_if_needed from src.plugins.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat -from src.plugins.group_nickname.nickname_utils import get_nickname_injection_for_prompt +from src.plugins.group_nickname.nickname_manager import nickname_manager install(extra_lines=3) @@ -605,7 +604,7 @@ class HeartFChatting: ) # 调用工具函数触发绰号分析 - await trigger_nickname_analysis_if_needed(anchor_message, reply, self.chat_stream) + await nickname_manager.trigger_nickname_analysis(anchor_message, reply, self.chat_stream) return True, thinking_id @@ -874,7 +873,7 @@ class HeartFChatting: limit=global_config.observation_context_size, # 使用与 prompt 构建一致的 limit ) # 调用工具函数获取格式化后的绰号字符串 - nickname_injection_str = await get_nickname_injection_for_prompt(self.chat_stream, message_list_before_now) + nickname_injection_str = await nickname_manager.get_nickname_prompt_injection(self.chat_stream, message_list_before_now) # --- 构建提示词 (调用修改后的 PromptBuilder 方法) --- prompt = await prompt_builder.build_planner_prompt( diff --git a/src/plugins/heartFC_chat/heartflow_prompt_builder.py b/src/plugins/heartFC_chat/heartflow_prompt_builder.py index dc9e43ee..b979268c 100644 --- a/src/plugins/heartFC_chat/heartflow_prompt_builder.py +++ b/src/plugins/heartFC_chat/heartflow_prompt_builder.py @@ -14,7 +14,7 @@ from ..moods.moods import MoodManager from ..memory_system.Hippocampus import HippocampusManager from ..schedule.schedule_generator import bot_schedule from ..knowledge.knowledge_lib import qa_manager -from src.plugins.group_nickname.nickname_utils import get_nickname_injection_for_prompt +from src.plugins.group_nickname.nickname_manager import nickname_manager import traceback from .heartFC_Cycleinfo import CycleInfo @@ -255,7 +255,7 @@ async def _build_prompt_focus(reason, current_mind_info, structured_info, chat_s chat_target_2 = await global_prompt_manager.get_prompt_async("chat_target_group2") # 调用新的工具函数获取绰号信息 - nickname_injection_str = await get_nickname_injection_for_prompt(chat_stream, message_list_before_now) + nickname_injection_str = await nickname_manager.get_nickname_prompt_injection(chat_stream, message_list_before_now) prompt = await global_prompt_manager.format_prompt( template_name, @@ -451,7 +451,7 @@ class PromptBuilder: chat_target_2 = await global_prompt_manager.get_prompt_async("chat_target_group2") # 调用新的工具函数获取绰号信息 - nickname_injection_str = await get_nickname_injection_for_prompt(chat_stream, message_list_before_now) + nickname_injection_str = await nickname_manager.get_nickname_prompt_injection(chat_stream, message_list_before_now) prompt = await global_prompt_manager.format_prompt( template_name, diff --git a/src/plugins/heartFC_chat/normal_chat.py b/src/plugins/heartFC_chat/normal_chat.py index ca266413..6521ae13 100644 --- a/src/plugins/heartFC_chat/normal_chat.py +++ b/src/plugins/heartFC_chat/normal_chat.py @@ -20,7 +20,7 @@ from src.plugins.person_info.relationship_manager import relationship_manager from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from src.plugins.utils.timer_calculator import Timer from src.heart_flow.utils_chat import get_chat_type_and_target_info -from src.plugins.group_nickname.nickname_utils import trigger_nickname_analysis_if_needed +from src.plugins.group_nickname.nickname_manager import nickname_manager logger = get_logger("chat") @@ -317,7 +317,7 @@ class NormalChat: # 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况) if first_bot_msg: info_catcher.catch_after_response(timing_results["消息发送"], response_set, first_bot_msg) - await trigger_nickname_analysis_if_needed(message, response_set, self.chat_stream) + await nickname_manager.trigger_nickname_analysis(message, response_set, self.chat_stream) else: logger.warning(f"[{self.stream_name}] 思考消息 {thinking_id} 在发送前丢失,无法记录 info_catcher") diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 81cead61..6723f2de 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -128,7 +128,7 @@ enable_nickname_mapping = false # 绰号映射功能总开关(默认关闭, max_nicknames_in_prompt = 10 # Prompt 中最多注入的绰号数量(防止token数量爆炸) nickname_probability_smoothing = 1 # 绰号加权随机选择的平滑因子 nickname_queue_max_size = 100 # 绰号处理队列最大容量 -nickname_process_sleep_interval = 5 # 绰号处理进程休眠间隔(秒) +nickname_process_sleep_interval = 60 # 绰号处理进程休眠间隔(秒) [memory] build_memory_interval = 2000 # 记忆构建间隔 单位秒 间隔越低,麦麦学习越多,但是冗余信息也会增多 diff --git a/template/lpmm_config_template.toml b/template/lpmm_config_template.toml index aae664d5..491e1feb 100644 --- a/template/lpmm_config_template.toml +++ b/template/lpmm_config_template.toml @@ -54,7 +54,7 @@ res_top_k = 3 # 最终提供的文段TopK [persistence] # 持久化配置(存储中间数据,防止重复计算) data_root_path = "data" # 数据根目录 -raw_data_path = "data/imported_lpmm_data" # 原始数据路径 -openie_data_path = "data/openie" # OpenIE数据路径 +raw_data_path = "data/import.json" # 原始数据路径 +openie_data_path = "data/openie.json" # OpenIE数据路径 embedding_data_dir = "data/embedding" # 嵌入数据目录 rag_data_dir = "data/rag" # RAG数据目录