diff --git a/src/plugins/group_nickname/nickname_processor.py b/src/plugins/group_nickname/nickname_processor.py index f36244a4..f765cb9b 100644 --- a/src/plugins/group_nickname/nickname_processor.py +++ b/src/plugins/group_nickname/nickname_processor.py @@ -14,6 +14,7 @@ logger = get_logger("nickname_processor") _stop_event = threading.Event() + def _upsert_person(collection: Collection, person_id: str, user_id_int: int, platform: str): """ 确保数据库中存在指定 person_id 的文档 (Upsert)。 @@ -41,7 +42,7 @@ def _upsert_person(collection: Collection, person_id: str, user_id_int: int, pla { "$setOnInsert": { "person_id": person_id, - "user_id": user_id_int, # 确保这里使用传入的 user_id_int + "user_id": user_id_int, # 确保这里使用传入的 user_id_int "platform": platform, "group_nicknames": [], # 初始化 group_nicknames 数组 } @@ -59,10 +60,10 @@ def _upsert_person(collection: Collection, person_id: str, user_id_int: int, pla logger.error( f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。" ) - raise # 将异常向上抛出,让调用者处理 + raise # 将异常向上抛出,让调用者处理 except Exception as e: logger.exception(f"对 person_id {person_id} 执行 Upsert 时失败: {e}") - raise # 将异常向上抛出 + raise # 将异常向上抛出 def _update_group_nickname(collection: Collection, person_id: str, group_id_str: str, nickname: str): @@ -80,9 +81,7 @@ def _update_group_nickname(collection: Collection, person_id: str, group_id_str: result_inc = collection.update_one( { "person_id": person_id, - "group_nicknames": { - "$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname} - }, + "group_nicknames": {"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}}, }, {"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}}, array_filters=[ @@ -92,20 +91,20 @@ def _update_group_nickname(collection: Collection, person_id: str, group_id_str: ) if result_inc.modified_count > 0: # logger.debug(f"成功增加 person_id {person_id} 在群组 {group_id_str} 中绰号 '{nickname}' 的计数。") - return # 成功增加计数,操作完成 + return # 成功增加计数,操作完成 # 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组 result_push_nick = collection.update_one( { "person_id": person_id, - "group_nicknames.group_id": group_id_str, # 检查群组是否存在 + "group_nicknames.group_id": group_id_str, # 检查群组是否存在 }, {"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}}, array_filters=[{"group.group_id": group_id_str}], ) if result_push_nick.modified_count > 0: logger.debug(f"成功为 person_id {person_id} 在现有群组 {group_id_str} 中添加新绰号 '{nickname}'。") - return # 成功添加绰号,操作完成 + return # 成功添加绰号,操作完成 # 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号 # 确保 group_nicknames 数组存在 (作为保险措施) @@ -117,7 +116,7 @@ def _update_group_nickname(collection: Collection, person_id: str, group_id_str: result_push_group = collection.update_one( { "person_id": person_id, - "group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在 + "group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在 }, { "$push": { @@ -131,9 +130,10 @@ def _update_group_nickname(collection: Collection, person_id: str, group_id_str: if result_push_group.modified_count > 0: logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'。") # else: - # 如果连添加群组也失败 (例如 group_id 已存在但之前的步骤都未匹配,理论上不太可能), - # 可能需要进一步的日志或错误处理,但这通常意味着数据状态异常。 - # logger.warning(f"尝试为 person_id {person_id} 添加新群组 {group_id_str} 失败,可能群组已存在但结构不符合预期。") + # 如果连添加群组也失败 (例如 group_id 已存在但之前的步骤都未匹配,理论上不太可能), + # 可能需要进一步的日志或错误处理,但这通常意味着数据状态异常。 + # logger.warning(f"尝试为 person_id {person_id} 添加新群组 {group_id_str} 失败,可能群组已存在但结构不符合预期。") + async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dict[str, str]): """ @@ -185,7 +185,7 @@ async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dic _update_group_nickname(person_info_collection, person_id, group_id_str, nickname) # --- 统一处理数据库操作可能抛出的异常 --- - except (OperationFailure, DuplicateKeyError) as db_err: # 捕获特定的数据库错误 + except (OperationFailure, DuplicateKeyError) as db_err: # 捕获特定的数据库错误 logger.exception( f"数据库操作失败 ({type(db_err).__name__}): 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}. 错误: {db_err}" ) @@ -193,6 +193,7 @@ async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dic # 捕获其他所有可能的错误 (例如 person_id 生成、辅助函数内部未捕获的错误等) logger.exception(f"处理用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误:{e}") + # --- 使用 queue.Queue --- queue_max_size = getattr(global_config, "NICKNAME_QUEUE_MAX_SIZE", 100) nickname_queue: queue.Queue = queue.Queue(maxsize=queue_max_size) diff --git a/src/plugins/group_nickname/nickname_utils.py b/src/plugins/group_nickname/nickname_utils.py index c2168016..9c05bd3a 100644 --- a/src/plugins/group_nickname/nickname_utils.py +++ b/src/plugins/group_nickname/nickname_utils.py @@ -55,11 +55,15 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int # 如果抽样结果数量不足(例如权重问题导致提前退出),可以考虑是否需要补充 if len(selected_candidates_with_weight) < num_to_select: - logger.debug(f"加权随机选择后数量不足 ({len(selected_candidates_with_weight)}/{num_to_select}),补充选择次数最多的。") + logger.debug( + f"加权随机选择后数量不足 ({len(selected_candidates_with_weight)}/{num_to_select}),补充选择次数最多的。" + ) # 筛选出未被选中的候选 - selected_ids = set((c[0], c[1]) for c in selected_candidates_with_weight) # 使用 (用户名, 绰号) 作为唯一标识 + selected_ids = set( + (c[0], c[1]) for c in selected_candidates_with_weight + ) # 使用 (用户名, 绰号) 作为唯一标识 remaining_candidates = [c for c in candidates if (c[0], c[1]) not in selected_ids] - remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 + remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 needed = num_to_select - len(selected_candidates_with_weight) selected_candidates_with_weight.extend(remaining_candidates[:needed]) @@ -67,15 +71,14 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int # 日志:记录加权随机选择时发生的错误,并回退到简单选择 logger.error(f"绰号加权随机选择时出错: {e}。将回退到选择次数最多的 Top N。", exc_info=True) # 出错时回退到选择次数最多的 N 个 - candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 + candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序 # 注意:这里需要选择包含权重的元组,或者调整后续处理 selected_candidates_with_weight = candidates[:num_to_select] - # 格式化输出结果为 (用户名, 绰号, 次数),移除权重 result = [(user, nick, count) for user, nick, count, _weight in selected_candidates_with_weight] - result.sort(key=lambda x: x[2], reverse=True) # 按次数降序 + result.sort(key=lambda x: x[2], reverse=True) # 按次数降序 logger.debug(f"为 Prompt 选择的绰号: {result}") return result @@ -285,7 +288,10 @@ async def trigger_nickname_analysis_if_needed( # 日志:记录触发分析过程中发生的任何其他错误 logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True) -def weighted_sample_without_replacement(candidates: List[Tuple[str, str, int, float]], k: int) -> List[Tuple[str, str, int, float]]: + +def weighted_sample_without_replacement( + candidates: List[Tuple[str, str, int, float]], k: int +) -> List[Tuple[str, str, int, float]]: """ 执行不重复的加权随机抽样。 @@ -300,18 +306,18 @@ def weighted_sample_without_replacement(candidates: List[Tuple[str, str, int, fl return [] if k >= len(candidates): # 如果需要选择的数量大于或等于候选数量,直接返回所有候选 - return candidates[:] # 返回副本以避免修改原始列表 + return candidates[:] # 返回副本以避免修改原始列表 - pool = candidates[:] # 创建候选列表的副本进行操作 + pool = candidates[:] # 创建候选列表的副本进行操作 selected = [] # 注意:原评论代码中计算 total_weight 但未使用,这里也省略。 # random.choices 内部会处理权重的归一化。 - for _ in range(min(k, len(pool))): # 确保迭代次数不超过池中剩余元素 - if not pool: # 如果池已空,提前结束 + for _ in range(min(k, len(pool))): # 确保迭代次数不超过池中剩余元素 + if not pool: # 如果池已空,提前结束 break - weights = [c[3] for c in pool] # 获取当前池中所有元素的权重 + weights = [c[3] for c in pool] # 获取当前池中所有元素的权重 # 检查权重是否有效 if sum(weights) <= 0: # 如果所有剩余权重无效,随机选择一个(或根据需要采取其他策略) @@ -322,8 +328,8 @@ def weighted_sample_without_replacement(candidates: List[Tuple[str, str, int, fl # 使用 random.choices 进行加权抽样,选择 1 个 # random.choices 返回一个列表,所以取第一个元素 [0] chosen = random.choices(pool, weights=weights, k=1)[0] - pool.remove(chosen) # 从池中移除选中的元素,实现不重复抽样 + pool.remove(chosen) # 从池中移除选中的元素,实现不重复抽样 selected.append(chosen) - return selected \ No newline at end of file + return selected