mirror of https://github.com/Mai-with-u/MaiBot.git
prompt优化,适配可乐老大新工具
parent
c7cf0b102e
commit
6b6945fd77
|
|
@ -2,15 +2,13 @@ import json
|
|||
from typing import Dict, Any, Optional
|
||||
from src.common.logger_manager import get_logger
|
||||
from src.plugins.models.utils_model import LLMRequest
|
||||
|
||||
# 从全局配置导入
|
||||
from src.config.config import global_config
|
||||
|
||||
|
||||
logger = get_logger("nickname_mapper")
|
||||
|
||||
llm_mapper: Optional[LLMRequest] = None
|
||||
if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
|
||||
if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
|
||||
try:
|
||||
# 从全局配置获取模型设置
|
||||
model_config = global_config.llm_nickname_mapping
|
||||
|
|
@ -18,10 +16,10 @@ if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
|
|||
logger.error("在全局配置中未找到有效的 'llm_nickname_mapping' 配置或缺少 'name' 字段。")
|
||||
else:
|
||||
llm_mapper = LLMRequest( # <-- LLM 初始化
|
||||
model=global_config.llm_nickname_mapping,
|
||||
temperature=global_config.llm_nickname_mapping["temp"],
|
||||
max_tokens=256,
|
||||
request_type="nickname_mapping",
|
||||
model=global_config.llm_nickname_mapping,
|
||||
temperature=global_config.llm_nickname_mapping["temp"],
|
||||
max_tokens=256,
|
||||
request_type="nickname_mapping",
|
||||
)
|
||||
logger.info("绰号映射 LLM 初始化成功 (使用全局配置)。")
|
||||
|
||||
|
|
@ -29,7 +27,6 @@ if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
|
|||
logger.error(f"使用全局配置初始化绰号映射 LLM 失败: {e}", exc_info=True)
|
||||
llm_mapper = None
|
||||
|
||||
|
||||
def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str:
|
||||
"""构建用于 LLM 绰号映射的 Prompt"""
|
||||
# user_name_map 包含了 user_id 到 person_name (或 fallback nickname) 的映射
|
||||
|
|
@ -66,7 +63,8 @@ def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map:
|
|||
"is_exist": false
|
||||
}}
|
||||
5. 在“已知用户信息”列表中,你的昵称后面可能包含"(你)",这表示是你自己,不需要输出你自身的绰号映射。请确保不要将你自己的ID和任何词语映射为绰号。
|
||||
6. 请严格按照 JSON 格式输出,不要包含任何额外的解释或文本。
|
||||
6. 不要输出与用户名称相同的绰号。
|
||||
7. 请严格按照 JSON 格式输出,不要包含任何额外的解释或文本。
|
||||
|
||||
输出:
|
||||
"""
|
||||
|
|
@ -76,7 +74,7 @@ def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map:
|
|||
async def analyze_chat_for_nicknames(
|
||||
chat_history_str: str,
|
||||
bot_reply: str,
|
||||
user_name_map: Dict[str, str], # 这个 map 包含了 user_id -> person_name 的信息
|
||||
user_name_map: Dict[str, str] # 这个 map 包含了 user_id -> person_name 的信息
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
调用 LLM 分析聊天记录和 Bot 回复,提取可靠的 用户ID-绰号 映射,并进行过滤。
|
||||
|
|
@ -113,13 +111,13 @@ async def analyze_chat_for_nicknames(
|
|||
result = json.loads(response_content)
|
||||
if isinstance(result, dict) and "is_exist" in result:
|
||||
if result["is_exist"] is True:
|
||||
original_data = result.get("data") # 使用 .get() 更安全
|
||||
if isinstance(original_data, dict) and original_data: # 确保 data 是非空字典
|
||||
original_data = result.get("data") # 使用 .get() 更安全
|
||||
if isinstance(original_data, dict) and original_data: # 确保 data 是非空字典
|
||||
logger.info(f"LLM 找到的原始绰号映射: {original_data}")
|
||||
|
||||
# --- 开始过滤 ---
|
||||
filtered_data = {}
|
||||
bot_qq_str = str(global_config.BOT_QQ) # 将机器人QQ转为字符串以便比较
|
||||
bot_qq_str = str(global_config.BOT_QQ) # 将机器人QQ转为字符串以便比较
|
||||
|
||||
for user_id, nickname in original_data.items():
|
||||
# 检查 user_id 是否是字符串,以防万一
|
||||
|
|
@ -132,13 +130,12 @@ async def analyze_chat_for_nicknames(
|
|||
logger.debug(f"过滤掉机器人自身的映射: ID {user_id}")
|
||||
continue
|
||||
|
||||
# 条件 2: 排除 nickname 与 person_name 相同的情况
|
||||
person_name = user_name_map.get(user_id) # 从传入的映射中查找 person_name
|
||||
if person_name and person_name == nickname:
|
||||
logger.debug(
|
||||
f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。"
|
||||
)
|
||||
continue
|
||||
# 有了改名工具后,该过滤器已不适合了,尝试通过修改 prompt 获得更好的结果
|
||||
# # 条件 2: 排除 nickname 与 person_name 相同的情况
|
||||
# person_name = user_name_map.get(user_id) # 从传入的映射中查找 person_name
|
||||
# if person_name and person_name == nickname:
|
||||
# logger.debug(f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。")
|
||||
# continue
|
||||
|
||||
# 如果通过所有过滤条件,则保留
|
||||
filtered_data[user_id] = nickname
|
||||
|
|
@ -150,7 +147,7 @@ async def analyze_chat_for_nicknames(
|
|||
return {"is_exist": False}
|
||||
else:
|
||||
logger.info(f"过滤后的绰号映射: {filtered_data}")
|
||||
return {"is_exist": True, "data": filtered_data} # 返回过滤后的数据
|
||||
return {"is_exist": True, "data": filtered_data} # 返回过滤后的数据
|
||||
|
||||
else:
|
||||
# is_exist 为 True 但 data 缺失、不是字典或为空
|
||||
|
|
@ -158,7 +155,7 @@ async def analyze_chat_for_nicknames(
|
|||
logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 键缺失。")
|
||||
elif not isinstance(result.get("data"), dict):
|
||||
logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 不是字典。")
|
||||
else: # data 为空字典
|
||||
else: # data 为空字典
|
||||
logger.debug("LLM 指示 is_exist=True 但 data 为空字典。视为 False 处理。")
|
||||
return {"is_exist": False}
|
||||
elif result["is_exist"] is False:
|
||||
|
|
|
|||
|
|
@ -1,174 +1,310 @@
|
|||
import json
|
||||
from typing import Dict, Any, Optional
|
||||
# GroupNickname/nickname_utils.py
|
||||
import random
|
||||
import time
|
||||
from typing import List, Dict, Tuple, Optional, Any
|
||||
from src.common.logger_manager import get_logger
|
||||
from src.plugins.models.utils_model import LLMRequest
|
||||
from src.config.config import global_config
|
||||
from src.plugins.person_info.relationship_manager import relationship_manager
|
||||
from src.plugins.chat.chat_stream import ChatStream
|
||||
from src.plugins.chat.message import MessageRecv
|
||||
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
|
||||
from .nickname_processor import add_to_nickname_queue
|
||||
|
||||
|
||||
logger = get_logger("nickname_mapper")
|
||||
|
||||
llm_mapper: Optional[LLMRequest] = None
|
||||
if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
|
||||
try:
|
||||
# 从全局配置获取模型设置
|
||||
model_config = global_config.llm_nickname_mapping
|
||||
if not model_config or not model_config.get("name"):
|
||||
logger.error("在全局配置中未找到有效的 'llm_nickname_mapping' 配置或缺少 'name' 字段。")
|
||||
else:
|
||||
llm_mapper = LLMRequest( # <-- LLM 初始化
|
||||
model=global_config.llm_nickname_mapping,
|
||||
temperature=global_config.llm_nickname_mapping["temp"],
|
||||
max_tokens=256,
|
||||
request_type="nickname_mapping",
|
||||
)
|
||||
logger.info("绰号映射 LLM 初始化成功 (使用全局配置)。")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"使用全局配置初始化绰号映射 LLM 失败: {e}", exc_info=True)
|
||||
llm_mapper = None
|
||||
|
||||
def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str:
|
||||
"""构建用于 LLM 绰号映射的 Prompt"""
|
||||
# user_name_map 包含了 user_id 到 person_name (或 fallback nickname) 的映射
|
||||
user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items()])
|
||||
# print(f"\n\n\nKnown User Info for LLM:\n{user_list_str}\n\n\n\n") # Debugging print
|
||||
prompt = f"""
|
||||
任务:分析以下聊天记录和你的最新回复,判断其中是否包含用户绰号,并确定绰号与用户 ID 之间是否存在明确的一一对应关系。
|
||||
|
||||
已知用户信息(ID: 名称):
|
||||
{user_list_str}
|
||||
|
||||
聊天记录:
|
||||
---
|
||||
{chat_history_str}
|
||||
---
|
||||
|
||||
你的最新回复:
|
||||
{bot_reply}
|
||||
|
||||
分析要求:
|
||||
1. 识别聊天记录和你发言中出现的可能是用户绰号的词语。
|
||||
2. 判断这些绰号是否能明确地指向某个特定的用户 ID。一个绰号必须在上下文中清晰地与某个发言人或被提及的人关联起来。
|
||||
3. 如果能建立可靠的一一映射关系,请输出一个 JSON 对象,格式如下:
|
||||
{{
|
||||
"is_exist": true,
|
||||
"data": {{
|
||||
"用户A数字id": "绰号_A",
|
||||
"用户B数字id": "绰号_B"
|
||||
}}
|
||||
}}
|
||||
其中 "data" 字段的键是用户的 ID (字符串形式),值是对应的绰号。只包含你能确认映射关系的绰号。
|
||||
4. 如果无法建立任何可靠的一一映射关系(例如,绰号指代不明、没有出现绰号、或无法确认绰号与用户的关联),请输出 JSON 对象:
|
||||
{{
|
||||
"is_exist": false
|
||||
}}
|
||||
5. 在“已知用户信息”列表中,你的昵称后面可能包含"(你)",这表示是你自己,不需要输出你自身的绰号映射。请确保不要将你自己的ID和任何词语映射为绰号。
|
||||
6. 请严格按照 JSON 格式输出,不要包含任何额外的解释或文本。
|
||||
|
||||
输出:
|
||||
"""
|
||||
return prompt
|
||||
# 获取日志记录器,命名为 "绰号工具"
|
||||
logger = get_logger("nickname_utils")
|
||||
|
||||
|
||||
async def analyze_chat_for_nicknames(
|
||||
chat_history_str: str,
|
||||
bot_reply: str,
|
||||
user_name_map: Dict[str, str] # 这个 map 包含了 user_id -> person_name 的信息
|
||||
) -> Dict[str, Any]:
|
||||
def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int]]]) -> List[Tuple[str, str, int]]:
|
||||
"""
|
||||
调用 LLM 分析聊天记录和 Bot 回复,提取可靠的 用户ID-绰号 映射,并进行过滤。
|
||||
从给定的绰号信息中,根据映射次数加权随机选择最多 N 个绰号。
|
||||
|
||||
Args:
|
||||
all_nicknames_info: 包含用户及其绰号信息的字典,格式为
|
||||
{ "用户名1": [{"绰号A": 次数}, {"绰号B": 次数}], ... }
|
||||
|
||||
Returns:
|
||||
List[Tuple[str, str, int]]: 选中的绰号列表,每个元素为 (用户名, 绰号, 次数)。
|
||||
按次数降序排序。
|
||||
"""
|
||||
if not global_config.ENABLE_NICKNAME_MAPPING:
|
||||
logger.debug("绰号映射功能已禁用。")
|
||||
return {"is_exist": False}
|
||||
|
||||
if llm_mapper is None:
|
||||
logger.error("绰号映射 LLM 未初始化。无法执行分析。")
|
||||
return {"is_exist": False}
|
||||
|
||||
prompt = _build_mapping_prompt(chat_history_str, bot_reply, user_name_map)
|
||||
logger.debug(f"构建的绰号映射 Prompt:\n{prompt}")
|
||||
|
||||
try:
|
||||
# 调用 LLM
|
||||
response_content, _, _ = await llm_mapper.generate_response(prompt)
|
||||
logger.debug(f"LLM 原始响应 (绰号映射): {response_content}")
|
||||
|
||||
if not response_content:
|
||||
logger.warning("LLM 返回了空的绰号映射内容。")
|
||||
return {"is_exist": False}
|
||||
|
||||
# 清理可能的 Markdown 代码块标记
|
||||
response_content = response_content.strip()
|
||||
if response_content.startswith("```json"):
|
||||
response_content = response_content[7:]
|
||||
if response_content.endswith("```"):
|
||||
response_content = response_content[:-3]
|
||||
response_content = response_content.strip()
|
||||
|
||||
try:
|
||||
result = json.loads(response_content)
|
||||
if isinstance(result, dict) and "is_exist" in result:
|
||||
if result["is_exist"] is True:
|
||||
original_data = result.get("data") # 使用 .get() 更安全
|
||||
if isinstance(original_data, dict) and original_data: # 确保 data 是非空字典
|
||||
logger.info(f"LLM 找到的原始绰号映射: {original_data}")
|
||||
|
||||
# --- 开始过滤 ---
|
||||
filtered_data = {}
|
||||
bot_qq_str = str(global_config.BOT_QQ) # 将机器人QQ转为字符串以便比较
|
||||
|
||||
for user_id, nickname in original_data.items():
|
||||
# 检查 user_id 是否是字符串,以防万一
|
||||
if not isinstance(user_id, str):
|
||||
logger.warning(f"LLM 返回的 user_id '{user_id}' 不是字符串,跳过。")
|
||||
continue
|
||||
|
||||
# 条件 1: 排除机器人自身
|
||||
if user_id == bot_qq_str:
|
||||
logger.debug(f"过滤掉机器人自身的映射: ID {user_id}")
|
||||
continue
|
||||
|
||||
# 条件 2: 排除 nickname 与 person_name 相同的情况
|
||||
person_name = user_name_map.get(user_id) # 从传入的映射中查找 person_name
|
||||
if person_name and person_name == nickname:
|
||||
logger.debug(f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。")
|
||||
continue
|
||||
|
||||
# 如果通过所有过滤条件,则保留
|
||||
filtered_data[user_id] = nickname
|
||||
# --- 结束过滤 ---
|
||||
|
||||
# 检查过滤后是否还有数据
|
||||
if not filtered_data:
|
||||
logger.info("所有找到的绰号映射都被过滤掉了。")
|
||||
return {"is_exist": False}
|
||||
else:
|
||||
logger.info(f"过滤后的绰号映射: {filtered_data}")
|
||||
return {"is_exist": True, "data": filtered_data} # 返回过滤后的数据
|
||||
if not all_nicknames_info:
|
||||
# 如果输入为空,直接返回空列表
|
||||
return []
|
||||
|
||||
candidates = [] # 候选绰号列表,包含 (用户名, 绰号, 次数, 权重)
|
||||
for user_name, nicknames in all_nicknames_info.items():
|
||||
if nicknames:
|
||||
for nickname_entry in nicknames:
|
||||
# nickname_entry 应该是 {"绰号": 次数} 格式
|
||||
if isinstance(nickname_entry, dict) and len(nickname_entry) == 1:
|
||||
nickname, count = list(nickname_entry.items())[0]
|
||||
# 确保次数是正整数
|
||||
if isinstance(count, int) and count > 0:
|
||||
# 添加平滑因子,避免概率为0,并让低频词也有机会
|
||||
weight = count + global_config.NICKNAME_PROBABILITY_SMOOTHING
|
||||
candidates.append((user_name, nickname, count, weight))
|
||||
else:
|
||||
# is_exist 为 True 但 data 缺失、不是字典或为空
|
||||
if "data" not in result:
|
||||
logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 键缺失。")
|
||||
elif not isinstance(result.get("data"), dict):
|
||||
logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 不是字典。")
|
||||
else: # data 为空字典
|
||||
logger.debug("LLM 指示 is_exist=True 但 data 为空字典。视为 False 处理。")
|
||||
return {"is_exist": False}
|
||||
elif result["is_exist"] is False:
|
||||
logger.info("LLM 未找到可靠的绰号映射。")
|
||||
return {"is_exist": False}
|
||||
# 日志:记录无效的绰号次数
|
||||
logger.warning(
|
||||
f"用户 '{user_name}' 的绰号 '{nickname}' 次数无效: {count}。已跳过。"
|
||||
)
|
||||
else:
|
||||
logger.warning("LLM 响应格式错误: 'is_exist' 不是布尔值。")
|
||||
return {"is_exist": False}
|
||||
# 日志:记录无效的绰号条目格式
|
||||
logger.warning(f"用户 '{user_name}' 的绰号条目格式无效: {nickname_entry}。已跳过。")
|
||||
|
||||
if not candidates:
|
||||
# 如果没有有效的候选绰号,返回空列表
|
||||
return []
|
||||
|
||||
# 计算总权重
|
||||
total_weight = sum(c[3] for c in candidates)
|
||||
|
||||
if total_weight <= 0:
|
||||
# 如果所有权重都无效或为0,则按原始次数排序选择前 N 个
|
||||
logger.warning("所有候选绰号的总权重为0或负数,将按原始次数选择 Top N。")
|
||||
candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
|
||||
selected = candidates[: global_config.MAX_NICKNAMES_IN_PROMPT]
|
||||
else:
|
||||
# 计算归一化概率
|
||||
probabilities = [c[3] / total_weight for c in candidates]
|
||||
|
||||
# 使用概率分布进行加权随机选择(不重复)
|
||||
num_to_select = min(global_config.MAX_NICKNAMES_IN_PROMPT, len(candidates))
|
||||
try:
|
||||
# 实现不重复加权抽样
|
||||
selected_indices = set()
|
||||
selected = []
|
||||
attempts = 0
|
||||
max_attempts = num_to_select * 5 # 设置最大尝试次数,防止无限循环
|
||||
|
||||
while len(selected) < num_to_select and attempts < max_attempts:
|
||||
# 每次只选一个
|
||||
chosen_index = random.choices(range(len(candidates)), weights=probabilities, k=1)[0]
|
||||
if chosen_index not in selected_indices:
|
||||
selected_indices.add(chosen_index)
|
||||
selected.append(candidates[chosen_index])
|
||||
attempts += 1
|
||||
|
||||
# 如果尝试多次后仍未选够,补充出现次数最多的
|
||||
if len(selected) < num_to_select:
|
||||
logger.debug(f"加权随机选择后数量不足 ({len(selected)}/{num_to_select}),补充选择次数最多的。")
|
||||
remaining_candidates = [c for i, c in enumerate(candidates) if i not in selected_indices]
|
||||
remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
|
||||
needed = num_to_select - len(selected)
|
||||
selected.extend(remaining_candidates[:needed])
|
||||
|
||||
except Exception as e:
|
||||
# 日志:记录加权随机选择时发生的错误,并回退到简单选择
|
||||
logger.error(
|
||||
f"绰号加权随机选择时出错: {e}。将回退到选择次数最多的 Top N。", exc_info=True
|
||||
)
|
||||
# 出错时回退到选择次数最多的 N 个
|
||||
candidates.sort(key=lambda x: x[2], reverse=True)
|
||||
selected = candidates[: global_config.MAX_NICKNAMES_IN_PROMPT]
|
||||
|
||||
# 格式化输出结果为 (用户名, 绰号, 次数)
|
||||
result = [(user, nick, count) for user, nick, count, _weight in selected]
|
||||
result.sort(key=lambda x: x[2], reverse=True) # 按次数降序
|
||||
|
||||
# 日志:记录最终选中的用于 Prompt 的绰号
|
||||
logger.debug(f"为 Prompt 选择的绰号: {result}")
|
||||
return result
|
||||
|
||||
|
||||
def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, int]]) -> str:
|
||||
"""
|
||||
将选中的绰号信息格式化为注入 Prompt 的字符串。
|
||||
|
||||
Args:
|
||||
selected_nicknames: 选中的绰号列表 (用户名, 绰号, 次数)。
|
||||
|
||||
Returns:
|
||||
str: 格式化后的字符串,如果列表为空则返回空字符串。
|
||||
"""
|
||||
if not selected_nicknames:
|
||||
# 如果没有选中的绰号,返回空字符串
|
||||
return ""
|
||||
|
||||
prompt_lines = ["以下是聊天记录中一些成员在本群的绰号信息(按常用度排序),如果有需要提及对方,用你认为合适的方式提及:"] # 注入部分的标题
|
||||
grouped_by_user: Dict[str, List[str]] = {} # 用于按用户分组
|
||||
|
||||
# 按用户分组绰号
|
||||
for user_name, nickname, _count in selected_nicknames:
|
||||
if user_name not in grouped_by_user:
|
||||
grouped_by_user[user_name] = []
|
||||
# 添加中文引号以区分绰号
|
||||
grouped_by_user[user_name].append(f"“{nickname}”")
|
||||
|
||||
# 构建每个用户的绰号字符串
|
||||
for user_name, nicknames in grouped_by_user.items():
|
||||
nicknames_str = "、".join(nicknames) # 使用中文顿号连接
|
||||
prompt_lines.append(f"- 你私下称呼ta为{user_name},ta被有时被群友称为:{nicknames_str}") # 格式化输出
|
||||
|
||||
# 如果只有标题行,返回空字符串,避免注入无意义的标题
|
||||
if len(prompt_lines) > 1:
|
||||
# 末尾加换行符,以便在 Prompt 中正确分隔
|
||||
return "\n".join(prompt_lines) + "\n"
|
||||
else:
|
||||
return ""
|
||||
|
||||
|
||||
async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_list_before_now: List[Dict]) -> str:
|
||||
"""
|
||||
获取并格式化用于 Prompt 注入的绰号信息字符串。
|
||||
这是一个封装函数,整合了获取、选择和格式化的逻辑。
|
||||
|
||||
Args:
|
||||
chat_stream: 当前的 ChatStream 对象。
|
||||
message_list_before_now: 用于确定上下文中用户的消息列表。
|
||||
|
||||
Returns:
|
||||
str: 格式化后的绰号信息字符串,如果无法获取或格式化则返回空字符串。
|
||||
"""
|
||||
nickname_injection_str = ""
|
||||
# 仅在群聊且功能开启时执行
|
||||
if global_config.ENABLE_NICKNAME_MAPPING and chat_stream and chat_stream.group_info:
|
||||
try:
|
||||
group_id = str(chat_stream.group_info.group_id)
|
||||
user_ids_in_context = set() # 存储上下文中出现的用户ID
|
||||
|
||||
# 从消息列表中提取用户ID
|
||||
if message_list_before_now:
|
||||
for msg in message_list_before_now:
|
||||
sender_id = msg["user_info"].get("user_id")
|
||||
if sender_id:
|
||||
user_ids_in_context.add(str(sender_id))
|
||||
else:
|
||||
logger.warning("LLM 响应格式错误: 缺少 'is_exist' 键或不是字典。")
|
||||
return {"is_exist": False}
|
||||
except json.JSONDecodeError as json_err:
|
||||
logger.error(f"解析 LLM 响应 JSON 失败: {json_err}\n原始响应: {response_content}")
|
||||
return {"is_exist": False}
|
||||
# 如果消息列表为空,尝试获取最近发言者作为上下文用户
|
||||
recent_speakers = chat_stream.get_recent_speakers(limit=5) # 获取最近5个发言者
|
||||
for speaker in recent_speakers:
|
||||
user_ids_in_context.add(str(speaker['user_id']))
|
||||
if not user_ids_in_context:
|
||||
# 日志:记录未找到上下文用户
|
||||
logger.warning(f"[{chat_stream.stream_id}] 未找到消息或最近发言者用于绰号注入。")
|
||||
|
||||
# 如果找到了上下文用户
|
||||
if user_ids_in_context:
|
||||
platform = chat_stream.platform
|
||||
# --- 调用批量获取群组绰号的方法 ---
|
||||
# 使用 relationship_manager 从数据库获取数据
|
||||
all_nicknames_data = await relationship_manager.get_users_group_nicknames(
|
||||
platform, list(user_ids_in_context), group_id
|
||||
)
|
||||
|
||||
# 如果获取到了绰号数据
|
||||
if all_nicknames_data:
|
||||
# 调用选择和格式化函数
|
||||
selected_nicknames = select_nicknames_for_prompt(all_nicknames_data)
|
||||
nickname_injection_str = format_nickname_prompt_injection(selected_nicknames)
|
||||
if nickname_injection_str:
|
||||
# 日志:记录生成的用于 Prompt 的绰号信息
|
||||
logger.debug(f"[{chat_stream.stream_id}] 已生成用于 Prompt 的绰号信息:\n{nickname_injection_str}")
|
||||
|
||||
except Exception as e:
|
||||
# 日志:记录获取或格式化绰号信息时发生的错误
|
||||
logger.error(f"[{chat_stream.stream_id}] 获取或格式化 Prompt 绰号信息时出错: {e}", exc_info=True)
|
||||
nickname_injection_str = "" # 出错时确保返回空字符串
|
||||
|
||||
# 返回最终生成的字符串(可能为空)
|
||||
return nickname_injection_str
|
||||
|
||||
|
||||
async def trigger_nickname_analysis_if_needed(
|
||||
anchor_message: MessageRecv,
|
||||
bot_reply: List[str],
|
||||
chat_stream: Optional[ChatStream] = None # 允许传入 chat_stream 或从 anchor_message 获取
|
||||
):
|
||||
"""
|
||||
如果满足条件(群聊、功能开启),则准备数据并触发绰号分析任务。
|
||||
将相关信息放入处理队列,由 nickname_processor 处理。
|
||||
|
||||
Args:
|
||||
anchor_message: 触发回复的原始消息对象。
|
||||
bot_reply: Bot 生成的回复内容列表。
|
||||
chat_stream: 可选的 ChatStream 对象。
|
||||
"""
|
||||
# 检查功能是否开启
|
||||
if not global_config.ENABLE_NICKNAME_MAPPING:
|
||||
return # 如果功能禁用,直接返回
|
||||
|
||||
# 确定使用的 chat_stream
|
||||
current_chat_stream = chat_stream or anchor_message.chat_stream
|
||||
|
||||
# 检查是否是群聊且 chat_stream 有效
|
||||
if not current_chat_stream or not current_chat_stream.group_info:
|
||||
# 日志:记录跳过分析的原因(非群聊或无效流)
|
||||
logger.debug(f"[{current_chat_stream.stream_id if current_chat_stream else '未知流'}] 跳过绰号分析:非群聊或无效聊天流。")
|
||||
return
|
||||
|
||||
log_prefix = f"[{current_chat_stream.stream_id}]" # 用于日志的前缀
|
||||
|
||||
try:
|
||||
# 1. 获取历史记录
|
||||
history_limit = 30 # 定义获取历史记录的数量限制
|
||||
history_messages = get_raw_msg_before_timestamp_with_chat(
|
||||
chat_id=current_chat_stream.stream_id,
|
||||
timestamp=time.time(), # 获取当前时间之前的记录
|
||||
limit=history_limit,
|
||||
)
|
||||
|
||||
# 格式化历史记录为可读字符串
|
||||
chat_history_str = await build_readable_messages(
|
||||
messages=history_messages,
|
||||
replace_bot_name=True, # 替换机器人名称,以便 LLM 分析
|
||||
merge_messages=False, # 不合并消息,保留原始对话结构
|
||||
timestamp_mode="relative", # 使用相对时间戳
|
||||
read_mark=0.0, # 不需要已读标记
|
||||
truncate=False, # 获取完整内容进行分析
|
||||
)
|
||||
|
||||
# 2. 获取 Bot 回复字符串
|
||||
bot_reply_str = " ".join(bot_reply) if bot_reply else "" # 处理空回复列表
|
||||
|
||||
# 3. 获取群号和平台信息
|
||||
group_id = str(current_chat_stream.group_info.group_id)
|
||||
platform = current_chat_stream.platform
|
||||
|
||||
# 4. 构建用户 ID 到名称的映射 (user_name_map)
|
||||
user_ids_in_history = set() # 存储历史记录中出现的用户ID
|
||||
for msg in history_messages:
|
||||
sender_id = msg["user_info"].get("user_id")
|
||||
if sender_id:
|
||||
user_ids_in_history.add(str(sender_id))
|
||||
|
||||
user_name_map = {} # 初始化映射字典
|
||||
if user_ids_in_history:
|
||||
try:
|
||||
# 批量从数据库获取这些用户的 person_name
|
||||
names_data = await relationship_manager.get_person_names_batch(platform, list(user_ids_in_history))
|
||||
except Exception as e:
|
||||
# 日志:记录获取 person_name 时发生的错误
|
||||
logger.error(f"{log_prefix} 批量获取 person_name 时出错: {e}", exc_info=True)
|
||||
names_data = {} # 出错时使用空字典
|
||||
|
||||
# 填充 user_name_map
|
||||
for user_id in user_ids_in_history:
|
||||
if user_id in names_data:
|
||||
# 如果数据库中有 person_name,则使用它
|
||||
user_name_map[user_id] = names_data[user_id]
|
||||
else:
|
||||
# 如果数据库中没有,则回退查找用户在历史记录中最近使用的 nickname
|
||||
latest_nickname = next(
|
||||
(
|
||||
m["user_info"].get("user_nickname") # 从 user_info 获取 nickname
|
||||
for m in reversed(history_messages) # 从后往前找
|
||||
# 确保消息的用户ID匹配且 nickname 存在
|
||||
if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname")
|
||||
),
|
||||
None, # 如果找不到,返回 None
|
||||
)
|
||||
# 如果找到了 nickname 则使用,否则使用 "未知(ID)"
|
||||
user_name_map[user_id] = latest_nickname or f"未知({user_id})"
|
||||
|
||||
# 5. 将准备好的数据添加到绰号处理队列
|
||||
await add_to_nickname_queue(chat_history_str, bot_reply_str, platform, group_id, user_name_map)
|
||||
# 日志:记录已成功触发分析任务
|
||||
logger.debug(f"{log_prefix} 已为群组 {group_id} 触发绰号分析任务。")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"绰号映射 LLM 调用或处理过程中出错: {e}", exc_info=True)
|
||||
return {"is_exist": False}
|
||||
# 日志:记录触发分析过程中发生的任何其他错误
|
||||
logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True)
|
||||
Loading…
Reference in New Issue