mirror of https://github.com/Mai-with-u/MaiBot.git
🤖 自动格式化代码 [skip ci]
parent
9794182545
commit
5345eb725a
|
|
@ -8,7 +8,7 @@ from src.config.config import global_config
|
|||
logger = get_logger("nickname_mapper")
|
||||
|
||||
llm_mapper: Optional[LLMRequest] = None
|
||||
if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
|
||||
if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
|
||||
try:
|
||||
# 从全局配置获取模型设置
|
||||
model_config = global_config.llm_nickname_mapping
|
||||
|
|
@ -16,10 +16,10 @@ if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
|
|||
logger.error("在全局配置中未找到有效的 'llm_nickname_mapping' 配置或缺少 'name' 字段。")
|
||||
else:
|
||||
llm_mapper = LLMRequest( # <-- LLM 初始化
|
||||
model=global_config.llm_nickname_mapping,
|
||||
temperature=global_config.llm_nickname_mapping["temp"],
|
||||
max_tokens=256,
|
||||
request_type="nickname_mapping",
|
||||
model=global_config.llm_nickname_mapping,
|
||||
temperature=global_config.llm_nickname_mapping["temp"],
|
||||
max_tokens=256,
|
||||
request_type="nickname_mapping",
|
||||
)
|
||||
logger.info("绰号映射 LLM 初始化成功 (使用全局配置)。")
|
||||
|
||||
|
|
@ -27,6 +27,7 @@ if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
|
|||
logger.error(f"使用全局配置初始化绰号映射 LLM 失败: {e}", exc_info=True)
|
||||
llm_mapper = None
|
||||
|
||||
|
||||
def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str:
|
||||
"""构建用于 LLM 绰号映射的 Prompt"""
|
||||
# user_name_map 包含了 user_id 到 person_name (或 fallback nickname) 的映射
|
||||
|
|
@ -74,7 +75,7 @@ def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map:
|
|||
async def analyze_chat_for_nicknames(
|
||||
chat_history_str: str,
|
||||
bot_reply: str,
|
||||
user_name_map: Dict[str, str] # 这个 map 包含了 user_id -> person_name 的信息
|
||||
user_name_map: Dict[str, str], # 这个 map 包含了 user_id -> person_name 的信息
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
调用 LLM 分析聊天记录和 Bot 回复,提取可靠的 用户ID-绰号 映射,并进行过滤。
|
||||
|
|
@ -111,13 +112,13 @@ async def analyze_chat_for_nicknames(
|
|||
result = json.loads(response_content)
|
||||
if isinstance(result, dict) and "is_exist" in result:
|
||||
if result["is_exist"] is True:
|
||||
original_data = result.get("data") # 使用 .get() 更安全
|
||||
if isinstance(original_data, dict) and original_data: # 确保 data 是非空字典
|
||||
original_data = result.get("data") # 使用 .get() 更安全
|
||||
if isinstance(original_data, dict) and original_data: # 确保 data 是非空字典
|
||||
logger.info(f"LLM 找到的原始绰号映射: {original_data}")
|
||||
|
||||
# --- 开始过滤 ---
|
||||
filtered_data = {}
|
||||
bot_qq_str = str(global_config.BOT_QQ) # 将机器人QQ转为字符串以便比较
|
||||
bot_qq_str = str(global_config.BOT_QQ) # 将机器人QQ转为字符串以便比较
|
||||
|
||||
for user_id, nickname in original_data.items():
|
||||
# 检查 user_id 是否是字符串,以防万一
|
||||
|
|
@ -147,7 +148,7 @@ async def analyze_chat_for_nicknames(
|
|||
return {"is_exist": False}
|
||||
else:
|
||||
logger.info(f"过滤后的绰号映射: {filtered_data}")
|
||||
return {"is_exist": True, "data": filtered_data} # 返回过滤后的数据
|
||||
return {"is_exist": True, "data": filtered_data} # 返回过滤后的数据
|
||||
|
||||
else:
|
||||
# is_exist 为 True 但 data 缺失、不是字典或为空
|
||||
|
|
@ -155,7 +156,7 @@ async def analyze_chat_for_nicknames(
|
|||
logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 键缺失。")
|
||||
elif not isinstance(result.get("data"), dict):
|
||||
logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 不是字典。")
|
||||
else: # data 为空字典
|
||||
else: # data 为空字典
|
||||
logger.debug("LLM 指示 is_exist=True 但 data 为空字典。视为 False 处理。")
|
||||
return {"is_exist": False}
|
||||
elif result["is_exist"] is False:
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int
|
|||
# 如果输入为空,直接返回空列表
|
||||
return []
|
||||
|
||||
candidates = [] # 候选绰号列表,包含 (用户名, 绰号, 次数, 权重)
|
||||
candidates = [] # 候选绰号列表,包含 (用户名, 绰号, 次数, 权重)
|
||||
for user_name, nicknames in all_nicknames_info.items():
|
||||
if nicknames:
|
||||
for nickname_entry in nicknames:
|
||||
|
|
@ -45,9 +45,7 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int
|
|||
candidates.append((user_name, nickname, count, weight))
|
||||
else:
|
||||
# 日志:记录无效的绰号次数
|
||||
logger.warning(
|
||||
f"用户 '{user_name}' 的绰号 '{nickname}' 次数无效: {count}。已跳过。"
|
||||
)
|
||||
logger.warning(f"用户 '{user_name}' 的绰号 '{nickname}' 次数无效: {count}。已跳过。")
|
||||
else:
|
||||
# 日志:记录无效的绰号条目格式
|
||||
logger.warning(f"用户 '{user_name}' 的绰号条目格式无效: {nickname_entry}。已跳过。")
|
||||
|
|
@ -89,15 +87,13 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int
|
|||
if len(selected) < num_to_select:
|
||||
logger.debug(f"加权随机选择后数量不足 ({len(selected)}/{num_to_select}),补充选择次数最多的。")
|
||||
remaining_candidates = [c for i, c in enumerate(candidates) if i not in selected_indices]
|
||||
remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
|
||||
remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
|
||||
needed = num_to_select - len(selected)
|
||||
selected.extend(remaining_candidates[:needed])
|
||||
|
||||
except Exception as e:
|
||||
# 日志:记录加权随机选择时发生的错误,并回退到简单选择
|
||||
logger.error(
|
||||
f"绰号加权随机选择时出错: {e}。将回退到选择次数最多的 Top N。", exc_info=True
|
||||
)
|
||||
logger.error(f"绰号加权随机选择时出错: {e}。将回退到选择次数最多的 Top N。", exc_info=True)
|
||||
# 出错时回退到选择次数最多的 N 个
|
||||
candidates.sort(key=lambda x: x[2], reverse=True)
|
||||
selected = candidates[: global_config.MAX_NICKNAMES_IN_PROMPT]
|
||||
|
|
@ -125,8 +121,10 @@ def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, in
|
|||
# 如果没有选中的绰号,返回空字符串
|
||||
return ""
|
||||
|
||||
prompt_lines = ["以下是聊天记录中一些成员在本群的绰号信息(按常用度排序),如果有需要提及对方,用你认为合适的方式提及:"] # 注入部分的标题
|
||||
grouped_by_user: Dict[str, List[str]] = {} # 用于按用户分组
|
||||
prompt_lines = [
|
||||
"以下是聊天记录中一些成员在本群的绰号信息(按常用度排序),如果有需要提及对方,用你认为合适的方式提及:"
|
||||
] # 注入部分的标题
|
||||
grouped_by_user: Dict[str, List[str]] = {} # 用于按用户分组
|
||||
|
||||
# 按用户分组绰号
|
||||
for user_name, nickname, _count in selected_nicknames:
|
||||
|
|
@ -137,8 +135,8 @@ def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, in
|
|||
|
||||
# 构建每个用户的绰号字符串
|
||||
for user_name, nicknames in grouped_by_user.items():
|
||||
nicknames_str = "、".join(nicknames) # 使用中文顿号连接
|
||||
prompt_lines.append(f"- 你私下称呼ta为{user_name},ta被有时被群友称为:{nicknames_str}") # 格式化输出
|
||||
nicknames_str = "、".join(nicknames) # 使用中文顿号连接
|
||||
prompt_lines.append(f"- 你私下称呼ta为{user_name},ta被有时被群友称为:{nicknames_str}") # 格式化输出
|
||||
|
||||
# 如果只有标题行,返回空字符串,避免注入无意义的标题
|
||||
if len(prompt_lines) > 1:
|
||||
|
|
@ -165,7 +163,7 @@ async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_lis
|
|||
if global_config.ENABLE_NICKNAME_MAPPING and chat_stream and chat_stream.group_info:
|
||||
try:
|
||||
group_id = str(chat_stream.group_info.group_id)
|
||||
user_ids_in_context = set() # 存储上下文中出现的用户ID
|
||||
user_ids_in_context = set() # 存储上下文中出现的用户ID
|
||||
|
||||
# 从消息列表中提取用户ID
|
||||
if message_list_before_now:
|
||||
|
|
@ -175,9 +173,9 @@ async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_lis
|
|||
user_ids_in_context.add(str(sender_id))
|
||||
else:
|
||||
# 如果消息列表为空,尝试获取最近发言者作为上下文用户
|
||||
recent_speakers = chat_stream.get_recent_speakers(limit=5) # 获取最近5个发言者
|
||||
recent_speakers = chat_stream.get_recent_speakers(limit=5) # 获取最近5个发言者
|
||||
for speaker in recent_speakers:
|
||||
user_ids_in_context.add(str(speaker['user_id']))
|
||||
user_ids_in_context.add(str(speaker["user_id"]))
|
||||
if not user_ids_in_context:
|
||||
# 日志:记录未找到上下文用户
|
||||
logger.warning(f"[{chat_stream.stream_id}] 未找到消息或最近发言者用于绰号注入。")
|
||||
|
|
@ -198,12 +196,14 @@ async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_lis
|
|||
nickname_injection_str = format_nickname_prompt_injection(selected_nicknames)
|
||||
if nickname_injection_str:
|
||||
# 日志:记录生成的用于 Prompt 的绰号信息
|
||||
logger.debug(f"[{chat_stream.stream_id}] 已生成用于 Prompt 的绰号信息:\n{nickname_injection_str}")
|
||||
logger.debug(
|
||||
f"[{chat_stream.stream_id}] 已生成用于 Prompt 的绰号信息:\n{nickname_injection_str}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# 日志:记录获取或格式化绰号信息时发生的错误
|
||||
logger.error(f"[{chat_stream.stream_id}] 获取或格式化 Prompt 绰号信息时出错: {e}", exc_info=True)
|
||||
nickname_injection_str = "" # 出错时确保返回空字符串
|
||||
nickname_injection_str = "" # 出错时确保返回空字符串
|
||||
|
||||
# 返回最终生成的字符串(可能为空)
|
||||
return nickname_injection_str
|
||||
|
|
@ -212,7 +212,7 @@ async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_lis
|
|||
async def trigger_nickname_analysis_if_needed(
|
||||
anchor_message: MessageRecv,
|
||||
bot_reply: List[str],
|
||||
chat_stream: Optional[ChatStream] = None # 允许传入 chat_stream 或从 anchor_message 获取
|
||||
chat_stream: Optional[ChatStream] = None, # 允许传入 chat_stream 或从 anchor_message 获取
|
||||
):
|
||||
"""
|
||||
如果满足条件(群聊、功能开启),则准备数据并触发绰号分析任务。
|
||||
|
|
@ -225,7 +225,7 @@ async def trigger_nickname_analysis_if_needed(
|
|||
"""
|
||||
# 检查功能是否开启
|
||||
if not global_config.ENABLE_NICKNAME_MAPPING:
|
||||
return # 如果功能禁用,直接返回
|
||||
return # 如果功能禁用,直接返回
|
||||
|
||||
# 确定使用的 chat_stream
|
||||
current_chat_stream = chat_stream or anchor_message.chat_stream
|
||||
|
|
@ -233,45 +233,47 @@ async def trigger_nickname_analysis_if_needed(
|
|||
# 检查是否是群聊且 chat_stream 有效
|
||||
if not current_chat_stream or not current_chat_stream.group_info:
|
||||
# 日志:记录跳过分析的原因(非群聊或无效流)
|
||||
logger.debug(f"[{current_chat_stream.stream_id if current_chat_stream else '未知流'}] 跳过绰号分析:非群聊或无效聊天流。")
|
||||
logger.debug(
|
||||
f"[{current_chat_stream.stream_id if current_chat_stream else '未知流'}] 跳过绰号分析:非群聊或无效聊天流。"
|
||||
)
|
||||
return
|
||||
|
||||
log_prefix = f"[{current_chat_stream.stream_id}]" # 用于日志的前缀
|
||||
log_prefix = f"[{current_chat_stream.stream_id}]" # 用于日志的前缀
|
||||
|
||||
try:
|
||||
# 1. 获取历史记录
|
||||
history_limit = 30 # 定义获取历史记录的数量限制
|
||||
history_messages = get_raw_msg_before_timestamp_with_chat(
|
||||
chat_id=current_chat_stream.stream_id,
|
||||
timestamp=time.time(), # 获取当前时间之前的记录
|
||||
timestamp=time.time(), # 获取当前时间之前的记录
|
||||
limit=history_limit,
|
||||
)
|
||||
|
||||
# 格式化历史记录为可读字符串
|
||||
chat_history_str = await build_readable_messages(
|
||||
messages=history_messages,
|
||||
replace_bot_name=True, # 替换机器人名称,以便 LLM 分析
|
||||
merge_messages=False, # 不合并消息,保留原始对话结构
|
||||
timestamp_mode="relative", # 使用相对时间戳
|
||||
read_mark=0.0, # 不需要已读标记
|
||||
truncate=False, # 获取完整内容进行分析
|
||||
replace_bot_name=True, # 替换机器人名称,以便 LLM 分析
|
||||
merge_messages=False, # 不合并消息,保留原始对话结构
|
||||
timestamp_mode="relative", # 使用相对时间戳
|
||||
read_mark=0.0, # 不需要已读标记
|
||||
truncate=False, # 获取完整内容进行分析
|
||||
)
|
||||
|
||||
# 2. 获取 Bot 回复字符串
|
||||
bot_reply_str = " ".join(bot_reply) if bot_reply else "" # 处理空回复列表
|
||||
bot_reply_str = " ".join(bot_reply) if bot_reply else "" # 处理空回复列表
|
||||
|
||||
# 3. 获取群号和平台信息
|
||||
group_id = str(current_chat_stream.group_info.group_id)
|
||||
platform = current_chat_stream.platform
|
||||
|
||||
# 4. 构建用户 ID 到名称的映射 (user_name_map)
|
||||
user_ids_in_history = set() # 存储历史记录中出现的用户ID
|
||||
user_ids_in_history = set() # 存储历史记录中出现的用户ID
|
||||
for msg in history_messages:
|
||||
sender_id = msg["user_info"].get("user_id")
|
||||
if sender_id:
|
||||
user_ids_in_history.add(str(sender_id))
|
||||
|
||||
user_name_map = {} # 初始化映射字典
|
||||
user_name_map = {} # 初始化映射字典
|
||||
if user_ids_in_history:
|
||||
try:
|
||||
# 批量从数据库获取这些用户的 person_name
|
||||
|
|
@ -279,7 +281,7 @@ async def trigger_nickname_analysis_if_needed(
|
|||
except Exception as e:
|
||||
# 日志:记录获取 person_name 时发生的错误
|
||||
logger.error(f"{log_prefix} 批量获取 person_name 时出错: {e}", exc_info=True)
|
||||
names_data = {} # 出错时使用空字典
|
||||
names_data = {} # 出错时使用空字典
|
||||
|
||||
# 填充 user_name_map
|
||||
for user_id in user_ids_in_history:
|
||||
|
|
@ -290,12 +292,12 @@ async def trigger_nickname_analysis_if_needed(
|
|||
# 如果数据库中没有,则回退查找用户在历史记录中最近使用的 nickname
|
||||
latest_nickname = next(
|
||||
(
|
||||
m["user_info"].get("user_nickname") # 从 user_info 获取 nickname
|
||||
for m in reversed(history_messages) # 从后往前找
|
||||
m["user_info"].get("user_nickname") # 从 user_info 获取 nickname
|
||||
for m in reversed(history_messages) # 从后往前找
|
||||
# 确保消息的用户ID匹配且 nickname 存在
|
||||
if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname")
|
||||
),
|
||||
None, # 如果找不到,返回 None
|
||||
None, # 如果找不到,返回 None
|
||||
)
|
||||
# 如果找到了 nickname 则使用,否则使用 "未知(ID)"
|
||||
user_name_map[user_id] = latest_nickname or f"未知({user_id})"
|
||||
|
|
@ -307,4 +309,4 @@ async def trigger_nickname_analysis_if_needed(
|
|||
|
||||
except Exception as e:
|
||||
# 日志:记录触发分析过程中发生的任何其他错误
|
||||
logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True)
|
||||
logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True)
|
||||
|
|
|
|||
|
|
@ -274,7 +274,9 @@ class PromptBuilder:
|
|||
)
|
||||
return None
|
||||
|
||||
async def _build_prompt_normal(self, chat_stream, message_txt: str, sender_name: str = "某人") -> str: # 返回值改为 str
|
||||
async def _build_prompt_normal(
|
||||
self, chat_stream, message_txt: str, sender_name: str = "某人"
|
||||
) -> str: # 返回值改为 str
|
||||
individuality = Individuality.get_instance()
|
||||
prompt_personality = individuality.get_prompt(x_person=2, level=2)
|
||||
|
||||
|
|
@ -415,7 +417,7 @@ class PromptBuilder:
|
|||
memory_prompt=memory_prompt,
|
||||
prompt_info=prompt_info,
|
||||
schedule_prompt=schedule_prompt,
|
||||
nickname_info=nickname_injection_str, # <--- 注入绰号信息
|
||||
nickname_info=nickname_injection_str, # <--- 注入绰号信息
|
||||
chat_target=await global_prompt_manager.get_prompt_async("chat_target_group1")
|
||||
if chat_in_group
|
||||
else await global_prompt_manager.get_prompt_async("chat_target_private1"),
|
||||
|
|
|
|||
Loading…
Reference in New Issue