pull/914/head
Bakadax 2025-05-01 23:03:43 +08:00
parent 8bf7095800
commit 0ca3cada9a
3 changed files with 10 additions and 30 deletions

2
bot.py
View File

@ -12,8 +12,6 @@ from src.common.logger_manager import get_logger
# from src.common.logger import LogConfig, CONFIRM_STYLE_CONFIG
from src.common.crash_logger import install_crash_handler
from src.main import MainSystem
from src.main import MainSystem
from src.plugins.group_nickname.nickname_processor import start_nickname_processor, stop_nickname_processor # <--- 添加这行导入
import atexit

View File

@ -1,7 +1,7 @@
import os
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
from typing import Dict, List, Optional, Any
from dateutil import tz
import tomli

View File

@ -4,37 +4,18 @@ import asyncio
import traceback
import threading
import queue
from typing import Dict, Optional, Any
from typing import Dict, Optional
# 数据库和日志导入
from pymongo import MongoClient
from pymongo.errors import OperationFailure, DuplicateKeyError # 引入 DuplicateKeyError
from src.common.logger_manager import get_logger
from src.common.database import db # 使用全局 db
from .nickname_mapper import analyze_chat_for_nicknames
from src.config.config import global_config
logger = get_logger("nickname_processor")
# --- 恢复导入全局 config ---
try:
from src.config.config import global_config # <--- 直接导入全局配置
except ImportError:
logger.critical("无法导入 global_config")
# 提供一个默认的回退配置对象,如果 global_config 导入失败
class FallbackConfig:
ENABLE_NICKNAME_MAPPING = False
NICKNAME_QUEUE_MAX_SIZE = 100
NICKNAME_PROCESS_SLEEP_INTERVAL = 0.5
global_config = FallbackConfig()
# ---------------------------
# 绰号分析函数导入
from .nickname_mapper import analyze_chat_for_nicknames
# --- 使用 threading.Event ---
_stop_event = threading.Event()
# --------------------------
# --- 数据库更新逻辑 (使用全局 db) - 修复 Race Condition 版 ---
async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dict[str, str]):
"""
更新数据库中用户的群组绰号计数 (使用全局 db)
@ -100,7 +81,7 @@ async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dic
# 可选日志:记录是否创建了新文档
if upsert_result.upserted_id:
logger.debug(f"Upsert on person_id created new document: {person_id}")
logger.debug(f"Upsert on person_id created new document: {person_id}")
# else:
# logger.debug(f"Upsert on person_id found existing document: {person_id}")
@ -157,16 +138,16 @@ async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dic
}
)
if update_result_push_group.modified_count > 0:
logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'")
logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'")
except DuplicateKeyError as dk_err:
# 这个错误理论上不应该再由步骤 2 的 upsert 触发。
# 如果仍然出现,可能指示 person_id 生成逻辑问题或非常罕见的 MongoDB 内部情况。
logger.error(f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。")
except OperationFailure as op_err:
logger.exception(f"数据库操作失败 (OperationFailure): 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}")
logger.exception(f"数据库操作失败 (OperationFailure): 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}({op_err})")
except Exception as e:
logger.exception(f"更新用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误")
logger.exception(f"更新用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误{e}")
# --- 使用 queue.Queue ---
@ -254,7 +235,8 @@ def _run_processor_thread(q: queue.Queue, stop_event: threading.Event):
all_tasks = asyncio.all_tasks(loop)
if all_tasks:
logger.info(f"(Thread ID: {thread_id}) Cancelling {len(all_tasks)} running tasks...")
for task in all_tasks: task.cancel()
for task in all_tasks:
task.cancel()
loop.run_until_complete(asyncio.gather(*all_tasks, return_exceptions=True))
logger.info(f"(Thread ID: {thread_id}) All tasks cancelled.")
loop.stop()