diff --git a/bot.py b/bot.py index 41847a01..4197556e 100644 --- a/bot.py +++ b/bot.py @@ -14,6 +14,8 @@ from src.common.logger_manager import get_logger from src.common.crash_logger import install_crash_handler from src.main import MainSystem from rich.traceback import install +from src.plugins.group_nickname.nickname_manager import nickname_manager +import atexit install(extra_lines=3) @@ -221,6 +223,19 @@ def raw_main(): env_config = {key: os.getenv(key) for key in os.environ} scan_provider(env_config) + # 确保 NicknameManager 单例实例存在并已初始化 + # (单例模式下,导入时或第一次调用时会自动初始化) + _ = nickname_manager # 显式引用一次 + + # 启动 NicknameManager 的后台处理器线程 + logger.info("准备启动绰号处理管理器...") + nickname_manager.start_processor() # 调用实例的方法 + logger.info("已调用启动绰号处理管理器。") + + # 注册 NicknameManager 的停止方法到 atexit,确保程序退出时线程能被清理 + atexit.register(nickname_manager.stop_processor) # 注册实例的方法 + logger.info("已注册绰号处理管理器的退出处理程序。") + # 返回MainSystem实例 return MainSystem() diff --git a/src/config/config.py b/src/config/config.py index 28d947ef..312f3e95 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -1,7 +1,7 @@ import os import re from dataclasses import dataclass, field -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Any from dateutil import tz import tomli @@ -274,6 +274,13 @@ class BotConfig: talk_allowed_private = set() enable_pfc_chatting: bool = False # 是否启用PFC聊天 + # Group Nickname + ENABLE_NICKNAME_MAPPING: bool = False # 绰号映射功能总开关 + MAX_NICKNAMES_IN_PROMPT: int = 10 # Prompt 中最多注入的绰号数量 + NICKNAME_PROBABILITY_SMOOTHING: int = 1 # 绰号加权随机选择的平滑因子 + NICKNAME_QUEUE_MAX_SIZE: int = 100 # 绰号处理队列最大容量 + NICKNAME_PROCESS_SLEEP_INTERVAL: float = 60 # 绰号处理进程休眠间隔(秒) + # 模型配置 llm_reasoning: dict[str, str] = field(default_factory=lambda: {}) # llm_reasoning_minor: dict[str, str] = field(default_factory=lambda: {}) @@ -289,6 +296,7 @@ class BotConfig: llm_heartflow: Dict[str, str] = field(default_factory=lambda: {}) llm_tool_use: Dict[str, str] = field(default_factory=lambda: {}) llm_plan: Dict[str, str] = field(default_factory=lambda: {}) + llm_nickname_mapping: Dict[str, Any] = field(default_factory=dict) api_urls: Dict[str, str] = field(default_factory=lambda: {}) @@ -402,6 +410,25 @@ class BotConfig: config.save_emoji = emoji_config.get("save_emoji", config.save_emoji) config.steal_emoji = emoji_config.get("steal_emoji", config.steal_emoji) + def group_nickname(parent: dict): + if config.INNER_VERSION in SpecifierSet(">=1.6.2"): + gn_config = parent.get("group_nickname", {}) + config.ENABLE_NICKNAME_MAPPING = gn_config.get( + "enable_nickname_mapping", config.ENABLE_NICKNAME_MAPPING + ) + config.MAX_NICKNAMES_IN_PROMPT = gn_config.get( + "max_nicknames_in_prompt", config.MAX_NICKNAMES_IN_PROMPT + ) + config.NICKNAME_PROBABILITY_SMOOTHING = gn_config.get( + "nickname_probability_smoothing", config.NICKNAME_PROBABILITY_SMOOTHING + ) + config.NICKNAME_QUEUE_MAX_SIZE = gn_config.get( + "nickname_queue_max_size", config.NICKNAME_QUEUE_MAX_SIZE + ) + config.NICKNAME_PROCESS_SLEEP_INTERVAL = gn_config.get( + "nickname_process_sleep_interval", config.NICKNAME_PROCESS_SLEEP_INTERVAL + ) + def bot(parent: dict): # 机器人基础配置 bot_config = parent["bot"] @@ -487,6 +514,7 @@ class BotConfig: "llm_PFC_action_planner", "llm_PFC_chat", "llm_PFC_reply_checker", + "llm_nickname_mapping", ] for item in config_list: @@ -692,6 +720,7 @@ class BotConfig: "chat": {"func": chat, "support": ">=1.6.0", "necessary": False}, "normal_chat": {"func": normal_chat, "support": ">=1.6.0", "necessary": False}, "focus_chat": {"func": focus_chat, "support": ">=1.6.0", "necessary": False}, + "group_nickname": {"func": group_nickname, "support": ">=0.6.3", "necessary": False}, } # 原地修改,将 字符串版本表达式 转换成 版本对象 diff --git a/src/plugins/group_nickname/nickname_db.py b/src/plugins/group_nickname/nickname_db.py new file mode 100644 index 00000000..ac3bd24c --- /dev/null +++ b/src/plugins/group_nickname/nickname_db.py @@ -0,0 +1,160 @@ +from pymongo.collection import Collection +from pymongo.errors import OperationFailure, DuplicateKeyError +from src.common.logger_manager import get_logger +from typing import Optional + +logger = get_logger("nickname_db") + + +class NicknameDB: + """ + 处理与群组绰号相关的数据库操作 (MongoDB)。 + 封装了对 'person_info' 集合的读写操作。 + """ + + def __init__(self, person_info_collection: Optional[Collection]): + """ + 初始化 NicknameDB 处理器。 + + Args: + person_info_collection: MongoDB 'person_info' 集合对象。 + 如果为 None,则数据库操作将被禁用。 + """ + if person_info_collection is None: + logger.error("未提供 person_info 集合,NicknameDB 操作将被禁用。") + self.person_info_collection = None + else: + self.person_info_collection = person_info_collection + logger.info("NicknameDB 初始化成功。") + + def is_available(self) -> bool: + """检查数据库集合是否可用。""" + return self.person_info_collection is not None + + def upsert_person(self, person_id: str, user_id_int: int, platform: str): + """ + 确保数据库中存在指定 person_id 的文档 (Upsert)。 + 如果文档不存在,则使用提供的用户信息创建它。 + + Args: + person_id: 要查找或创建的 person_id。 + user_id_int: 用户的整数 ID。 + platform: 平台名称。 + + Returns: + UpdateResult 或 None: MongoDB 更新操作的结果,如果数据库不可用则返回 None。 + + Raises: + DuplicateKeyError: 如果发生重复键错误 (理论上不应由 upsert 触发)。 + Exception: 其他数据库操作错误。 + """ + if not self.is_available(): + logger.error("数据库集合不可用,无法执行 upsert_person。") + return None + try: + # 关键步骤:基于 person_id 执行 Upsert + result = self.person_info_collection.update_one( + {"person_id": person_id}, + { + "$setOnInsert": { + "person_id": person_id, + "user_id": user_id_int, + "platform": platform, + "group_nicknames": [], # 初始化 group_nicknames 数组 + } + }, + upsert=True, + ) + if result.upserted_id: + logger.debug(f"Upsert 创建了新的 person 文档: {person_id}") + return result + except DuplicateKeyError as dk_err: + # 这个错误理论上不应该再由 upsert 触发。 + logger.error( + f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。" + ) + raise # 将异常向上抛出 + except Exception as e: + logger.exception(f"对 person_id {person_id} 执行 Upsert 时失败: {e}") + raise # 将异常向上抛出 + + def update_group_nickname_count(self, person_id: str, group_id_str: str, nickname: str): + """ + 尝试更新 person_id 文档中特定群组的绰号计数,或添加新条目。 + 按顺序尝试:增加计数 -> 添加绰号 -> 添加群组。 + + Args: + person_id: 目标文档的 person_id。 + group_id_str: 目标群组的 ID (字符串)。 + nickname: 要更新或添加的绰号。 + """ + if not self.is_available(): + logger.error("数据库集合不可用,无法执行 update_group_nickname_count。") + return + + try: + # 3a. 尝试增加现有群组中现有绰号的计数 + result_inc = self.person_info_collection.update_one( + { + "person_id": person_id, + "group_nicknames": {"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}}, + }, + {"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}}, + array_filters=[ + {"group.group_id": group_id_str}, + {"nick.name": nickname}, + ], + ) + if result_inc.modified_count > 0: + # logger.debug(f"成功增加 person_id {person_id} 在群组 {group_id_str} 中绰号 '{nickname}' 的计数。") + return # 成功增加计数,操作完成 + + # 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组 + result_push_nick = self.person_info_collection.update_one( + { + "person_id": person_id, + "group_nicknames.group_id": group_id_str, # 检查群组是否存在 + }, + {"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}}, + array_filters=[{"group.group_id": group_id_str}], + ) + if result_push_nick.modified_count > 0: + logger.debug(f"成功为 person_id {person_id} 在现有群组 {group_id_str} 中添加新绰号 '{nickname}'。") + return # 成功添加绰号,操作完成 + + # 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号 + # 确保 group_nicknames 数组存在 (作为保险措施) + self.person_info_collection.update_one( + {"person_id": person_id, "group_nicknames": {"$exists": False}}, + {"$set": {"group_nicknames": []}}, + ) + # 推送新的群组对象到 group_nicknames 数组 + result_push_group = self.person_info_collection.update_one( + { + "person_id": person_id, + "group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在 + }, + { + "$push": { + "group_nicknames": { + "group_id": group_id_str, + "nicknames": [{"name": nickname, "count": 1}], + } + } + }, + ) + if result_push_group.modified_count > 0: + logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'。") + # else: + # logger.warning(f"尝试为 person_id {person_id} 添加新群组 {group_id_str} 失败,可能群组已存在但结构不符合预期。") + + except (OperationFailure, DuplicateKeyError) as db_err: + logger.exception( + f"数据库操作失败 ({type(db_err).__name__}): person_id {person_id}, 群组 {group_id_str}, 绰号 {nickname}. 错误: {db_err}" + ) + # 根据需要决定是否向上抛出 raise db_err + except Exception as e: + logger.exception( + f"更新群组绰号计数时发生意外错误: person_id {person_id}, group {group_id_str}, nick {nickname}. Error: {e}" + ) + # 根据需要决定是否向上抛出 raise e diff --git a/src/plugins/group_nickname/nickname_manager.py b/src/plugins/group_nickname/nickname_manager.py new file mode 100644 index 00000000..ac158a66 --- /dev/null +++ b/src/plugins/group_nickname/nickname_manager.py @@ -0,0 +1,528 @@ +import asyncio +import threading +import time +import json +import re +from typing import Dict, Optional, List, Any +from pymongo.errors import OperationFailure, DuplicateKeyError +from src.common.logger_manager import get_logger +from src.common.database import db +from src.config.config import global_config +from src.plugins.models.utils_model import LLMRequest +from .nickname_db import NicknameDB +from .nickname_mapper import _build_mapping_prompt +from .nickname_utils import select_nicknames_for_prompt, format_nickname_prompt_injection +from ..person_info.person_info import person_info_manager +from ..person_info.relationship_manager import relationship_manager +from src.plugins.chat.chat_stream import ChatStream +from src.plugins.chat.message import MessageRecv +from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat + +logger = get_logger("NicknameManager") +logger_helper = get_logger("AsyncLoopHelper") # 为辅助函数创建单独的 logger + + +def run_async_loop(loop: asyncio.AbstractEventLoop, coro): + """ + 运行给定的协程直到完成,并确保循环最终关闭。 + + Args: + loop: 要使用的 asyncio 事件循环。 + coro: 要在循环中运行的主协程。 + """ + try: + logger_helper.debug(f"Running coroutine in loop {id(loop)}...") + result = loop.run_until_complete(coro) + logger_helper.debug(f"Coroutine completed in loop {id(loop)}.") + return result + except asyncio.CancelledError: + logger_helper.info(f"Coroutine in loop {id(loop)} was cancelled.") + # 取消是预期行为,不视为错误 + except Exception as e: + logger_helper.error(f"Error in async loop {id(loop)}: {e}", exc_info=True) + finally: + try: + # 1. 取消所有剩余任务 + all_tasks = asyncio.all_tasks(loop) + current_task = asyncio.current_task(loop) + tasks_to_cancel = [ + task for task in all_tasks if task is not current_task + ] # 避免取消 run_until_complete 本身 + if tasks_to_cancel: + logger_helper.info(f"Cancelling {len(tasks_to_cancel)} outstanding tasks in loop {id(loop)}...") + for task in tasks_to_cancel: + task.cancel() + # 等待取消完成 + loop.run_until_complete(asyncio.gather(*tasks_to_cancel, return_exceptions=True)) + logger_helper.info(f"Outstanding tasks cancelled in loop {id(loop)}.") + + # 2. 停止循环 (如果仍在运行) + if loop.is_running(): + loop.stop() + logger_helper.info(f"Asyncio loop {id(loop)} stopped.") + + # 3. 关闭循环 (如果未关闭) + if not loop.is_closed(): + # 在关闭前再运行一次以处理挂起的关闭回调 + loop.run_until_complete(loop.shutdown_asyncgens()) # 关闭异步生成器 + loop.close() + logger_helper.info(f"Asyncio loop {id(loop)} closed.") + except Exception as close_err: + logger_helper.error(f"Error during asyncio loop cleanup for loop {id(loop)}: {close_err}", exc_info=True) + + +class NicknameManager: + """ + 管理群组绰号分析、处理、存储和使用的单例类。 + 封装了 LLM 调用、后台处理线程和数据库交互。 + """ + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + if not cls._instance: + with cls._lock: + if not cls._instance: + logger.info("正在创建 NicknameManager 单例实例...") + cls._instance = super(NicknameManager, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + """ + 初始化 NicknameManager。 + 使用锁和标志确保实际初始化只执行一次。 + """ + if hasattr(self, "_initialized") and self._initialized: + return + + with self._lock: + if hasattr(self, "_initialized") and self._initialized: + return + + logger.info("正在初始化 NicknameManager 组件...") + self.config = global_config + self.is_enabled = self.config.ENABLE_NICKNAME_MAPPING + + # 数据库处理器 + person_info_collection = getattr(db, "person_info", None) + self.db_handler = NicknameDB(person_info_collection) + if not self.db_handler.is_available(): + logger.error("数据库处理器初始化失败,NicknameManager 功能受限。") + self.is_enabled = False + + # LLM 映射器 + self.llm_mapper: Optional[LLMRequest] = None + if self.is_enabled: + try: + model_config = self.config.llm_nickname_mapping + if model_config and model_config.get("name"): + self.llm_mapper = LLMRequest( + model=model_config, + temperature=model_config.get("temp", 0.5), + max_tokens=model_config.get("max_tokens", 256), + request_type="nickname_mapping", + ) + logger.info("绰号映射 LLM 映射器初始化成功。") + else: + logger.warning("绰号映射 LLM 配置无效或缺失 'name',功能禁用。") + self.is_enabled = False + except KeyError as ke: + logger.error(f"初始化绰号映射 LLM 时缺少配置项: {ke},功能禁用。", exc_info=True) + self.llm_mapper = None + self.is_enabled = False + except Exception as e: + logger.error(f"初始化绰号映射 LLM 映射器失败: {e},功能禁用。", exc_info=True) + self.llm_mapper = None + self.is_enabled = False + + # 队列和线程 + self.queue_max_size = getattr(self.config, "NICKNAME_QUEUE_MAX_SIZE", 100) + # 使用 asyncio.Queue + self.nickname_queue: asyncio.Queue = asyncio.Queue(maxsize=self.queue_max_size) + self._stop_event = threading.Event() # stop_event 仍然使用 threading.Event,因为它是由另一个线程设置的 + self._nickname_thread: Optional[threading.Thread] = None + self.sleep_interval = getattr(self.config, "NICKNAME_PROCESS_SLEEP_INTERVAL", 0.5) # 超时时间 + + self._initialized = True + logger.info("NicknameManager 初始化完成。") + + def start_processor(self): + """启动后台处理线程(如果已启用且未运行)。""" + if not self.is_enabled: + logger.info("绰号处理功能已禁用,处理器未启动。") + return + if self._nickname_thread is None or not self._nickname_thread.is_alive(): + logger.info("正在启动绰号处理器线程...") + self._stop_event.clear() + self._nickname_thread = threading.Thread( + target=self._run_processor_in_thread, # 线程目标函数不变 + daemon=True, + ) + self._nickname_thread.start() + logger.info(f"绰号处理器线程已启动 (ID: {self._nickname_thread.ident})") + else: + logger.warning("绰号处理器线程已在运行中。") + + def stop_processor(self): + """停止后台处理线程。""" + if self._nickname_thread and self._nickname_thread.is_alive(): + logger.info("正在停止绰号处理器线程...") + self._stop_event.set() # 设置停止事件,_processing_loop 会检测到 + try: + # 不需要清空 asyncio.Queue,让循环自然结束或被取消 + self._nickname_thread.join(timeout=10) # 等待线程结束 + if self._nickname_thread.is_alive(): + logger.warning("绰号处理器线程在超时后仍未停止。") + except Exception as e: + logger.error(f"停止绰号处理器线程时出错: {e}", exc_info=True) + finally: + if self._nickname_thread and not self._nickname_thread.is_alive(): + logger.info("绰号处理器线程已成功停止。") + self._nickname_thread = None + else: + logger.info("绰号处理器线程未在运行或已被清理。") + + async def trigger_nickname_analysis( + self, + anchor_message: MessageRecv, + bot_reply: List[str], + chat_stream: Optional[ChatStream] = None, + ): + """ + 准备数据并将其排队等待绰号分析(如果满足条件)。 + (现在调用异步的 _add_to_queue) + """ + if not self.is_enabled: + return + + current_chat_stream = chat_stream or anchor_message.chat_stream + if not current_chat_stream or not current_chat_stream.group_info: + logger.debug("跳过绰号分析:非群聊或无效的聊天流。") + return + + log_prefix = f"[{current_chat_stream.stream_id}]" + try: + # 1. 获取历史记录 + history_limit = getattr(self.config, "NICKNAME_ANALYSIS_HISTORY_LIMIT", 30) + history_messages = get_raw_msg_before_timestamp_with_chat( + chat_id=current_chat_stream.stream_id, + timestamp=time.time(), + limit=history_limit, + ) + # 格式化历史记录 + chat_history_str = await build_readable_messages( + messages=history_messages, + replace_bot_name=True, + merge_messages=False, + timestamp_mode="relative", + read_mark=0.0, + truncate=False, + ) + # 2. 获取 Bot 回复 + bot_reply_str = " ".join(bot_reply) if bot_reply else "" + # 3. 获取群组和平台信息 + group_id = str(current_chat_stream.group_info.group_id) + platform = current_chat_stream.platform + # 4. 构建用户 ID 到名称的映射 (user_name_map) + user_ids_in_history = { + str(msg["user_info"]["user_id"]) for msg in history_messages if msg.get("user_info", {}).get("user_id") + } + user_name_map = {} + if user_ids_in_history: + try: + names_data = await relationship_manager.get_person_names_batch(platform, list(user_ids_in_history)) + except Exception as e: + logger.error(f"{log_prefix} 批量获取 person_name 时出错: {e}", exc_info=True) + names_data = {} + for user_id in user_ids_in_history: + if user_id in names_data: + user_name_map[user_id] = names_data[user_id] + else: + latest_nickname = next( + ( + m["user_info"].get("user_nickname") + for m in reversed(history_messages) + if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname") + ), + None, + ) + user_name_map[user_id] = latest_nickname or f"未知({user_id})" + + item = (chat_history_str, bot_reply_str, platform, group_id, user_name_map) + await self._add_to_queue(item, platform, group_id) + + except Exception as e: + logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True) + + async def get_nickname_prompt_injection(self, chat_stream: ChatStream, message_list_before_now: List[Dict]) -> str: + """ + 获取并格式化用于 Prompt 注入的绰号信息字符串。 + """ + if not self.is_enabled or not chat_stream or not chat_stream.group_info: + return "" + + log_prefix = f"[{chat_stream.stream_id}]" + try: + group_id = str(chat_stream.group_info.group_id) + platform = chat_stream.platform + user_ids_in_context = { + str(msg["user_info"]["user_id"]) + for msg in message_list_before_now + if msg.get("user_info", {}).get("user_id") + } + + if not user_ids_in_context: + recent_speakers = chat_stream.get_recent_speakers(limit=5) + user_ids_in_context.update(str(speaker["user_id"]) for speaker in recent_speakers) + + if not user_ids_in_context: + logger.warning(f"{log_prefix} 未找到上下文用户用于绰号注入。") + return "" + + all_nicknames_data = await relationship_manager.get_users_group_nicknames( + platform, list(user_ids_in_context), group_id + ) + + if all_nicknames_data: + selected_nicknames = select_nicknames_for_prompt(all_nicknames_data) + injection_str = format_nickname_prompt_injection(selected_nicknames) + if injection_str: + logger.debug(f"{log_prefix} 生成的绰号 Prompt 注入:\n{injection_str}") + return injection_str + else: + return "" + + except Exception as e: + logger.error(f"{log_prefix} 获取绰号注入时出错: {e}", exc_info=True) + return "" + + # 私有/内部方法 + + async def _add_to_queue(self, item: tuple, platform: str, group_id: str): + """将项目异步添加到内部处理队列 (asyncio.Queue)。""" + try: + # 使用 await put(),如果队列满则异步等待 + await self.nickname_queue.put(item) + logger.debug( + f"已将项目添加到平台 '{platform}' 群组 '{group_id}' 的绰号队列。当前大小: {self.nickname_queue.qsize()}" + ) + except asyncio.QueueFull: + # 理论上 await put() 不会直接抛 QueueFull,除非 maxsize=0 + # 但保留以防万一或未来修改 + logger.warning( + f"绰号队列已满 (最大={self.queue_max_size})。平台 '{platform}' 群组 '{group_id}' 的项目被丢弃。" + ) + except Exception as e: + logger.error(f"将项目添加到绰号队列时出错: {e}", exc_info=True) + + async def _analyze_and_update_nicknames(self, item: tuple): + """处理单个队列项目:调用 LLM 分析并更新数据库。""" + if not isinstance(item, tuple) or len(item) != 5: + logger.warning(f"从队列接收到无效项目: {type(item)}") + return + + chat_history_str, bot_reply, platform, group_id, user_name_map = item + # 使用 asyncio.get_running_loop().call_soon(threading.get_ident) 可能不准确,线程ID是同步概念 + # 可以考虑移除线程ID日志或寻找异步安全的获取标识符的方式 + log_prefix = f"[{platform}:{group_id}]" # 简化日志前缀 + logger.debug(f"{log_prefix} 开始处理绰号分析任务...") + + if not self.llm_mapper: + logger.error(f"{log_prefix} LLM 映射器不可用,无法执行分析。") + return + if not self.db_handler.is_available(): + logger.error(f"{log_prefix} 数据库处理器不可用,无法更新计数。") + return + + # 1. 调用 LLM 分析 (内部逻辑不变) + analysis_result = await self._call_llm_for_analysis(chat_history_str, bot_reply, user_name_map) + + # 2. 如果分析成功且找到映射,则更新数据库 (内部逻辑不变) + if analysis_result.get("is_exist") and analysis_result.get("data"): + nickname_map_to_update = analysis_result["data"] + logger.info(f"{log_prefix} LLM 找到绰号映射,准备更新数据库: {nickname_map_to_update}") + + for user_id_str, nickname in nickname_map_to_update.items(): + # ... (验证和数据库更新逻辑保持不变) ... + if not user_id_str or not nickname: + logger.warning(f"{log_prefix} 跳过无效条目: user_id='{user_id_str}', nickname='{nickname}'") + continue + if not user_id_str.isdigit(): + logger.warning(f"{log_prefix} 无效的用户ID格式 (非纯数字): '{user_id_str}',跳过。") + continue + user_id_int = int(user_id_str) + + try: + person_id = person_info_manager.get_person_id(platform, user_id_str) + if not person_id: + logger.error( + f"{log_prefix} 无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id,跳过此用户。" + ) + continue + self.db_handler.upsert_person(person_id, user_id_int, platform) + self.db_handler.update_group_nickname_count(person_id, group_id, nickname) + except (OperationFailure, DuplicateKeyError) as db_err: + logger.exception( + f"{log_prefix} 数据库操作失败 ({type(db_err).__name__}): 用户 {user_id_str}, 绰号 {nickname}. 错误: {db_err}" + ) + except Exception as e: + logger.exception(f"{log_prefix} 处理用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误:{e}") + else: + logger.debug(f"{log_prefix} LLM 未找到可靠的绰号映射或分析失败。") + + async def _call_llm_for_analysis( + self, + chat_history_str: str, + bot_reply: str, + user_name_map: Dict[str, str], + ) -> Dict[str, Any]: + """ + 内部方法:调用 LLM 分析聊天记录和 Bot 回复,提取可靠的 用户ID-绰号 映射。 + """ + # ... (此方法内部逻辑保持不变) ... + if not self.llm_mapper: + logger.error("LLM 映射器未初始化,无法执行分析。") + return {"is_exist": False} + + prompt = _build_mapping_prompt(chat_history_str, bot_reply, user_name_map) + logger.debug(f"构建的绰号映射 Prompt:\n{prompt[:500]}...") + + try: + response_content, _, _ = await self.llm_mapper.generate_response(prompt) + logger.debug(f"LLM 原始响应 (绰号映射): {response_content}") + + if not response_content: + logger.warning("LLM 返回了空的绰号映射内容。") + return {"is_exist": False} + + response_content = response_content.strip() + markdown_code_regex = re.compile(r"^```(?:\w+)?\s*\n(.*?)\n\s*```$", re.DOTALL | re.IGNORECASE) + match = markdown_code_regex.match(response_content) + if match: + response_content = match.group(1).strip() + elif response_content.startswith("{") and response_content.endswith("}"): + pass # 可能是纯 JSON + else: + json_match = re.search(r"\{.*\}", response_content, re.DOTALL) + if json_match: + response_content = json_match.group(0) + else: + logger.warning(f"LLM 响应似乎不包含有效的 JSON 对象。响应: {response_content}") + return {"is_exist": False} + + result = json.loads(response_content) + + if not isinstance(result, dict): + logger.warning(f"LLM 响应不是一个有效的 JSON 对象 (字典类型)。响应内容: {response_content}") + return {"is_exist": False} + + is_exist = result.get("is_exist") + + if is_exist is True: + original_data = result.get("data") + if isinstance(original_data, dict) and original_data: + logger.info(f"LLM 找到的原始绰号映射: {original_data}") + filtered_data = self._filter_llm_results(original_data, user_name_map) + if not filtered_data: + logger.info("所有找到的绰号映射都被过滤掉了。") + return {"is_exist": False} + else: + logger.info(f"过滤后的绰号映射: {filtered_data}") + return {"is_exist": True, "data": filtered_data} + else: + logger.warning(f"LLM 响应格式错误: is_exist=True 但 data 无效。原始 data: {original_data}") + return {"is_exist": False} + elif is_exist is False: + logger.info("LLM 明确指示未找到可靠的绰号映射 (is_exist=False)。") + return {"is_exist": False} + else: + logger.warning(f"LLM 响应格式错误: 'is_exist' 的值 '{is_exist}' 无效。") + return {"is_exist": False} + + except json.JSONDecodeError as json_err: + logger.error(f"解析 LLM 响应 JSON 失败: {json_err}\n原始响应: {response_content}") + return {"is_exist": False} + except Exception as e: + logger.error(f"绰号映射 LLM 调用或处理过程中发生意外错误: {e}", exc_info=True) + return {"is_exist": False} + + def _filter_llm_results(self, original_data: Dict[str, str], user_name_map: Dict[str, str]) -> Dict[str, str]: + """过滤 LLM 返回的绰号映射结果。""" + filtered_data = {} + bot_qq_str = str(self.config.BOT_QQ) if hasattr(self.config, "BOT_QQ") else None + + for user_id, nickname in original_data.items(): + if not isinstance(user_id, str): + logger.warning(f"过滤掉非字符串 user_id: {user_id}") + continue + if bot_qq_str and user_id == bot_qq_str: + logger.debug(f"过滤掉机器人自身的映射: ID {user_id}") + continue + if not nickname or nickname.isspace(): + logger.debug(f"过滤掉用户 {user_id} 的空绰号。") + continue + # person_name = user_name_map.get(user_id) + # if person_name and person_name == nickname: + # logger.debug(f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。") + # continue + filtered_data[user_id] = nickname.strip() + + return filtered_data + + # 线程相关 + # 修改:使用 run_async_loop 辅助函数 + def _run_processor_in_thread(self): + """后台线程入口函数,使用辅助函数管理 asyncio 事件循环。""" + thread_id = threading.get_ident() # 获取线程ID用于日志 + logger.info(f"绰号处理器线程启动 (线程 ID: {thread_id})...") + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) # 为当前线程设置事件循环 + logger.info(f"(线程 ID: {thread_id}) Asyncio 事件循环已创建并设置。") + + # 调用辅助函数来运行主处理协程并管理循环生命周期 + run_async_loop(loop, self._processing_loop()) + + logger.info(f"绰号处理器线程结束 (线程 ID: {thread_id}).") + + # 结束修改 + + # 修改:使用 asyncio.Queue 和 wait_for + async def _processing_loop(self): + """后台线程中运行的异步处理循环 (使用 asyncio.Queue)。""" + # 移除线程ID日志,因为它在异步上下文中不一定准确 + logger.info("绰号异步处理循环已启动。") + + while not self._stop_event.is_set(): # 仍然检查同步的停止事件 + try: + # 使用 asyncio.wait_for 从异步队列获取项目,并设置超时 + item = await asyncio.wait_for(self.nickname_queue.get(), timeout=self.sleep_interval) + + # 处理获取到的项目 (调用异步方法) + await self._analyze_and_update_nicknames(item) + + self.nickname_queue.task_done() # 标记任务完成 + + except asyncio.TimeoutError: + # 等待超时,相当于之前 queue.Empty,继续循环检查停止事件 + continue + except asyncio.CancelledError: + # 协程被取消 (通常在 stop_processor 中发生) + logger.info("绰号处理循环被取消。") + break # 退出循环 + except Exception as e: + # 捕获处理单个项目时可能发生的其他异常 + logger.error(f"绰号处理循环出错: {e}", exc_info=True) + # 短暂异步休眠避免快速连续失败 + await asyncio.sleep(5) + + logger.info("绰号异步处理循环已结束。") + # 可以在这里添加清理逻辑,比如确保队列为空或处理剩余项目 + # 例如:await self.nickname_queue.join() # 等待所有任务完成 (如果需要) + + # 结束修改 + + +# 在模块级别创建单例实例 +nickname_manager = NicknameManager() diff --git a/src/plugins/group_nickname/nickname_mapper.py b/src/plugins/group_nickname/nickname_mapper.py new file mode 100644 index 00000000..35f96445 --- /dev/null +++ b/src/plugins/group_nickname/nickname_mapper.py @@ -0,0 +1,78 @@ +# src/plugins/group_nickname/nickname_mapper.py +from typing import Dict +from src.common.logger_manager import get_logger + +# 这个文件现在只负责构建 Prompt,LLM 的初始化和调用移至 NicknameManager + +logger = get_logger("nickname_mapper") + +# LLMRequest 实例和 analyze_chat_for_nicknames 函数已被移除 + + +def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str: + """ + 构建用于 LLM 进行绰号映射分析的 Prompt。 + + Args: + chat_history_str: 格式化后的聊天历史记录字符串。 + bot_reply: Bot 的最新回复字符串。 + user_name_map: 用户 ID 到已知名称(person_name 或 fallback nickname)的映射。 + + Returns: + str: 构建好的 Prompt 字符串。 + """ + # 将 user_name_map 格式化为列表字符串 + user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items() if uid and name]) + if not user_list_str: + user_list_str = "无" # 如果映射为空,明确告知 + + # 核心 Prompt 内容 + prompt = f""" +任务:仔细分析以下聊天记录和“你的最新回复”,判断其中是否明确提到了某个用户的绰号,并且这个绰号可以清晰地与一个特定的用户 ID 对应起来。 + +已知用户信息(ID: 名称): +{user_list_str} +*注意:名称后面带有"(你)"表示是你自己。* + +聊天记录: +--- +{chat_history_str} +--- + +你的最新回复: +{bot_reply} + +分析要求与输出格式: +1. 找出聊天记录和“你的最新回复”中可能是用户绰号的词语。 +2. 判断这些绰号是否在上下文中**清晰、无歧义**地指向了“已知用户信息”列表中的**某一个特定用户 ID**。必须是强关联,避免猜测。 +3. **不要**输出你自己(名称后带"(你)"的用户)的绰号映射。 + **不要**输出与用户已知名称完全相同的词语作为绰号。 + **不要**将在“你的最新回复”中你对他人使用的称呼或绰号进行映射(只分析聊天记录中他人对用户的称呼)。 + **不要**输出指代不明或过于通用的词语(如“大佬”、“兄弟”、“那个谁”等,除非上下文能非常明确地指向特定用户)。 +4. 如果找到了**至少一个**满足上述所有条件的**明确**的用户 ID 到绰号的映射关系,请输出 JSON 对象: + ```json + {{ + "is_exist": true, + "data": {{ + "用户A数字id": "绰号_A", + "用户B数字id": "绰号_B" + }} + }} + ``` + - `"data"` 字段的键必须是用户的**数字 ID (字符串形式)**,值是对应的**绰号 (字符串形式)**。 + - 只包含你能**百分百确认**映射关系的条目。宁缺毋滥。 + 如果**无法找到任何一个**满足条件的明确映射关系,请输出 JSON 对象: + ```json + {{ + "is_exist": false + }} + ``` +5. 请**仅**输出 JSON 对象,不要包含任何额外的解释、注释或代码块标记之外的文本。 + +输出: +""" + # logger.debug(f"构建的绰号映射 Prompt (部分):\n{prompt[:500]}...") # 可以在 NicknameManager 中记录 + return prompt + + +# analyze_chat_for_nicknames 函数已被移除,其逻辑移至 NicknameManager._call_llm_for_analysis diff --git a/src/plugins/group_nickname/nickname_utils.py b/src/plugins/group_nickname/nickname_utils.py new file mode 100644 index 00000000..4fdca08d --- /dev/null +++ b/src/plugins/group_nickname/nickname_utils.py @@ -0,0 +1,175 @@ +import random +from typing import List, Dict, Tuple +from src.common.logger_manager import get_logger +from src.config.config import global_config + +# 这个文件现在只包含纯粹的工具函数,与状态和流程无关 + +logger = get_logger("nickname_utils") + + +def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int]]]) -> List[Tuple[str, str, int]]: + """ + 从给定的绰号信息中,根据映射次数加权随机选择最多 N 个绰号用于 Prompt。 + + Args: + all_nicknames_info: 包含用户及其绰号信息的字典,格式为 + { "用户名1": [{"绰号A": 次数}, {"绰号B": 次数}], ... } + 注意:这里的用户名是 person_name。 + + Returns: + List[Tuple[str, str, int]]: 选中的绰号列表,每个元素为 (用户名, 绰号, 次数)。 + 按次数降序排序。 + """ + if not all_nicknames_info: + return [] + + candidates = [] # 存储 (用户名, 绰号, 次数, 权重) + smoothing_factor = getattr(global_config, "NICKNAME_PROBABILITY_SMOOTHING", 1.0) # 平滑因子,避免权重为0 + + for user_name, nicknames in all_nicknames_info.items(): + if nicknames and isinstance(nicknames, list): + for nickname_entry in nicknames: + # 确保条目是字典且只有一个键值对 + if isinstance(nickname_entry, dict) and len(nickname_entry) == 1: + nickname, count = list(nickname_entry.items())[0] + # 确保次数是正整数 + if isinstance(count, int) and count > 0 and isinstance(nickname, str) and nickname: + weight = count + smoothing_factor # 计算权重 + candidates.append((user_name, nickname, count, weight)) + else: + logger.warning( + f"用户 '{user_name}' 的绰号条目无效: {nickname_entry} (次数非正整数或绰号为空)。已跳过。" + ) + else: + logger.warning(f"用户 '{user_name}' 的绰号条目格式无效: {nickname_entry}。已跳过。") + + if not candidates: + return [] + + # 确定需要选择的数量 + max_nicknames = getattr(global_config, "MAX_NICKNAMES_IN_PROMPT", 5) + num_to_select = min(max_nicknames, len(candidates)) + + try: + # 调用加权随机抽样(不重复) + selected_candidates_with_weight = weighted_sample_without_replacement(candidates, num_to_select) + + # 如果抽样结果数量不足(例如权重问题导致提前退出),可以考虑是否需要补充 + if len(selected_candidates_with_weight) < num_to_select: + logger.debug( + f"加权随机选择后数量不足 ({len(selected_candidates_with_weight)}/{num_to_select}),尝试补充选择次数最多的。" + ) + # 筛选出未被选中的候选 + selected_ids = set( + (c[0], c[1]) for c in selected_candidates_with_weight + ) # 使用 (用户名, 绰号) 作为唯一标识 + remaining_candidates = [c for c in candidates if (c[0], c[1]) not in selected_ids] + remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 + needed = num_to_select - len(selected_candidates_with_weight) + selected_candidates_with_weight.extend(remaining_candidates[:needed]) + + except Exception as e: + # 日志:记录加权随机选择时发生的错误,并回退到简单选择 + logger.error(f"绰号加权随机选择时出错: {e}。将回退到选择次数最多的 Top N。", exc_info=True) + # 出错时回退到选择次数最多的 N 个 + candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 + selected_candidates_with_weight = candidates[:num_to_select] + + # 格式化输出结果为 (用户名, 绰号, 次数),移除权重 + result = [(user, nick, count) for user, nick, count, _weight in selected_candidates_with_weight] + + # 按次数降序排序最终结果 + result.sort(key=lambda x: x[2], reverse=True) + + logger.debug(f"为 Prompt 选择的绰号: {result}") + return result + + +def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, int]]) -> str: + """ + 将选中的绰号信息格式化为注入 Prompt 的字符串。 + + Args: + selected_nicknames: 选中的绰号列表 (用户名, 绰号, 次数)。 + + Returns: + str: 格式化后的字符串,如果列表为空则返回空字符串。 + """ + if not selected_nicknames: + return "" + + # Prompt 注入部分的标题 + prompt_lines = ["以下是聊天记录中一些成员在本群的绰号信息(按常用度排序),供你参考:"] + grouped_by_user: Dict[str, List[str]] = {} # 用于按用户分组 + + # 按用户分组绰号 + for user_name, nickname, _count in selected_nicknames: + if user_name not in grouped_by_user: + grouped_by_user[user_name] = [] + # 添加中文引号以区分绰号 + grouped_by_user[user_name].append(f"“{nickname}”") + + # 构建每个用户的绰号字符串 + for user_name, nicknames in grouped_by_user.items(): + nicknames_str = "、".join(nicknames) # 使用中文顿号连接 + # 格式化输出,例如: "- 张三,ta 可能被称为:“三儿”、“张哥”" + prompt_lines.append(f"- {user_name},ta 可能被称为:{nicknames_str}") + + # 如果只有标题行,返回空字符串,避免注入无意义的标题 + if len(prompt_lines) > 1: + # 末尾加换行符,以便在 Prompt 中正确分隔 + return "\n".join(prompt_lines) + "\n" + else: + return "" + + +def weighted_sample_without_replacement( + candidates: List[Tuple[str, str, int, float]], k: int +) -> List[Tuple[str, str, int, float]]: + """ + 执行不重复的加权随机抽样。使用 A-ExpJ 算法思想的简化实现。 + + Args: + candidates: 候选列表,每个元素为 (用户名, 绰号, 次数, 权重)。 + k: 需要选择的数量。 + + Returns: + List[Tuple[str, str, int, float]]: 选中的元素列表(包含权重)。 + """ + if k <= 0: + return [] + n = len(candidates) + if k >= n: + return candidates[:] # 返回副本 + + # 计算每个元素的 key = U^(1/weight),其中 U 是 (0, 1) 之间的随机数 + # 为了数值稳定性,计算 log(key) = log(U) / weight + # log(U) 可以用 -Exponential(1) 来生成 + weighted_keys = [] + for i in range(n): + weight = candidates[i][3] + if weight <= 0: + # 处理权重为0或负数的情况,赋予一个极小的概率(或极大负数的log_key) + log_key = float("-inf") # 或者一个非常大的负数 + logger.warning(f"候选者 {candidates[i][:2]} 的权重为非正数 ({weight}),抽中概率极低。") + else: + log_u = -random.expovariate(1.0) # 生成 -Exponential(1) 随机数 + log_key = log_u / weight + weighted_keys.append((log_key, i)) # 存储 (log_key, 原始索引) + + # 按 log_key 降序排序 (相当于按 key 升序排序) + weighted_keys.sort(key=lambda x: x[0], reverse=True) + + # 选择 log_key 最大的 k 个元素的原始索引 + selected_indices = [index for _log_key, index in weighted_keys[:k]] + + # 根据选中的索引从原始 candidates 列表中获取元素 + selected_items = [candidates[i] for i in selected_indices] + + return selected_items + + +# 移除旧的流程函数 +# get_nickname_injection_for_prompt 和 trigger_nickname_analysis_if_needed +# 的逻辑现在由 NicknameManager 处理 diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index 28c17d9a..d8f5f804 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -1,20 +1,20 @@ import asyncio import time import traceback -import random # <--- 添加导入 -import json # <--- 确保导入 json +import random +import json from typing import List, Optional, Dict, Any, Deque, Callable, Coroutine from collections import deque from src.plugins.chat.message import MessageRecv, BaseMessageInfo, MessageThinking, MessageSending -from src.plugins.chat.message import Seg # Local import needed after move +from src.plugins.chat.message import Seg from src.plugins.chat.chat_stream import ChatStream from src.plugins.chat.message import UserInfo from src.plugins.chat.chat_stream import chat_manager from src.common.logger_manager import get_logger from src.plugins.models.utils_model import LLMRequest from src.config.config import global_config -from src.plugins.chat.utils_image import image_path_to_base64 # Local import needed after move -from src.plugins.utils.timer_calculator import Timer # <--- Import Timer +from src.plugins.chat.utils_image import image_path_to_base64 +from src.plugins.utils.timer_calculator import Timer from src.plugins.emoji_system.emoji_manager import emoji_manager from src.heart_flow.sub_mind import SubMind from src.heart_flow.observation import Observation @@ -28,6 +28,8 @@ from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from src.plugins.moods.moods import MoodManager from src.heart_flow.utils_chat import get_chat_type_and_target_info from rich.traceback import install +from src.plugins.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat +from src.plugins.group_nickname.nickname_manager import nickname_manager install(extra_lines=3) @@ -528,11 +530,17 @@ class HeartFChatting: try: if action == "text_reply": - return await handler(reasoning, emoji_query, cycle_timers) + # 调用文本回复处理,它会返回 (bool, thinking_id) + success, thinking_id = await handler(reasoning, emoji_query, cycle_timers) + return success, thinking_id # 直接返回结果 elif action == "emoji_reply": - return await handler(reasoning, emoji_query), "" + # 调用表情回复处理,它只返回 bool + success = await handler(reasoning, emoji_query) + return success, "" # thinking_id 为空字符串 else: # no_reply - return await handler(reasoning, planner_start_db_time, cycle_timers), "" + # 调用不回复处理,它只返回 bool + success = await handler(reasoning, planner_start_db_time, cycle_timers) + return success, "" # thinking_id 为空字符串 except HeartFCError as e: logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") # 出错时也重置计数器 @@ -549,6 +557,7 @@ class HeartFChatting: 2. 创建思考消息 3. 生成回复 4. 发送消息 + 5. [新增] 触发绰号分析 参数: reasoning: 回复原因 @@ -572,6 +581,7 @@ class HeartFChatting: if not thinking_id: raise PlannerError("无法创建思考消息") + reply = None # 初始化 reply try: # 生成回复 with Timer("生成回复", cycle_timers): @@ -585,7 +595,6 @@ class HeartFChatting: raise ReplierError("回复生成失败") # 发送消息 - with Timer("发送消息", cycle_timers): await self._sender( thinking_id=thinking_id, @@ -594,6 +603,9 @@ class HeartFChatting: send_emoji=emoji_query, ) + # 调用工具函数触发绰号分析 + await nickname_manager.trigger_nickname_analysis(anchor_message, reply, self.chat_stream) + return True, thinking_id except (ReplierError, SenderError) as e: @@ -854,6 +866,17 @@ class HeartFChatting: f"{self.log_prefix}[Planner] 临时移除的动作: {actions_to_remove_temporarily}, 当前可用: {list(current_available_actions.keys())}" ) + # 需要获取用于上下文的历史消息 + message_list_before_now = get_raw_msg_before_timestamp_with_chat( + chat_id=self.stream_id, + timestamp=time.time(), # 使用当前时间作为参考点 + limit=global_config.observation_context_size, # 使用与 prompt 构建一致的 limit + ) + # 调用工具函数获取格式化后的绰号字符串 + nickname_injection_str = await nickname_manager.get_nickname_prompt_injection( + self.chat_stream, message_list_before_now + ) + # --- 构建提示词 (调用修改后的 PromptBuilder 方法) --- prompt = await prompt_builder.build_planner_prompt( is_group_chat=self.is_group_chat, # <-- Pass HFC state @@ -863,6 +886,7 @@ class HeartFChatting: current_mind=current_mind, # <-- Pass argument structured_info=self.sub_mind.structured_info_str, # <-- Pass SubMind info current_available_actions=current_available_actions, # <-- Pass determined actions + nickname_info=nickname_injection_str, ) # --- 调用 LLM (普通文本生成) --- diff --git a/src/plugins/heartFC_chat/heartflow_prompt_builder.py b/src/plugins/heartFC_chat/heartflow_prompt_builder.py index c59168a7..f9445301 100644 --- a/src/plugins/heartFC_chat/heartflow_prompt_builder.py +++ b/src/plugins/heartFC_chat/heartflow_prompt_builder.py @@ -1,4 +1,6 @@ import random +import time +from typing import Union, Optional, Deque, Dict, Any from ...config.config import global_config from src.common.logger_manager import get_logger from ...individuality.individuality import Individuality @@ -6,14 +8,13 @@ from src.plugins.utils.prompt_builder import Prompt, global_prompt_manager from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat from src.plugins.person_info.relationship_manager import relationship_manager from src.plugins.chat.utils import get_embedding -import time -from typing import Union, Optional, Deque, Dict, Any from ...common.database import db from ..chat.utils import get_recent_group_speaker from ..moods.moods import MoodManager from ..memory_system.Hippocampus import HippocampusManager from ..schedule.schedule_generator import bot_schedule from ..knowledge.knowledge_lib import qa_manager +from src.plugins.group_nickname.nickname_manager import nickname_manager import traceback from .heartFC_Cycleinfo import CycleInfo @@ -24,6 +25,7 @@ def init_prompt(): Prompt( """ {info_from_tools} +{nickname_info} {chat_target} {chat_talking_prompt} 现在你想要在群里发言或者回复。\n @@ -53,6 +55,7 @@ def init_prompt(): Prompt( """你的名字是{bot_name},{prompt_personality},{chat_context_description}。需要基于以下信息决定如何参与对话: {structured_info_block} +{nickname_info} {chat_content_block} {current_mind_block} {cycle_info_block} @@ -118,6 +121,7 @@ JSON 结构如下,包含三个字段 "action", "reasoning", "emoji_query": {relation_prompt} {prompt_info} {schedule_prompt} +{nickname_info} {chat_target} {chat_talking_prompt} 现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言或者回复这条消息。\n @@ -250,9 +254,15 @@ async def _build_prompt_focus(reason, current_mind_info, structured_info, chat_s chat_target_1 = await global_prompt_manager.get_prompt_async("chat_target_group1") chat_target_2 = await global_prompt_manager.get_prompt_async("chat_target_group2") + # 调用新的工具函数获取绰号信息 + nickname_injection_str = await nickname_manager.get_nickname_prompt_injection( + chat_stream, message_list_before_now + ) + prompt = await global_prompt_manager.format_prompt( template_name, info_from_tools=structured_info_prompt, + nickname_info=nickname_injection_str, chat_target=chat_target_1, # Used in group template chat_talking_prompt=chat_talking_prompt, bot_name=global_config.BOT_NICKNAME, @@ -442,6 +452,11 @@ class PromptBuilder: chat_target_1 = await global_prompt_manager.get_prompt_async("chat_target_group1") chat_target_2 = await global_prompt_manager.get_prompt_async("chat_target_group2") + # 调用新的工具函数获取绰号信息 + nickname_injection_str = await nickname_manager.get_nickname_prompt_injection( + chat_stream, message_list_before_now + ) + prompt = await global_prompt_manager.format_prompt( template_name, relation_prompt=relation_prompt, @@ -449,6 +464,7 @@ class PromptBuilder: memory_prompt=memory_prompt, prompt_info=prompt_info, schedule_prompt=schedule_prompt, + nickname_info=nickname_injection_str, # <--- 注入绰号信息 chat_target=chat_target_1, chat_target_2=chat_target_2, chat_talking_prompt=chat_talking_prompt, @@ -486,7 +502,7 @@ class PromptBuilder: prompt_ger=prompt_ger, moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), ) - # --- End choosing template --- + # --- End choosing template --- return prompt @@ -755,6 +771,7 @@ class PromptBuilder: current_mind: Optional[str], structured_info: Dict[str, Any], current_available_actions: Dict[str, str], + nickname_info: str, # replan_prompt: str, # Replan logic still simplified ) -> str: """构建 Planner LLM 的提示词 (获取模板并填充数据)""" @@ -836,6 +853,7 @@ class PromptBuilder: prompt = planner_prompt_template.format( bot_name=global_config.BOT_NICKNAME, + nickname_info=nickname_info, prompt_personality=prompt_personality, chat_context_description=chat_context_description, structured_info_block=structured_info_block, diff --git a/src/plugins/heartFC_chat/normal_chat.py b/src/plugins/heartFC_chat/normal_chat.py index 1c1372c5..6521ae13 100644 --- a/src/plugins/heartFC_chat/normal_chat.py +++ b/src/plugins/heartFC_chat/normal_chat.py @@ -20,6 +20,7 @@ from src.plugins.person_info.relationship_manager import relationship_manager from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from src.plugins.utils.timer_calculator import Timer from src.heart_flow.utils_chat import get_chat_type_and_target_info +from src.plugins.group_nickname.nickname_manager import nickname_manager logger = get_logger("chat") @@ -316,6 +317,7 @@ class NormalChat: # 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况) if first_bot_msg: info_catcher.catch_after_response(timing_results["消息发送"], response_set, first_bot_msg) + await nickname_manager.trigger_nickname_analysis(message, response_set, self.chat_stream) else: logger.warning(f"[{self.stream_name}] 思考消息 {thinking_id} 在发送前丢失,无法记录 info_catcher") diff --git a/src/plugins/person_info/person_info.py b/src/plugins/person_info/person_info.py index d4e69d7e..1ae5e415 100644 --- a/src/plugins/person_info/person_info.py +++ b/src/plugins/person_info/person_info.py @@ -53,6 +53,7 @@ person_info_default = { "msg_interval_list": [], "user_cardname": None, # 添加群名片 "user_avatar": None, # 添加头像信息(例如URL或标识符) + "group_nicknames": [], } # 个人信息的各项与默认值在此定义,以下处理会自动创建/补全每一项 diff --git a/src/plugins/person_info/relationship_manager.py b/src/plugins/person_info/relationship_manager.py index 862f2398..f1f70a21 100644 --- a/src/plugins/person_info/relationship_manager.py +++ b/src/plugins/person_info/relationship_manager.py @@ -5,6 +5,8 @@ from bson.decimal128 import Decimal128 from .person_info import person_info_manager import time import random +from typing import List, Dict +from ...common.database import db from maim_message import UserInfo # import re # import traceback @@ -81,6 +83,131 @@ class RelationshipManager: is_known = person_info_manager.is_person_known(platform, user_id) return is_known + # --- [修改] 使用全局 db 对象进行查询 --- + @staticmethod + async def get_person_names_batch(platform: str, user_ids: List[str]) -> Dict[str, str]: + """ + 批量获取多个用户的 person_name。 + """ + if not user_ids: + return {} + + person_ids = [person_info_manager.get_person_id(platform, str(uid)) for uid in user_ids] + names_map = {} + try: + cursor = db.person_info.find( + {"person_id": {"$in": person_ids}}, + {"_id": 0, "person_id": 1, "user_id": 1, "person_name": 1}, # 只查询需要的字段 + ) + + for doc in cursor: + user_id_val = doc.get("user_id") # 获取原始值 + original_user_id = None # 初始化 + + if isinstance(user_id_val, (int, float)): # 检查是否是数字类型 + original_user_id = str(user_id_val) # 直接转换为字符串 + elif isinstance(user_id_val, str): # 检查是否是字符串 + if "_" in user_id_val: # 如果包含下划线,则分割 + original_user_id = user_id_val.split("_", 1)[-1] + else: # 如果不包含下划线,则直接使用该字符串 + original_user_id = user_id_val + # else: # 其他类型或 None,original_user_id 保持为 None + + person_name = doc.get("person_name") + + # 确保 original_user_id 和 person_name 都有效 + if original_user_id and person_name: + names_map[original_user_id] = person_name + + logger.debug(f"批量获取 {len(user_ids)} 个用户的 person_name,找到 {len(names_map)} 个。") + except AttributeError as e: + # 如果 db 对象没有 person_info 属性,或者 find 方法不存在 + logger.error(f"访问数据库时出错: {e}。请检查 common/database.py 和集合名称。") + except Exception as e: + logger.error(f"批量获取 person_name 时出错: {e}", exc_info=True) + return names_map + + @staticmethod + async def get_users_group_nicknames( + platform: str, user_ids: List[str], group_id: str + ) -> Dict[str, List[Dict[str, int]]]: + """ + 批量获取多个用户在指定群组的绰号信息。 + + Args: + platform (str): 平台名称。 + user_ids (List[str]): 用户 ID 列表。 + group_id (str): 群组 ID。 + + Returns: + Dict[str, List[Dict[str, int]]]: 映射 {person_name: [{"绰号A": 次数}, ...]} + """ + if not user_ids or not group_id: + return {} + + person_ids = [person_info_manager.get_person_id(platform, str(uid)) for uid in user_ids] + nicknames_data = {} + group_id_str = str(group_id) # 确保 group_id 是字符串 + + try: + # 查询包含目标 person_id 的文档 + cursor = db.person_info.find( + {"person_id": {"$in": person_ids}}, + {"_id": 0, "person_id": 1, "person_name": 1, "group_nicknames": 1}, # 查询所需字段 + ) + + # 假设同步迭代可行 + for doc in cursor: + person_name = doc.get("person_name") + if not person_name: + continue # 跳过没有 person_name 的用户 + + group_nicknames_list = doc.get("group_nicknames", []) # 获取 group_nicknames 数组 + target_group_nicknames = [] # 存储目标群组的绰号列表 + + # 遍历 group_nicknames 数组,查找匹配的 group_id + for group_entry in group_nicknames_list: + # 确保 group_entry 是字典且包含 group_id 键 + if isinstance(group_entry, dict) and group_entry.get("group_id") == group_id_str: + # 提取 nicknames 列表 + nicknames_raw = group_entry.get("nicknames", []) + if isinstance(nicknames_raw, list): + target_group_nicknames = nicknames_raw + break # 找到匹配的 group_id 后即可退出内层循环 + + # 如果找到了目标群组的绰号列表 + if target_group_nicknames: + valid_nicknames_formatted = [] # 存储格式化后的绰号 + for item in target_group_nicknames: + # 校验每个绰号条目的格式 { "name": str, "count": int } + if ( + isinstance(item, dict) + and isinstance(item.get("name"), str) + and isinstance(item.get("count"), int) + and item["count"] > 0 + ): # 确保 count 是正整数 + # --- 格式转换:从 { "name": "xxx", "count": y } 转为 { "xxx": y } --- + valid_nicknames_formatted.append({item["name"]: item["count"]}) + # --- 结束格式转换 --- + else: + logger.warning( + f"数据库中用户 {person_name} 群组 {group_id_str} 的绰号格式无效或 count <= 0: {item}" + ) + + if valid_nicknames_formatted: # 如果存在有效的、格式化后的绰号 + nicknames_data[person_name] = valid_nicknames_formatted # 使用 person_name 作为 key + + logger.debug( + f"批量获取群组 {group_id_str} 中 {len(user_ids)} 个用户的绰号,找到 {len(nicknames_data)} 个用户的数据。" + ) + + except AttributeError as e: + logger.error(f"访问数据库时出错: {e}。请检查 common/database.py 和集合名称 'person_info'。") + except Exception as e: + logger.error(f"批量获取群组绰号时出错: {e}", exc_info=True) + + return nicknames_data + @staticmethod async def is_qved_name(platform, user_id): """判断是否认识某人""" diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 5f215009..6723f2de 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "1.6.1" +version = "1.6.2" #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #如果你想要修改配置文件,请在修改后将version的值进行变更 @@ -123,6 +123,13 @@ steal_emoji = true # 是否偷取表情包,让麦麦可以发送她保存的 enable_check = false # 是否启用表情包过滤,只有符合该要求的表情包才会被保存 check_prompt = "符合公序良俗" # 表情包过滤要求,只有符合该要求的表情包才会被保存 +[group_nickname] +enable_nickname_mapping = false # 绰号映射功能总开关(默认关闭,建议关闭) +max_nicknames_in_prompt = 10 # Prompt 中最多注入的绰号数量(防止token数量爆炸) +nickname_probability_smoothing = 1 # 绰号加权随机选择的平滑因子 +nickname_queue_max_size = 100 # 绰号处理队列最大容量 +nickname_process_sleep_interval = 60 # 绰号处理进程休眠间隔(秒) + [memory] build_memory_interval = 2000 # 记忆构建间隔 单位秒 间隔越低,麦麦学习越多,但是冗余信息也会增多 build_memory_distribution = [6.0,3.0,0.6,32.0,12.0,0.4] # 记忆构建分布,参数:分布1均值,标准差,权重,分布2均值,标准差,权重 @@ -291,6 +298,13 @@ provider = "SILICONFLOW" pri_in = 2 pri_out = 8 +#绰号映射生成模型 +[model.llm_nickname_mapping] +name = "deepseek-ai/DeepSeek-V3" +provider = "SILICONFLOW" +temp = 0.3 +pri_in = 2 +pri_out = 8 #以下模型暂时没有使用!! #以下模型暂时没有使用!!