加点注释

pull/914/head
Bakadax 2025-05-02 00:50:13 +08:00
parent 7c95166e0a
commit c7cf0b102e
1 changed files with 166 additions and 224 deletions

View File

@ -1,232 +1,174 @@
import random
import time
from typing import List, Dict, Tuple, Optional
import json
from typing import Dict, Any, Optional
from src.common.logger_manager import get_logger
from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config
from src.plugins.person_info.relationship_manager import relationship_manager
from src.plugins.chat.chat_stream import ChatStream
from src.plugins.chat.message import MessageRecv
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
from .nickname_processor import add_to_nickname_queue
# Module-level logger for this file, obtained from the project's logger manager
# under the name "nickname_utils".
logger = get_logger("nickname_utils")
def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int]]]) -> List[Tuple[str, str, int]]:
    """
    Pick the nicknames to show in the prompt using count-weighted random sampling.

    Args:
        all_nicknames_info: Maps a user name to a list of single-entry
            ``{nickname: count}`` dicts. Entries with another shape, or whose
            count is not a positive int, are logged and skipped.

    Returns:
        At most ``global_config.MAX_NICKNAMES_IN_PROMPT`` distinct
        ``(user_name, nickname, count)`` tuples, ordered by count descending.
        An empty list when there is nothing valid to choose from.
    """
    if not all_nicknames_info:
        return []

    def by_count(entry):
        # Index 2 holds the usage count in both the 4-tuples and the 3-tuples.
        return entry[2]

    # Every pool entry is (user_name, nickname, count, weight).
    pool = []
    for user_name, nicknames in all_nicknames_info.items():
        if not nicknames:
            continue
        for nickname_entry in nicknames:
            if not (isinstance(nickname_entry, dict) and len(nickname_entry) == 1):
                logger.warning(f"Invalid nickname entry format for user '{user_name}': {nickname_entry}. Skipping.")
                continue
            nickname, count = next(iter(nickname_entry.items()))
            if not isinstance(count, int) or count <= 0:
                logger.warning(
                    f"Invalid count for nickname '{nickname}' of user '{user_name}': {count}. Skipping."
                )
                continue
            pool.append((user_name, nickname, count, count + global_config.NICKNAME_PROBABILITY_SMOOTHING))

    if not pool:
        return []

    weight_sum = sum(entry[3] for entry in pool)
    if weight_sum <= 0:
        # Weights are unusable for sampling: take the most frequent nicknames instead.
        pool.sort(key=by_count, reverse=True)
        picked = pool[: global_config.MAX_NICKNAMES_IN_PROMPT]
    else:
        probabilities = [entry[3] / weight_sum for entry in pool]
        target = min(global_config.MAX_NICKNAMES_IN_PROMPT, len(pool))
        try:
            seen = set()
            picked = []
            # Draw one index at a time (with replacement) and discard repeats;
            # the number of draws is capped at five per requested nickname.
            for _ in range(target * 5):
                if len(picked) >= target:
                    break
                idx = random.choices(range(len(pool)), weights=probabilities, k=1)[0]
                if idx in seen:
                    continue
                seen.add(idx)
                picked.append(pool[idx])

            if len(picked) < target:
                # The draw budget ran out: top up with the most frequent unpicked entries.
                leftovers = sorted(
                    (entry for i, entry in enumerate(pool) if i not in seen),
                    key=by_count,
                    reverse=True,
                )
                picked.extend(leftovers[: target - len(picked)])
        except Exception as e:
            logger.error(
                f"Error during weighted random choice for nicknames: {e}. Falling back to top N.", exc_info=True
            )
            pool.sort(key=by_count, reverse=True)
            picked = pool[: global_config.MAX_NICKNAMES_IN_PROMPT]

    # Drop the weight column and present the most frequent nicknames first.
    result = sorted(
        ((user, nick, count) for user, nick, count, _weight in picked),
        key=by_count,
        reverse=True,
    )
    logger.debug(f"Selected nicknames for prompt: {result}")
    return result
def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, int]]) -> str:
    """
    Format the selected nicknames as a text section to inject into the prompt.

    Nicknames are grouped per user (in order of first appearance) and each user
    gets one bullet line. Several nicknames of the same user are separated by
    "、" so they do not run together into a single unreadable token.

    Args:
        selected_nicknames: ``(user_name, nickname, count)`` tuples; the count
            is not used for formatting.

    Returns:
        The header line followed by one line per user, terminated by a newline,
        or an empty string when ``selected_nicknames`` is empty.
    """
    if not selected_nicknames:
        return ""

    grouped_by_user: Dict[str, List[str]] = {}
    for user_name, nickname, _count in selected_nicknames:
        grouped_by_user.setdefault(user_name, []).append(f"{nickname}")

    prompt_lines = ["【群成员绰号信息】"]
    for user_name, nicknames in grouped_by_user.items():
        # Join with an explicit separator; an empty separator would fuse the nicknames.
        nicknames_str = "、".join(nicknames)
        prompt_lines.append(f"- {user_name},有时被称为:{nicknames_str}")

    # A non-empty input always yields at least one user line, so no extra emptiness check is needed.
    return "\n".join(prompt_lines) + "\n"
