From 5345eb725ae714ffad958e70d566b7560115904f Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 1 May 2025 17:08:00 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=96=20=E8=87=AA=E5=8A=A8=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81=20[skip=20ci]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/group_nickname/nickname_mapper.py | 23 +++--- src/plugins/group_nickname/nickname_utils.py | 72 ++++++++++--------- .../heartFC_chat/heartflow_prompt_builder.py | 6 +- 3 files changed, 53 insertions(+), 48 deletions(-) diff --git a/src/plugins/group_nickname/nickname_mapper.py b/src/plugins/group_nickname/nickname_mapper.py index 1af61ba8..0723f8ee 100644 --- a/src/plugins/group_nickname/nickname_mapper.py +++ b/src/plugins/group_nickname/nickname_mapper.py @@ -8,7 +8,7 @@ from src.config.config import global_config logger = get_logger("nickname_mapper") llm_mapper: Optional[LLMRequest] = None -if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关 +if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关 try: # 从全局配置获取模型设置 model_config = global_config.llm_nickname_mapping @@ -16,10 +16,10 @@ if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关 logger.error("在全局配置中未找到有效的 'llm_nickname_mapping' 配置或缺少 'name' 字段。") else: llm_mapper = LLMRequest( # <-- LLM 初始化 - model=global_config.llm_nickname_mapping, - temperature=global_config.llm_nickname_mapping["temp"], - max_tokens=256, - request_type="nickname_mapping", + model=global_config.llm_nickname_mapping, + temperature=global_config.llm_nickname_mapping["temp"], + max_tokens=256, + request_type="nickname_mapping", ) logger.info("绰号映射 LLM 初始化成功 (使用全局配置)。") @@ -27,6 +27,7 @@ if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关 logger.error(f"使用全局配置初始化绰号映射 LLM 失败: {e}", exc_info=True) llm_mapper = None + def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str: """构建用于 LLM 绰号映射的 Prompt""" # user_name_map 包含了 user_id 到 person_name (或 fallback nickname) 的映射 @@ -74,7 +75,7 @@ def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: async def analyze_chat_for_nicknames( chat_history_str: str, bot_reply: str, - user_name_map: Dict[str, str] # 这个 map 包含了 user_id -> person_name 的信息 + user_name_map: Dict[str, str], # 这个 map 包含了 user_id -> person_name 的信息 ) -> Dict[str, Any]: """ 调用 LLM 分析聊天记录和 Bot 回复,提取可靠的 用户ID-绰号 映射,并进行过滤。 @@ -111,13 +112,13 @@ async def analyze_chat_for_nicknames( result = json.loads(response_content) if isinstance(result, dict) and "is_exist" in result: if result["is_exist"] is True: - original_data = result.get("data") # 使用 .get() 更安全 - if isinstance(original_data, dict) and original_data: # 确保 data 是非空字典 + original_data = result.get("data") # 使用 .get() 更安全 + if isinstance(original_data, dict) and original_data: # 确保 data 是非空字典 logger.info(f"LLM 找到的原始绰号映射: {original_data}") # --- 开始过滤 --- filtered_data = {} - bot_qq_str = str(global_config.BOT_QQ) # 将机器人QQ转为字符串以便比较 + bot_qq_str = str(global_config.BOT_QQ) # 将机器人QQ转为字符串以便比较 for user_id, nickname in original_data.items(): # 检查 user_id 是否是字符串,以防万一 @@ -147,7 +148,7 @@ async def analyze_chat_for_nicknames( return {"is_exist": False} else: logger.info(f"过滤后的绰号映射: {filtered_data}") - return {"is_exist": True, "data": filtered_data} # 返回过滤后的数据 + return {"is_exist": True, "data": filtered_data} # 返回过滤后的数据 else: # is_exist 为 True 但 data 缺失、不是字典或为空 @@ -155,7 +156,7 @@ async def analyze_chat_for_nicknames( logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 键缺失。") elif not isinstance(result.get("data"), dict): logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 不是字典。") - else: # data 为空字典 + else: # data 为空字典 logger.debug("LLM 指示 is_exist=True 但 data 为空字典。视为 False 处理。") return {"is_exist": False} elif result["is_exist"] is False: diff --git a/src/plugins/group_nickname/nickname_utils.py b/src/plugins/group_nickname/nickname_utils.py index 3a784e70..bbd124a7 100644 --- a/src/plugins/group_nickname/nickname_utils.py +++ b/src/plugins/group_nickname/nickname_utils.py @@ -31,7 +31,7 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int # 如果输入为空,直接返回空列表 return [] - candidates = [] # 候选绰号列表,包含 (用户名, 绰号, 次数, 权重) + candidates = [] # 候选绰号列表,包含 (用户名, 绰号, 次数, 权重) for user_name, nicknames in all_nicknames_info.items(): if nicknames: for nickname_entry in nicknames: @@ -45,9 +45,7 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int candidates.append((user_name, nickname, count, weight)) else: # 日志:记录无效的绰号次数 - logger.warning( - f"用户 '{user_name}' 的绰号 '{nickname}' 次数无效: {count}。已跳过。" - ) + logger.warning(f"用户 '{user_name}' 的绰号 '{nickname}' 次数无效: {count}。已跳过。") else: # 日志:记录无效的绰号条目格式 logger.warning(f"用户 '{user_name}' 的绰号条目格式无效: {nickname_entry}。已跳过。") @@ -89,15 +87,13 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int if len(selected) < num_to_select: logger.debug(f"加权随机选择后数量不足 ({len(selected)}/{num_to_select}),补充选择次数最多的。") remaining_candidates = [c for i, c in enumerate(candidates) if i not in selected_indices] - remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 + remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 needed = num_to_select - len(selected) selected.extend(remaining_candidates[:needed]) except Exception as e: # 日志:记录加权随机选择时发生的错误,并回退到简单选择 - logger.error( - f"绰号加权随机选择时出错: {e}。将回退到选择次数最多的 Top N。", exc_info=True - ) + logger.error(f"绰号加权随机选择时出错: {e}。将回退到选择次数最多的 Top N。", exc_info=True) # 出错时回退到选择次数最多的 N 个 candidates.sort(key=lambda x: x[2], reverse=True) selected = candidates[: global_config.MAX_NICKNAMES_IN_PROMPT] @@ -125,8 +121,10 @@ def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, in # 如果没有选中的绰号,返回空字符串 return "" - prompt_lines = ["以下是聊天记录中一些成员在本群的绰号信息(按常用度排序),如果有需要提及对方,用你认为合适的方式提及:"] # 注入部分的标题 - grouped_by_user: Dict[str, List[str]] = {} # 用于按用户分组 + prompt_lines = [ + "以下是聊天记录中一些成员在本群的绰号信息(按常用度排序),如果有需要提及对方,用你认为合适的方式提及:" + ] # 注入部分的标题 + grouped_by_user: Dict[str, List[str]] = {} # 用于按用户分组 # 按用户分组绰号 for user_name, nickname, _count in selected_nicknames: @@ -137,8 +135,8 @@ def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, in # 构建每个用户的绰号字符串 for user_name, nicknames in grouped_by_user.items(): - nicknames_str = "、".join(nicknames) # 使用中文顿号连接 - prompt_lines.append(f"- 你私下称呼ta为{user_name},ta被有时被群友称为:{nicknames_str}") # 格式化输出 + nicknames_str = "、".join(nicknames) # 使用中文顿号连接 + prompt_lines.append(f"- 你私下称呼ta为{user_name},ta被有时被群友称为:{nicknames_str}") # 格式化输出 # 如果只有标题行,返回空字符串,避免注入无意义的标题 if len(prompt_lines) > 1: @@ -165,7 +163,7 @@ async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_lis if global_config.ENABLE_NICKNAME_MAPPING and chat_stream and chat_stream.group_info: try: group_id = str(chat_stream.group_info.group_id) - user_ids_in_context = set() # 存储上下文中出现的用户ID + user_ids_in_context = set() # 存储上下文中出现的用户ID # 从消息列表中提取用户ID if message_list_before_now: @@ -175,9 +173,9 @@ async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_lis user_ids_in_context.add(str(sender_id)) else: # 如果消息列表为空,尝试获取最近发言者作为上下文用户 - recent_speakers = chat_stream.get_recent_speakers(limit=5) # 获取最近5个发言者 + recent_speakers = chat_stream.get_recent_speakers(limit=5) # 获取最近5个发言者 for speaker in recent_speakers: - user_ids_in_context.add(str(speaker['user_id'])) + user_ids_in_context.add(str(speaker["user_id"])) if not user_ids_in_context: # 日志:记录未找到上下文用户 logger.warning(f"[{chat_stream.stream_id}] 未找到消息或最近发言者用于绰号注入。") @@ -198,12 +196,14 @@ async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_lis nickname_injection_str = format_nickname_prompt_injection(selected_nicknames) if nickname_injection_str: # 日志:记录生成的用于 Prompt 的绰号信息 - logger.debug(f"[{chat_stream.stream_id}] 已生成用于 Prompt 的绰号信息:\n{nickname_injection_str}") + logger.debug( + f"[{chat_stream.stream_id}] 已生成用于 Prompt 的绰号信息:\n{nickname_injection_str}" + ) except Exception as e: # 日志:记录获取或格式化绰号信息时发生的错误 logger.error(f"[{chat_stream.stream_id}] 获取或格式化 Prompt 绰号信息时出错: {e}", exc_info=True) - nickname_injection_str = "" # 出错时确保返回空字符串 + nickname_injection_str = "" # 出错时确保返回空字符串 # 返回最终生成的字符串(可能为空) return nickname_injection_str @@ -212,7 +212,7 @@ async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_lis async def trigger_nickname_analysis_if_needed( anchor_message: MessageRecv, bot_reply: List[str], - chat_stream: Optional[ChatStream] = None # 允许传入 chat_stream 或从 anchor_message 获取 + chat_stream: Optional[ChatStream] = None, # 允许传入 chat_stream 或从 anchor_message 获取 ): """ 如果满足条件(群聊、功能开启),则准备数据并触发绰号分析任务。 @@ -225,7 +225,7 @@ async def trigger_nickname_analysis_if_needed( """ # 检查功能是否开启 if not global_config.ENABLE_NICKNAME_MAPPING: - return # 如果功能禁用,直接返回 + return # 如果功能禁用,直接返回 # 确定使用的 chat_stream current_chat_stream = chat_stream or anchor_message.chat_stream @@ -233,45 +233,47 @@ async def trigger_nickname_analysis_if_needed( # 检查是否是群聊且 chat_stream 有效 if not current_chat_stream or not current_chat_stream.group_info: # 日志:记录跳过分析的原因(非群聊或无效流) - logger.debug(f"[{current_chat_stream.stream_id if current_chat_stream else '未知流'}] 跳过绰号分析:非群聊或无效聊天流。") + logger.debug( + f"[{current_chat_stream.stream_id if current_chat_stream else '未知流'}] 跳过绰号分析:非群聊或无效聊天流。" + ) return - log_prefix = f"[{current_chat_stream.stream_id}]" # 用于日志的前缀 + log_prefix = f"[{current_chat_stream.stream_id}]" # 用于日志的前缀 try: # 1. 获取历史记录 history_limit = 30 # 定义获取历史记录的数量限制 history_messages = get_raw_msg_before_timestamp_with_chat( chat_id=current_chat_stream.stream_id, - timestamp=time.time(), # 获取当前时间之前的记录 + timestamp=time.time(), # 获取当前时间之前的记录 limit=history_limit, ) # 格式化历史记录为可读字符串 chat_history_str = await build_readable_messages( messages=history_messages, - replace_bot_name=True, # 替换机器人名称,以便 LLM 分析 - merge_messages=False, # 不合并消息,保留原始对话结构 - timestamp_mode="relative", # 使用相对时间戳 - read_mark=0.0, # 不需要已读标记 - truncate=False, # 获取完整内容进行分析 + replace_bot_name=True, # 替换机器人名称,以便 LLM 分析 + merge_messages=False, # 不合并消息,保留原始对话结构 + timestamp_mode="relative", # 使用相对时间戳 + read_mark=0.0, # 不需要已读标记 + truncate=False, # 获取完整内容进行分析 ) # 2. 获取 Bot 回复字符串 - bot_reply_str = " ".join(bot_reply) if bot_reply else "" # 处理空回复列表 + bot_reply_str = " ".join(bot_reply) if bot_reply else "" # 处理空回复列表 # 3. 获取群号和平台信息 group_id = str(current_chat_stream.group_info.group_id) platform = current_chat_stream.platform # 4. 构建用户 ID 到名称的映射 (user_name_map) - user_ids_in_history = set() # 存储历史记录中出现的用户ID + user_ids_in_history = set() # 存储历史记录中出现的用户ID for msg in history_messages: sender_id = msg["user_info"].get("user_id") if sender_id: user_ids_in_history.add(str(sender_id)) - user_name_map = {} # 初始化映射字典 + user_name_map = {} # 初始化映射字典 if user_ids_in_history: try: # 批量从数据库获取这些用户的 person_name @@ -279,7 +281,7 @@ async def trigger_nickname_analysis_if_needed( except Exception as e: # 日志:记录获取 person_name 时发生的错误 logger.error(f"{log_prefix} 批量获取 person_name 时出错: {e}", exc_info=True) - names_data = {} # 出错时使用空字典 + names_data = {} # 出错时使用空字典 # 填充 user_name_map for user_id in user_ids_in_history: @@ -290,12 +292,12 @@ async def trigger_nickname_analysis_if_needed( # 如果数据库中没有,则回退查找用户在历史记录中最近使用的 nickname latest_nickname = next( ( - m["user_info"].get("user_nickname") # 从 user_info 获取 nickname - for m in reversed(history_messages) # 从后往前找 + m["user_info"].get("user_nickname") # 从 user_info 获取 nickname + for m in reversed(history_messages) # 从后往前找 # 确保消息的用户ID匹配且 nickname 存在 if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname") ), - None, # 如果找不到,返回 None + None, # 如果找不到,返回 None ) # 如果找到了 nickname 则使用,否则使用 "未知(ID)" user_name_map[user_id] = latest_nickname or f"未知({user_id})" @@ -307,4 +309,4 @@ async def trigger_nickname_analysis_if_needed( except Exception as e: # 日志:记录触发分析过程中发生的任何其他错误 - logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True) \ No newline at end of file + logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True) diff --git a/src/plugins/heartFC_chat/heartflow_prompt_builder.py b/src/plugins/heartFC_chat/heartflow_prompt_builder.py index bee95a56..777dc1dc 100644 --- a/src/plugins/heartFC_chat/heartflow_prompt_builder.py +++ b/src/plugins/heartFC_chat/heartflow_prompt_builder.py @@ -274,7 +274,9 @@ class PromptBuilder: ) return None - async def _build_prompt_normal(self, chat_stream, message_txt: str, sender_name: str = "某人") -> str: # 返回值改为 str + async def _build_prompt_normal( + self, chat_stream, message_txt: str, sender_name: str = "某人" + ) -> str: # 返回值改为 str individuality = Individuality.get_instance() prompt_personality = individuality.get_prompt(x_person=2, level=2) @@ -415,7 +417,7 @@ class PromptBuilder: memory_prompt=memory_prompt, prompt_info=prompt_info, schedule_prompt=schedule_prompt, - nickname_info=nickname_injection_str, # <--- 注入绰号信息 + nickname_info=nickname_injection_str, # <--- 注入绰号信息 chat_target=await global_prompt_manager.get_prompt_async("chat_target_group1") if chat_in_group else await global_prompt_manager.get_prompt_async("chat_target_private1"),