From c7cf0b102e0327449010bb5ca50252376bfd09fb Mon Sep 17 00:00:00 2001 From: Bakadax Date: Fri, 2 May 2025 00:50:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E7=82=B9=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/group_nickname/nickname_utils.py | 390 ++++++++----------- 1 file changed, 166 insertions(+), 224 deletions(-) diff --git a/src/plugins/group_nickname/nickname_utils.py b/src/plugins/group_nickname/nickname_utils.py index 2cff5a01..641c2999 100644 --- a/src/plugins/group_nickname/nickname_utils.py +++ b/src/plugins/group_nickname/nickname_utils.py @@ -1,232 +1,174 @@ -import random -import time -from typing import List, Dict, Tuple, Optional +import json +from typing import Dict, Any, Optional from src.common.logger_manager import get_logger +from src.plugins.models.utils_model import LLMRequest from src.config.config import global_config -from src.plugins.person_info.relationship_manager import relationship_manager -from src.plugins.chat.chat_stream import ChatStream -from src.plugins.chat.message import MessageRecv -from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat -from .nickname_processor import add_to_nickname_queue -logger = get_logger("nickname_utils") - - -def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int]]]) -> List[Tuple[str, str, int]]: - """ - 从给定的绰号信息中,根据映射次数加权随机选择最多 N 个绰号。 - """ - if not all_nicknames_info: - return [] - - candidates = [] - for user_name, nicknames in all_nicknames_info.items(): - if nicknames: - for nickname_entry in nicknames: - if isinstance(nickname_entry, dict) and len(nickname_entry) == 1: - nickname, count = list(nickname_entry.items())[0] - if isinstance(count, int) and count > 0: - weight = count + global_config.NICKNAME_PROBABILITY_SMOOTHING - candidates.append((user_name, nickname, count, weight)) - else: - logger.warning( - f"Invalid count for nickname '{nickname}' of user '{user_name}': {count}. Skipping." - ) - else: - logger.warning(f"Invalid nickname entry format for user '{user_name}': {nickname_entry}. Skipping.") - - if not candidates: - return [] - - total_weight = sum(c[3] for c in candidates) - - if total_weight <= 0: - candidates.sort(key=lambda x: x[2], reverse=True) - selected = candidates[: global_config.MAX_NICKNAMES_IN_PROMPT] - else: - probabilities = [c[3] / total_weight for c in candidates] - num_to_select = min(global_config.MAX_NICKNAMES_IN_PROMPT, len(candidates)) - try: - selected_indices = set() - selected = [] - attempts = 0 - max_attempts = num_to_select * 5 - while len(selected) < num_to_select and attempts < max_attempts: - chosen_index = random.choices(range(len(candidates)), weights=probabilities, k=1)[0] - if chosen_index not in selected_indices: - selected_indices.add(chosen_index) - selected.append(candidates[chosen_index]) - attempts += 1 - if len(selected) < num_to_select: - remaining_candidates = [c for i, c in enumerate(candidates) if i not in selected_indices] - remaining_candidates.sort(key=lambda x: x[2], reverse=True) - needed = num_to_select - len(selected) - selected.extend(remaining_candidates[:needed]) - except Exception as e: - logger.error( - f"Error during weighted random choice for nicknames: {e}. Falling back to top N.", exc_info=True - ) - candidates.sort(key=lambda x: x[2], reverse=True) - selected = candidates[: global_config.MAX_NICKNAMES_IN_PROMPT] - - result = [(user, nick, count) for user, nick, count, _weight in selected] - result.sort(key=lambda x: x[2], reverse=True) - logger.debug(f"Selected nicknames for prompt: {result}") - return result - - -def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, int]]) -> str: - """ - 将选中的绰号信息格式化为注入 Prompt 的字符串。 - (代码保持不变) - """ - if not selected_nicknames: - return "" - - prompt_lines = ["【群成员绰号信息】"] - grouped_by_user: Dict[str, List[str]] = {} - - for user_name, nickname, _count in selected_nicknames: - if user_name not in grouped_by_user: - grouped_by_user[user_name] = [] - grouped_by_user[user_name].append(f"“{nickname}”") - - for user_name, nicknames in grouped_by_user.items(): - nicknames_str = "、".join(nicknames) - prompt_lines.append(f"- {user_name},有时被称为:{nicknames_str}") - - if len(prompt_lines) > 1: - return "\n".join(prompt_lines) + "\n" - else: - return "" - - -async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_list_before_now: List[Dict]) -> str: - """ - 获取并格式化用于 Prompt 注入的绰号信息字符串。 - """ - nickname_injection_str = "" - if global_config.ENABLE_NICKNAME_MAPPING and chat_stream and chat_stream.group_info: - try: - group_id = str(chat_stream.group_info.group_id) - user_ids_in_context = set() - if message_list_before_now: - for msg in message_list_before_now: - sender_id = msg["user_info"].get("user_id") - if sender_id: - user_ids_in_context.add(str(sender_id)) - else: - recent_speakers = chat_stream.get_recent_speakers(limit=5) - for speaker in recent_speakers: - user_ids_in_context.add(str(speaker['user_id'])) - if not user_ids_in_context: - logger.warning(f"[{chat_stream.stream_id}] No messages or recent speakers found for nickname injection.") - - if user_ids_in_context: - platform = chat_stream.platform - all_nicknames_data = await relationship_manager.get_users_group_nicknames( - platform, list(user_ids_in_context), group_id - ) - if all_nicknames_data: - selected_nicknames = select_nicknames_for_prompt(all_nicknames_data) - nickname_injection_str = format_nickname_prompt_injection(selected_nicknames) - if nickname_injection_str: - logger.debug(f"[{chat_stream.stream_id}] Generated nickname info for prompt:\n{nickname_injection_str}") - except Exception as e: - logger.error(f"[{chat_stream.stream_id}] Error getting or formatting nickname info for prompt: {e}", exc_info=True) - nickname_injection_str = "" - return nickname_injection_str - - -# --- 新增:触发绰号分析的工具函数 --- -async def trigger_nickname_analysis_if_needed( - anchor_message: MessageRecv, - bot_reply: List[str], - chat_stream: Optional[ChatStream] = None # 允许传入 chat_stream 或从 anchor_message 获取 -): - """ - 如果满足条件(群聊、功能开启),则准备数据并触发绰号分析任务。 - - Args: - anchor_message: 触发回复的原始消息对象。 - bot_reply: Bot 生成的回复内容列表。 - chat_stream: 可选的 ChatStream 对象。 - """ - # 检查功能是否开启 - if not global_config.ENABLE_NICKNAME_MAPPING: - return - - # 确定使用的 chat_stream - current_chat_stream = chat_stream or anchor_message.chat_stream - - # 检查是否是群聊且 chat_stream 有效 - if not current_chat_stream or not current_chat_stream.group_info: - logger.debug(f"[{current_chat_stream.stream_id if current_chat_stream else 'Unknown'}] Skipping nickname analysis: Not a group chat or invalid chat stream.") - return - - log_prefix = f"[{current_chat_stream.stream_id}]" # 日志前缀 +logger = get_logger("nickname_mapper") +llm_mapper: Optional[LLMRequest] = None +if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关 try: - # 1. 获取历史记录 - history_limit = 30 # 可配置的历史记录条数 - history_messages = get_raw_msg_before_timestamp_with_chat( - chat_id=current_chat_stream.stream_id, - timestamp=time.time(), - limit=history_limit, - ) - - # 格式化历史记录 - chat_history_str = await build_readable_messages( - messages=history_messages, - replace_bot_name=True, - merge_messages=False, - timestamp_mode="relative", - read_mark=0.0, - truncate=False, - ) - - # 2. 获取 Bot 回复字符串 - bot_reply_str = " ".join(bot_reply) if bot_reply else "" # 处理空回复列表 - - # 3. 获取群号和平台 - group_id = str(current_chat_stream.group_info.group_id) - platform = current_chat_stream.platform - - # 4. 构建用户 ID 到名称的映射 - user_ids_in_history = set() - for msg in history_messages: - sender_id = msg["user_info"].get("user_id") - if sender_id: - user_ids_in_history.add(str(sender_id)) - - user_name_map = {} - if user_ids_in_history: - try: - # 批量获取 person_name - names_data = await relationship_manager.get_person_names_batch(platform, list(user_ids_in_history)) - except Exception as e: - logger.error(f"{log_prefix} Error getting person names batch: {e}", exc_info=True) - names_data = {} - - for user_id in user_ids_in_history: - if user_id in names_data: - user_name_map[user_id] = names_data[user_id] - else: - # 回退查找 nickname (从后往前找最新的) - latest_nickname = next( - ( - m["user_info"].get("user_nickname") # 从 user_info 获取 - for m in reversed(history_messages) - if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname") # 确保 nickname 存在 - ), - None, - ) - user_name_map[user_id] = latest_nickname or f"未知({user_id})" # 提供回退 - - # 5. 添加到处理队列 - await add_to_nickname_queue(chat_history_str, bot_reply_str, platform, group_id, user_name_map) - logger.debug(f"{log_prefix} Triggered nickname analysis for group {group_id}.") + # 从全局配置获取模型设置 + model_config = global_config.llm_nickname_mapping + if not model_config or not model_config.get("name"): + logger.error("在全局配置中未找到有效的 'llm_nickname_mapping' 配置或缺少 'name' 字段。") + else: + llm_mapper = LLMRequest( # <-- LLM 初始化 + model=global_config.llm_nickname_mapping, + temperature=global_config.llm_nickname_mapping["temp"], + max_tokens=256, + request_type="nickname_mapping", + ) + logger.info("绰号映射 LLM 初始化成功 (使用全局配置)。") except Exception as e: - logger.error(f"{log_prefix} Error triggering nickname analysis: {e}", exc_info=True) \ No newline at end of file + logger.error(f"使用全局配置初始化绰号映射 LLM 失败: {e}", exc_info=True) + llm_mapper = None + +def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str: + """构建用于 LLM 绰号映射的 Prompt""" + # user_name_map 包含了 user_id 到 person_name (或 fallback nickname) 的映射 + user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items()]) + # print(f"\n\n\nKnown User Info for LLM:\n{user_list_str}\n\n\n\n") # Debugging print + prompt = f""" +任务:分析以下聊天记录和你的最新回复,判断其中是否包含用户绰号,并确定绰号与用户 ID 之间是否存在明确的一一对应关系。 + +已知用户信息(ID: 名称): +{user_list_str} + +聊天记录: +--- +{chat_history_str} +--- + +你的最新回复: +{bot_reply} + +分析要求: +1. 识别聊天记录和你发言中出现的可能是用户绰号的词语。 +2. 判断这些绰号是否能明确地指向某个特定的用户 ID。一个绰号必须在上下文中清晰地与某个发言人或被提及的人关联起来。 +3. 如果能建立可靠的一一映射关系,请输出一个 JSON 对象,格式如下: + {{ + "is_exist": true, + "data": {{ + "用户A数字id": "绰号_A", + "用户B数字id": "绰号_B" + }} + }} + 其中 "data" 字段的键是用户的 ID (字符串形式),值是对应的绰号。只包含你能确认映射关系的绰号。 +4. 如果无法建立任何可靠的一一映射关系(例如,绰号指代不明、没有出现绰号、或无法确认绰号与用户的关联),请输出 JSON 对象: + {{ + "is_exist": false + }} +5. 在“已知用户信息”列表中,你的昵称后面可能包含"(你)",这表示是你自己,不需要输出你自身的绰号映射。请确保不要将你自己的ID和任何词语映射为绰号。 +6. 请严格按照 JSON 格式输出,不要包含任何额外的解释或文本。 + +输出: +""" + return prompt + + +async def analyze_chat_for_nicknames( + chat_history_str: str, + bot_reply: str, + user_name_map: Dict[str, str] # 这个 map 包含了 user_id -> person_name 的信息 +) -> Dict[str, Any]: + """ + 调用 LLM 分析聊天记录和 Bot 回复,提取可靠的 用户ID-绰号 映射,并进行过滤。 + """ + if not global_config.ENABLE_NICKNAME_MAPPING: + logger.debug("绰号映射功能已禁用。") + return {"is_exist": False} + + if llm_mapper is None: + logger.error("绰号映射 LLM 未初始化。无法执行分析。") + return {"is_exist": False} + + prompt = _build_mapping_prompt(chat_history_str, bot_reply, user_name_map) + logger.debug(f"构建的绰号映射 Prompt:\n{prompt}") + + try: + # 调用 LLM + response_content, _, _ = await llm_mapper.generate_response(prompt) + logger.debug(f"LLM 原始响应 (绰号映射): {response_content}") + + if not response_content: + logger.warning("LLM 返回了空的绰号映射内容。") + return {"is_exist": False} + + # 清理可能的 Markdown 代码块标记 + response_content = response_content.strip() + if response_content.startswith("```json"): + response_content = response_content[7:] + if response_content.endswith("```"): + response_content = response_content[:-3] + response_content = response_content.strip() + + try: + result = json.loads(response_content) + if isinstance(result, dict) and "is_exist" in result: + if result["is_exist"] is True: + original_data = result.get("data") # 使用 .get() 更安全 + if isinstance(original_data, dict) and original_data: # 确保 data 是非空字典 + logger.info(f"LLM 找到的原始绰号映射: {original_data}") + + # --- 开始过滤 --- + filtered_data = {} + bot_qq_str = str(global_config.BOT_QQ) # 将机器人QQ转为字符串以便比较 + + for user_id, nickname in original_data.items(): + # 检查 user_id 是否是字符串,以防万一 + if not isinstance(user_id, str): + logger.warning(f"LLM 返回的 user_id '{user_id}' 不是字符串,跳过。") + continue + + # 条件 1: 排除机器人自身 + if user_id == bot_qq_str: + logger.debug(f"过滤掉机器人自身的映射: ID {user_id}") + continue + + # 条件 2: 排除 nickname 与 person_name 相同的情况 + person_name = user_name_map.get(user_id) # 从传入的映射中查找 person_name + if person_name and person_name == nickname: + logger.debug(f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。") + continue + + # 如果通过所有过滤条件,则保留 + filtered_data[user_id] = nickname + # --- 结束过滤 --- + + # 检查过滤后是否还有数据 + if not filtered_data: + logger.info("所有找到的绰号映射都被过滤掉了。") + return {"is_exist": False} + else: + logger.info(f"过滤后的绰号映射: {filtered_data}") + return {"is_exist": True, "data": filtered_data} # 返回过滤后的数据 + + else: + # is_exist 为 True 但 data 缺失、不是字典或为空 + if "data" not in result: + logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 键缺失。") + elif not isinstance(result.get("data"), dict): + logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 不是字典。") + else: # data 为空字典 + logger.debug("LLM 指示 is_exist=True 但 data 为空字典。视为 False 处理。") + return {"is_exist": False} + elif result["is_exist"] is False: + logger.info("LLM 未找到可靠的绰号映射。") + return {"is_exist": False} + else: + logger.warning("LLM 响应格式错误: 'is_exist' 不是布尔值。") + return {"is_exist": False} + else: + logger.warning("LLM 响应格式错误: 缺少 'is_exist' 键或不是字典。") + return {"is_exist": False} + except json.JSONDecodeError as json_err: + logger.error(f"解析 LLM 响应 JSON 失败: {json_err}\n原始响应: {response_content}") + return {"is_exist": False} + + except Exception as e: + logger.error(f"绰号映射 LLM 调用或处理过程中出错: {e}", exc_info=True) + return {"is_exist": False}