async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_list_before_now: List[Dict]) -> str:
    """
    Build the nickname section to inject into the prompt for a group chat.

    The users of interest are the senders found in ``message_list_before_now``;
    when that list is empty, the chat stream's recent speakers are used instead.
    Their stored group nicknames are fetched, sampled and formatted.

    Args:
        chat_stream: The chat stream the prompt is being built for.
        message_list_before_now: Message dicts preceding the current moment.
            Each is expected to carry a ``user_info`` mapping with a ``user_id``;
            messages without one are skipped.

    Returns:
        The formatted nickname section, or an empty string when the feature is
        disabled, the chat is not a group chat, no nicknames are known, or an
        error occurs (errors are logged, never raised).
    """
    if not (global_config.ENABLE_NICKNAME_MAPPING and chat_stream and chat_stream.group_info):
        return ""

    nickname_injection_str = ""
    try:
        group_id = str(chat_stream.group_info.group_id)
        user_ids_in_context = set()

        if message_list_before_now:
            for msg in message_list_before_now:
                # Tolerate messages with a missing/None user_info: skipping one malformed
                # message is better than losing the nickname section for the whole prompt.
                sender_id = (msg.get("user_info") or {}).get("user_id")
                if sender_id:
                    user_ids_in_context.add(str(sender_id))
        else:
            # No message context available: fall back to the most recent speakers.
            recent_speakers = chat_stream.get_recent_speakers(limit=5)
            for speaker in recent_speakers:
                user_ids_in_context.add(str(speaker['user_id']))

        if not user_ids_in_context:
            logger.warning(f"[{chat_stream.stream_id}] No messages or recent speakers found for nickname injection.")
        else:
            platform = chat_stream.platform
            all_nicknames_data = await relationship_manager.get_users_group_nicknames(
                platform, list(user_ids_in_context), group_id
            )
            if all_nicknames_data:
                selected_nicknames = select_nicknames_for_prompt(all_nicknames_data)
                nickname_injection_str = format_nickname_prompt_injection(selected_nicknames)
                if nickname_injection_str:
                    logger.debug(f"[{chat_stream.stream_id}] Generated nickname info for prompt:\n{nickname_injection_str}")
    except Exception as e:
        # Best effort: prompt building must not fail because nickname lookup did.
        logger.error(f"[{chat_stream.stream_id}] Error getting or formatting nickname info for prompt: {e}", exc_info=True)
        nickname_injection_str = ""

    return nickname_injection_str
