函数拆分,提高代码的可读性和可维护性

pull/914/head
Bakadax 2025-05-02 06:38:52 +08:00
parent 68cef9a725
commit 7e3b2d5a4b
1 changed files with 132 additions and 92 deletions

View File

@ -3,7 +3,7 @@ import traceback
import threading
import queue
from typing import Dict, Optional
from pymongo.collection import Collection
from pymongo.errors import OperationFailure, DuplicateKeyError # 引入 DuplicateKeyError
from src.common.logger_manager import get_logger
from src.common.database import db # 使用全局 db
@ -14,27 +14,144 @@ logger = get_logger("nickname_processor")
_stop_event = threading.Event()
def _upsert_person(collection: Collection, person_id: str, user_id_int: int, platform: str):
"""
确保数据库中存在指定 person_id 的文档 (Upsert)
如果文档不存在则使用提供的用户信息创建它
Args:
collection: MongoDB 集合对象 (person_info)
person_id: 要查找或创建的 person_id
user_id_int: 用户的整数 ID
platform: 平台名称
Returns:
UpdateResult: MongoDB 更新操作的结果
Raises:
DuplicateKeyError: 如果发生重复键错误 (理论上不应由 upsert 触发)
Exception: 其他数据库操作错误
"""
try:
# 关键步骤:基于 person_id 执行 Upsert
# 如果文档不存在,它会被创建,并设置 $setOnInsert 中的字段。
# 如果文档已存在,此操作不会修改任何内容。
result = collection.update_one(
{"person_id": person_id},
{
"$setOnInsert": {
"person_id": person_id,
"user_id": user_id_int, # 确保这里使用传入的 user_id_int
"platform": platform,
"group_nicknames": [], # 初始化 group_nicknames 数组
}
},
upsert=True,
)
if result.upserted_id:
logger.debug(f"Upsert on person_id created new document: {person_id}")
# else:
# logger.debug(f"Upsert on person_id found existing document: {person_id}")
return result
except DuplicateKeyError as dk_err:
# 这个错误理论上不应该再由 upsert 触发。
# 如果仍然出现,可能指示 person_id 生成逻辑问题或非常罕见的 MongoDB 内部情况。
logger.error(
f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。"
)
raise # 将异常向上抛出,让调用者处理
except Exception as e:
logger.exception(f"对 person_id {person_id} 执行 Upsert 时失败: {e}")
raise # 将异常向上抛出
def _update_group_nickname(collection: Collection, person_id: str, group_id_str: str, nickname: str):
"""
尝试更新 person_id 文档中特定群组的绰号计数或添加新条目
按顺序尝试增加计数 -> 添加绰号 -> 添加群组
Args:
collection: MongoDB 集合对象 (person_info)
person_id: 目标文档的 person_id
group_id_str: 目标群组的 ID (字符串)
nickname: 要更新或添加的绰号
"""
# 3a. 尝试增加现有群组中现有绰号的计数
result_inc = collection.update_one(
{
"person_id": person_id,
"group_nicknames": {
"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}
},
},
{"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}},
array_filters=[
{"group.group_id": group_id_str},
{"nick.name": nickname},
],
)
if result_inc.modified_count > 0:
# logger.debug(f"成功增加 person_id {person_id} 在群组 {group_id_str} 中绰号 '{nickname}' 的计数。")
return # 成功增加计数,操作完成
# 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组
result_push_nick = collection.update_one(
{
"person_id": person_id,
"group_nicknames.group_id": group_id_str, # 检查群组是否存在
},
{"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}},
array_filters=[{"group.group_id": group_id_str}],
)
if result_push_nick.modified_count > 0:
logger.debug(f"成功为 person_id {person_id} 在现有群组 {group_id_str} 中添加新绰号 '{nickname}'")
return # 成功添加绰号,操作完成
# 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号
# 确保 group_nicknames 数组存在 (作为保险措施)
collection.update_one(
{"person_id": person_id, "group_nicknames": {"$exists": False}},
{"$set": {"group_nicknames": []}},
)
# 推送新的群组对象到 group_nicknames 数组
result_push_group = collection.update_one(
{
"person_id": person_id,
"group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在
},
{
"$push": {
"group_nicknames": {
"group_id": group_id_str,
"nicknames": [{"name": nickname, "count": 1}],
}
}
},
)
if result_push_group.modified_count > 0:
logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'")
# else:
# 如果连添加群组也失败 (例如 group_id 已存在但之前的步骤都未匹配,理论上不太可能)
# 可能需要进一步的日志或错误处理,但这通常意味着数据状态异常。
# logger.warning(f"尝试为 person_id {person_id} 添加新群组 {group_id_str} 失败,可能群组已存在但结构不符合预期。")
async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dict[str, str]):
"""
更新数据库中用户的群组绰号计数 (使用全局 db)
通过首先基于 person_id 进行 upsert 来处理潜在的 race condition
通过调用辅助函数来处理 person 文档的 upsert 和绰号更新
Args:
platform (str): 平台名称 (e.g., 'qq')
group_id (str): 群组 ID
nickname_map (Dict[str, str]): 用户 ID (字符串) 到绰号的映射
"""
# 尝试导入 person_info_manager (放在函数内部以减少潜在的导入问题)
try:
# 假设 person_info 在 group_nickname 的上一级目录
from ..person_info.person_info import person_info_manager
except ImportError:
logger.error("无法导入 person_info_manager无法生成 person_id")
return # 无法继续,因为需要 person_id
return
person_info_collection = db.person_info
if not nickname_map:
logger.debug("提供的用于更新的绰号映射为空。")
return
@ -61,97 +178,20 @@ async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dic
logger.error(f"无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id跳过此用户。")
continue
# --- 步骤 2: 基于 person_id 执行 Upsert ---
# 这是关键步骤,用于原子性地确保文档存在,避免 person_id 冲突。
# 如果文档不存在,它会被创建,并设置 $setOnInsert 中的字段。
# 如果文档已存在,此操作不会修改任何内容(因为没有 $set 操作符)。
upsert_result = person_info_collection.update_one(
{"person_id": person_id}, # Filter by the unique key
{
"$setOnInsert": {
"person_id": person_id,
"user_id": user_id_int,
"platform": platform,
"group_nicknames": [], # 初始化 group_nicknames 数组
}
},
upsert=True,
)
# --- 步骤 2: 确保 Person 文档存在 (调用辅助函数) ---
_upsert_person(person_info_collection, person_id, user_id_int, platform)
# 可选日志:记录是否创建了新文档
if upsert_result.upserted_id:
logger.debug(f"Upsert on person_id created new document: {person_id}")
# else:
# logger.debug(f"Upsert on person_id found existing document: {person_id}")
# --- 步骤 3: 更新群组绰号 (调用辅助函数) ---
_update_group_nickname(person_info_collection, person_id, group_id_str, nickname)
# --- 步骤 3: 更新群组绰号 ---
# 现在我们确信具有此 person_id 的文档存在,可以安全地更新其 group_nicknames。
# 3a. 尝试增加现有群组中现有绰号的计数
update_result_inc = person_info_collection.update_one(
{
"person_id": person_id, # 明确目标文档
"group_nicknames": { # 检查数组中是否有匹配项
"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}
},
},
{"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}}, # 增加计数
array_filters=[ # 指定要更新的数组元素
{"group.group_id": group_id_str},
{"nick.name": nickname},
],
)
# 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组
if update_result_inc.modified_count == 0:
update_result_push_nick = person_info_collection.update_one(
{
"person_id": person_id, # 明确目标文档
"group_nicknames.group_id": group_id_str, # 检查群组是否存在
},
# 将新绰号添加到匹配群组的 nicknames 数组中
{"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}},
array_filters=[{"group.group_id": group_id_str}], # 指定要推送到的群组
)
# 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号
if update_result_push_nick.modified_count == 0:
# 确保 group_nicknames 数组存在 (如果 $setOnInsert 失败或数据不一致时的保险措施)
person_info_collection.update_one(
{"person_id": person_id, "group_nicknames": {"$exists": False}},
{"$set": {"group_nicknames": []}},
)
# 推送新的群组对象到 group_nicknames 数组
update_result_push_group = person_info_collection.update_one(
{
"person_id": person_id, # 明确目标文档
"group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在
},
{
"$push": { # 添加新的群组条目
"group_nicknames": {
"group_id": group_id_str,
"nicknames": [{"name": nickname, "count": 1}], # 初始化绰号列表
}
}
},
)
if update_result_push_group.modified_count > 0:
logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'")
except DuplicateKeyError as dk_err:
# 这个错误理论上不应该再由步骤 2 的 upsert 触发。
# 如果仍然出现,可能指示 person_id 生成逻辑问题或非常罕见的 MongoDB 内部情况。
logger.error(
f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。"
)
except OperationFailure as op_err:
# --- 统一处理数据库操作可能抛出的异常 ---
except (OperationFailure, DuplicateKeyError) as db_err: # 捕获特定的数据库错误
logger.exception(
f"数据库操作失败 (OperationFailure): 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}({op_err})"
f"数据库操作失败 ({type(db_err).__name__}): 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}. 错误: {db_err}"
)
except Exception as e:
logger.exception(f"更新用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误:{e}")
# 捕获其他所有可能的错误 (例如 person_id 生成、辅助函数内部未捕获的错误等)
logger.exception(f"处理用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误:{e}")
# --- 使用 queue.Queue ---
queue_max_size = getattr(global_config, "NICKNAME_QUEUE_MAX_SIZE", 100)