From 7e3b2d5a4b870452161932b1ea6abc4d94a4a91d Mon Sep 17 00:00:00 2001 From: Bakadax Date: Fri, 2 May 2025 06:38:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=87=BD=E6=95=B0=E6=8B=86=E5=88=86=EF=BC=8C?= =?UTF-8?q?=E6=8F=90=E9=AB=98=E4=BB=A3=E7=A0=81=E7=9A=84=E5=8F=AF=E8=AF=BB?= =?UTF-8?q?=E6=80=A7=E5=92=8C=E5=8F=AF=E7=BB=B4=E6=8A=A4=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../group_nickname/nickname_processor.py | 224 +++++++++++------- 1 file changed, 132 insertions(+), 92 deletions(-) diff --git a/src/plugins/group_nickname/nickname_processor.py b/src/plugins/group_nickname/nickname_processor.py index ceb282da..f36244a4 100644 --- a/src/plugins/group_nickname/nickname_processor.py +++ b/src/plugins/group_nickname/nickname_processor.py @@ -3,7 +3,7 @@ import traceback import threading import queue from typing import Dict, Optional - +from pymongo.collection import Collection from pymongo.errors import OperationFailure, DuplicateKeyError # 引入 DuplicateKeyError from src.common.logger_manager import get_logger from src.common.database import db # 使用全局 db @@ -14,27 +14,144 @@ logger = get_logger("nickname_processor") _stop_event = threading.Event() +def _upsert_person(collection: Collection, person_id: str, user_id_int: int, platform: str): + """ + 确保数据库中存在指定 person_id 的文档 (Upsert)。 + 如果文档不存在,则使用提供的用户信息创建它。 + + Args: + collection: MongoDB 集合对象 (person_info)。 + person_id: 要查找或创建的 person_id。 + user_id_int: 用户的整数 ID。 + platform: 平台名称。 + + Returns: + UpdateResult: MongoDB 更新操作的结果。 + + Raises: + DuplicateKeyError: 如果发生重复键错误 (理论上不应由 upsert 触发)。 + Exception: 其他数据库操作错误。 + """ + try: + # 关键步骤:基于 person_id 执行 Upsert + # 如果文档不存在,它会被创建,并设置 $setOnInsert 中的字段。 + # 如果文档已存在,此操作不会修改任何内容。 + result = collection.update_one( + {"person_id": person_id}, + { + "$setOnInsert": { + "person_id": person_id, + "user_id": user_id_int, # 确保这里使用传入的 user_id_int + "platform": platform, + "group_nicknames": [], # 初始化 group_nicknames 数组 + } + }, + upsert=True, + ) + if result.upserted_id: + logger.debug(f"Upsert on person_id created new document: {person_id}") + # else: + # logger.debug(f"Upsert on person_id found existing document: {person_id}") + return result + except DuplicateKeyError as dk_err: + # 这个错误理论上不应该再由 upsert 触发。 + # 如果仍然出现,可能指示 person_id 生成逻辑问题或非常罕见的 MongoDB 内部情况。 + logger.error( + f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。" + ) + raise # 将异常向上抛出,让调用者处理 + except Exception as e: + logger.exception(f"对 person_id {person_id} 执行 Upsert 时失败: {e}") + raise # 将异常向上抛出 + + +def _update_group_nickname(collection: Collection, person_id: str, group_id_str: str, nickname: str): + """ + 尝试更新 person_id 文档中特定群组的绰号计数,或添加新条目。 + 按顺序尝试:增加计数 -> 添加绰号 -> 添加群组。 + + Args: + collection: MongoDB 集合对象 (person_info)。 + person_id: 目标文档的 person_id。 + group_id_str: 目标群组的 ID (字符串)。 + nickname: 要更新或添加的绰号。 + """ + # 3a. 尝试增加现有群组中现有绰号的计数 + result_inc = collection.update_one( + { + "person_id": person_id, + "group_nicknames": { + "$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname} + }, + }, + {"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}}, + array_filters=[ + {"group.group_id": group_id_str}, + {"nick.name": nickname}, + ], + ) + if result_inc.modified_count > 0: + # logger.debug(f"成功增加 person_id {person_id} 在群组 {group_id_str} 中绰号 '{nickname}' 的计数。") + return # 成功增加计数,操作完成 + + # 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组 + result_push_nick = collection.update_one( + { + "person_id": person_id, + "group_nicknames.group_id": group_id_str, # 检查群组是否存在 + }, + {"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}}, + array_filters=[{"group.group_id": group_id_str}], + ) + if result_push_nick.modified_count > 0: + logger.debug(f"成功为 person_id {person_id} 在现有群组 {group_id_str} 中添加新绰号 '{nickname}'。") + return # 成功添加绰号,操作完成 + + # 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号 + # 确保 group_nicknames 数组存在 (作为保险措施) + collection.update_one( + {"person_id": person_id, "group_nicknames": {"$exists": False}}, + {"$set": {"group_nicknames": []}}, + ) + # 推送新的群组对象到 group_nicknames 数组 + result_push_group = collection.update_one( + { + "person_id": person_id, + "group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在 + }, + { + "$push": { + "group_nicknames": { + "group_id": group_id_str, + "nicknames": [{"name": nickname, "count": 1}], + } + } + }, + ) + if result_push_group.modified_count > 0: + logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'。") + # else: + # 如果连添加群组也失败 (例如 group_id 已存在但之前的步骤都未匹配,理论上不太可能), + # 可能需要进一步的日志或错误处理,但这通常意味着数据状态异常。 + # logger.warning(f"尝试为 person_id {person_id} 添加新群组 {group_id_str} 失败,可能群组已存在但结构不符合预期。") async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dict[str, str]): """ 更新数据库中用户的群组绰号计数 (使用全局 db)。 - 通过首先基于 person_id 进行 upsert 来处理潜在的 race condition。 + 通过调用辅助函数来处理 person 文档的 upsert 和绰号更新。 Args: platform (str): 平台名称 (e.g., 'qq')。 group_id (str): 群组 ID。 nickname_map (Dict[str, str]): 用户 ID (字符串) 到绰号的映射。 """ - # 尝试导入 person_info_manager (放在函数内部以减少潜在的导入问题) try: - # 假设 person_info 在 group_nickname 的上一级目录 from ..person_info.person_info import person_info_manager except ImportError: logger.error("无法导入 person_info_manager,无法生成 person_id!") - return # 无法继续,因为需要 person_id + return person_info_collection = db.person_info - if not nickname_map: logger.debug("提供的用于更新的绰号映射为空。") return @@ -61,97 +178,20 @@ async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dic logger.error(f"无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id,跳过此用户。") continue - # --- 步骤 2: 基于 person_id 执行 Upsert --- - # 这是关键步骤,用于原子性地确保文档存在,避免 person_id 冲突。 - # 如果文档不存在,它会被创建,并设置 $setOnInsert 中的字段。 - # 如果文档已存在,此操作不会修改任何内容(因为没有 $set 操作符)。 - upsert_result = person_info_collection.update_one( - {"person_id": person_id}, # Filter by the unique key - { - "$setOnInsert": { - "person_id": person_id, - "user_id": user_id_int, - "platform": platform, - "group_nicknames": [], # 初始化 group_nicknames 数组 - } - }, - upsert=True, - ) + # --- 步骤 2: 确保 Person 文档存在 (调用辅助函数) --- + _upsert_person(person_info_collection, person_id, user_id_int, platform) - # 可选日志:记录是否创建了新文档 - if upsert_result.upserted_id: - logger.debug(f"Upsert on person_id created new document: {person_id}") - # else: - # logger.debug(f"Upsert on person_id found existing document: {person_id}") + # --- 步骤 3: 更新群组绰号 (调用辅助函数) --- + _update_group_nickname(person_info_collection, person_id, group_id_str, nickname) - # --- 步骤 3: 更新群组绰号 --- - # 现在我们确信具有此 person_id 的文档存在,可以安全地更新其 group_nicknames。 - - # 3a. 尝试增加现有群组中现有绰号的计数 - update_result_inc = person_info_collection.update_one( - { - "person_id": person_id, # 明确目标文档 - "group_nicknames": { # 检查数组中是否有匹配项 - "$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname} - }, - }, - {"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}}, # 增加计数 - array_filters=[ # 指定要更新的数组元素 - {"group.group_id": group_id_str}, - {"nick.name": nickname}, - ], - ) - - # 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组 - if update_result_inc.modified_count == 0: - update_result_push_nick = person_info_collection.update_one( - { - "person_id": person_id, # 明确目标文档 - "group_nicknames.group_id": group_id_str, # 检查群组是否存在 - }, - # 将新绰号添加到匹配群组的 nicknames 数组中 - {"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}}, - array_filters=[{"group.group_id": group_id_str}], # 指定要推送到的群组 - ) - - # 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号 - if update_result_push_nick.modified_count == 0: - # 确保 group_nicknames 数组存在 (如果 $setOnInsert 失败或数据不一致时的保险措施) - person_info_collection.update_one( - {"person_id": person_id, "group_nicknames": {"$exists": False}}, - {"$set": {"group_nicknames": []}}, - ) - # 推送新的群组对象到 group_nicknames 数组 - update_result_push_group = person_info_collection.update_one( - { - "person_id": person_id, # 明确目标文档 - "group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在 - }, - { - "$push": { # 添加新的群组条目 - "group_nicknames": { - "group_id": group_id_str, - "nicknames": [{"name": nickname, "count": 1}], # 初始化绰号列表 - } - } - }, - ) - if update_result_push_group.modified_count > 0: - logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'") - - except DuplicateKeyError as dk_err: - # 这个错误理论上不应该再由步骤 2 的 upsert 触发。 - # 如果仍然出现,可能指示 person_id 生成逻辑问题或非常罕见的 MongoDB 内部情况。 - logger.error( - f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。" - ) - except OperationFailure as op_err: + # --- 统一处理数据库操作可能抛出的异常 --- + except (OperationFailure, DuplicateKeyError) as db_err: # 捕获特定的数据库错误 logger.exception( - f"数据库操作失败 (OperationFailure): 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}({op_err})" + f"数据库操作失败 ({type(db_err).__name__}): 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}. 错误: {db_err}" ) except Exception as e: - logger.exception(f"更新用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误:{e}") - + # 捕获其他所有可能的错误 (例如 person_id 生成、辅助函数内部未捕获的错误等) + logger.exception(f"处理用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误:{e}") # --- 使用 queue.Queue --- queue_max_size = getattr(global_config, "NICKNAME_QUEUE_MAX_SIZE", 100)