# --- Added: helper that triggers nickname analysis ---
# NOTE(review): this span appears to interleave two versions of the file (a diff view with
# removed and added lines merged, indentation stripped). The `logger = get_logger("nickname_mapper")`,
# `llm_mapper` and LLMRequest-initialisation lines look like module-level code of a different
# version than the body of `trigger_nickname_analysis_if_needed` — confirm against the real
# file before relying on this block.
async def trigger_nickname_analysis_if_needed(
anchor_message: MessageRecv,
bot_reply: List[str],
chat_stream: Optional[ChatStream] = None # may be passed in; otherwise taken from anchor_message
):
"""
Prepare the data and trigger the nickname analysis task when the conditions are met
(feature enabled, group chat).
Args:
anchor_message: The original message object that triggered the reply.
bot_reply: List of reply contents generated by the bot.
chat_stream: Optional ChatStream object.
"""
# Bail out when the feature is disabled
if not global_config.ENABLE_NICKNAME_MAPPING:
return
# Decide which chat_stream to use
current_chat_stream = chat_stream or anchor_message.chat_stream
# Require a valid chat_stream that belongs to a group chat
if not current_chat_stream or not current_chat_stream.group_info:
logger.debug(f"[{current_chat_stream.stream_id if current_chat_stream else 'Unknown'}] Skipping nickname analysis: Not a group chat or invalid chat stream.")
return
log_prefix = f"[{current_chat_stream.stream_id}]" # log prefix
logger = get_logger("nickname_mapper")
llm_mapper: Optional[LLMRequest] = None
if global_config.ENABLE_NICKNAME_MAPPING: # gated by the global switch
try:
# 1. Fetch the chat history
history_limit = 30 # number of history messages to fetch (could be made configurable)
history_messages = get_raw_msg_before_timestamp_with_chat(
chat_id=current_chat_stream.stream_id,
timestamp=time.time(),
limit=history_limit,
)
# Render the history as readable text
chat_history_str = await build_readable_messages(
messages=history_messages,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
truncate=False,
)
# 2. Build the bot reply string
bot_reply_str = " ".join(bot_reply) if bot_reply else "" # handles an empty reply list
# 3. Get the group id and platform
group_id = str(current_chat_stream.group_info.group_id)
platform = current_chat_stream.platform
# 4. Build the user-ID -> name mapping
user_ids_in_history = set()
for msg in history_messages:
sender_id = msg["user_info"].get("user_id")
if sender_id:
user_ids_in_history.add(str(sender_id))
user_name_map = {}
if user_ids_in_history:
try:
# Fetch person_name values in one batch
names_data = await relationship_manager.get_person_names_batch(platform, list(user_ids_in_history))
except Exception as e:
logger.error(f"{log_prefix} Error getting person names batch: {e}", exc_info=True)
names_data = {}
for user_id in user_ids_in_history:
if user_id in names_data:
user_name_map[user_id] = names_data[user_id]
else:
# Fall back to the nickname, scanning from newest to oldest
latest_nickname = next(
(
m["user_info"].get("user_nickname") # read from user_info
for m in reversed(history_messages)
if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname") # only accept entries that have a nickname
),
None,
)
user_name_map[user_id] = latest_nickname or f"未知({user_id})" # fallback placeholder
# 5. Enqueue for processing
await add_to_nickname_queue(chat_history_str, bot_reply_str, platform, group_id, user_name_map)
logger.debug(f"{log_prefix} Triggered nickname analysis for group {group_id}.")
# Read the model settings from the global config
model_config = global_config.llm_nickname_mapping
if not model_config or not model_config.get("name"):
logger.error("在全局配置中未找到有效的 'llm_nickname_mapping' 配置或缺少 'name' 字段。")
else:
llm_mapper = LLMRequest( # <-- LLM initialization
model=global_config.llm_nickname_mapping,
temperature=global_config.llm_nickname_mapping["temp"],
max_tokens=256,
request_type="nickname_mapping",
)
logger.info("绰号映射 LLM 初始化成功 (使用全局配置)。")
except Exception as e:
logger.error(f"{log_prefix} Error triggering nickname analysis: {e}", exc_info=True)
logger.error(f"使用全局配置初始化绰号映射 LLM 失败: {e}", exc_info=True)
llm_mapper = None
def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str:
"""构建用于 LLM 绰号映射的 Prompt"""
# user_name_map 包含了 user_id 到 person_name (或 fallback nickname) 的映射
user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items()])
# print(f"\n\n\nKnown User Info for LLM:\n{user_list_str}\n\n\n\n") # Debugging print
prompt = f"""
任务分析以下聊天记录和你的最新回复判断其中是否包含用户绰号并确定绰号与用户 ID 之间是否存在明确的一一对应关系
已知用户信息ID: 名称
{user_list_str}
聊天记录
---
{chat_history_str}
---
你的最新回复
{bot_reply}
分析要求
1. 识别聊天记录和你发言中出现的可能是用户绰号的词语
2. 判断这些绰号是否能明确地指向某个特定的用户 ID一个绰号必须在上下文中清晰地与某个发言人或被提及的人关联起来
3. 如果能建立可靠的一一映射关系请输出一个 JSON 对象格式如下
{{
"is_exist": true,
"data": {{
"用户A数字id": "绰号_A",
"用户B数字id": "绰号_B"
}}
}}
其中 "data" 字段的键是用户的 ID (字符串形式)值是对应的绰号只包含你能确认映射关系的绰号
4. 如果无法建立任何可靠的一一映射关系例如绰号指代不明没有出现绰号或无法确认绰号与用户的关联请输出 JSON 对象
{{
"is_exist": false
}}
5. 已知用户信息列表中你的昵称后面可能包含"(你)"这表示是你自己不需要输出你自身的绰号映射请确保不要将你自己的ID和任何词语映射为绰号
6. 请严格按照 JSON 格式输出不要包含任何额外的解释或文本
输出
"""
return prompt
def _strip_markdown_fence(text: str) -> str:
    """Return *text* without a surrounding Markdown code fence (``` or ```json, any case)."""
    text = text.strip()
    if text.startswith("```"):
        text = text[3:]
        # Drop an optional language tag; models emit "json", "JSON" or no tag at all.
        if text[:4].lower() == "json":
            text = text[4:]
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def _filter_nickname_mapping(original_data: Dict[Any, Any], user_name_map: Dict[str, str]) -> Dict[str, str]:
    """Keep only usable user_id -> nickname pairs from the LLM's raw mapping."""
    filtered_data: Dict[str, str] = {}
    bot_qq_str = str(global_config.BOT_QQ)  # compare IDs as strings
    for user_id, nickname in original_data.items():
        # Defensive: IDs are expected to be strings.
        if not isinstance(user_id, str):
            logger.warning(f"LLM 返回的 user_id '{user_id}' 不是字符串,跳过。")
            continue
        # Rule 1: never record a nickname for the bot itself.
        if user_id == bot_qq_str:
            logger.debug(f"过滤掉机器人自身的映射: ID {user_id}")
            continue
        # Rule 2: the nickname must be a non-blank string (the LLM may emit null, numbers, ...).
        if not isinstance(nickname, str) or not nickname.strip():
            logger.warning(f"LLM 返回的用户 {user_id} 的绰号 '{nickname}' 不是非空字符串,跳过。")
            continue
        # Rule 3: a "nickname" identical to the user's known name carries no information.
        person_name = user_name_map.get(user_id)
        if person_name and person_name == nickname:
            logger.debug(f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。")
            continue
        filtered_data[user_id] = nickname
    return filtered_data


async def analyze_chat_for_nicknames(
    chat_history_str: str,
    bot_reply: str,
    user_name_map: Dict[str, str]  # user_id -> person_name (or fallback nickname)
) -> Dict[str, Any]:
    """
    Ask the LLM for reliable user-ID -> nickname mappings and filter the answer.

    Args:
        chat_history_str: Readable chat history text.
        bot_reply: The bot's latest reply text.
        user_name_map: Maps user_id to the user's known name; used both in the
            prompt and to drop "nicknames" equal to that name.

    Returns:
        ``{"is_exist": True, "data": {user_id: nickname, ...}}`` when at least
        one mapping survives filtering, otherwise ``{"is_exist": False}``.
        Failures (feature disabled, LLM not initialised, malformed or
        unparsable response, any exception) are logged and reported as
        ``{"is_exist": False}``; nothing is raised.
    """
    if not global_config.ENABLE_NICKNAME_MAPPING:
        logger.debug("绰号映射功能已禁用。")
        return {"is_exist": False}
    if llm_mapper is None:
        logger.error("绰号映射 LLM 未初始化。无法执行分析。")
        return {"is_exist": False}

    prompt = _build_mapping_prompt(chat_history_str, bot_reply, user_name_map)
    logger.debug(f"构建的绰号映射 Prompt:\n{prompt}")

    try:
        response_content, _, _ = await llm_mapper.generate_response(prompt)
        logger.debug(f"LLM 原始响应 (绰号映射): {response_content}")
        if not response_content:
            logger.warning("LLM 返回了空的绰号映射内容。")
            return {"is_exist": False}

        # The model often wraps its JSON in a Markdown code fence; remove it before parsing.
        response_content = _strip_markdown_fence(response_content)
        try:
            result = json.loads(response_content)
        except json.JSONDecodeError as json_err:
            logger.error(f"解析 LLM 响应 JSON 失败: {json_err}\n原始响应: {response_content}")
            return {"is_exist": False}

        if not isinstance(result, dict) or "is_exist" not in result:
            logger.warning("LLM 响应格式错误: 缺少 'is_exist' 键或不是字典。")
            return {"is_exist": False}

        is_exist = result["is_exist"]
        if is_exist is False:
            logger.info("LLM 未找到可靠的绰号映射。")
            return {"is_exist": False}
        if is_exist is not True:
            logger.warning("LLM 响应格式错误: 'is_exist' 不是布尔值。")
            return {"is_exist": False}

        # is_exist is True from here on: "data" must be a non-empty dict.
        original_data = result.get("data")
        if not isinstance(original_data, dict):
            if "data" not in result:
                logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 键缺失。")
            else:
                logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 不是字典。")
            return {"is_exist": False}
        if not original_data:
            logger.debug("LLM 指示 is_exist=True 但 data 为空字典。视为 False 处理。")
            return {"is_exist": False}

        logger.info(f"LLM 找到的原始绰号映射: {original_data}")
        filtered_data = _filter_nickname_mapping(original_data, user_name_map)
        if not filtered_data:
            logger.info("所有找到的绰号映射都被过滤掉了。")
            return {"is_exist": False}

        logger.info(f"过滤后的绰号映射: {filtered_data}")
        return {"is_exist": True, "data": filtered_data}
    except Exception as e:
        # Best effort: analysis failures must not propagate to the caller.
        logger.error(f"绰号映射 LLM 调用或处理过程中出错: {e}", exc_info=True)
        return {"is_exist": False}