pull/914/head
Bakadax 2025-05-01 19:09:47 +08:00
parent 1e0348ebf3
commit f042f7b689
2 changed files with 146 additions and 222 deletions

18
bot.py
View File

@ -221,15 +221,6 @@ def raw_main():
env_config = {key: os.getenv(key) for key in os.environ}
scan_provider(env_config)
# 在这里启动绰号处理进程
logger.info("准备启动绰号处理进程...")
start_nickname_processor() # <--- 添加启动调用
logger.info("已调用启动绰号处理进程。")
# 注册退出处理函数 (确保进程能被关闭)
atexit.register(stop_nickname_processor) # <--- 在这里注册停止函数
logger.info("已注册绰号处理进程的退出处理程序。")
# 返回MainSystem实例
return MainSystem()
@ -239,6 +230,15 @@ if __name__ == "__main__":
# 获取MainSystem实例
main_system = raw_main()
# 在这里启动绰号处理进程
logger.info("准备启动绰号处理线程...")
start_nickname_processor() # <--- 添加启动调用
logger.info("已调用启动绰号处理线程。")
# 注册退出处理函数 (确保进程能被关闭)
atexit.register(stop_nickname_processor) # <--- 在这里注册停止函数
logger.info("已注册绰号处理线程的退出处理程序。")
# 创建事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

View File

@ -1,155 +1,68 @@
# nickname_processor.py (多线程版本 - 使用全局 config)
import asyncio
import traceback
from multiprocessing import Process, Queue as mpQueue, Event
from typing import Dict, Optional
import threading
import queue
from typing import Dict, Optional, Any
# 数据库和日志导入
from pymongo import MongoClient
from pymongo.errors import OperationFailure
from src.common.logger_manager import get_logger
from src.common.database import db # 使用全局 db
from src.common.logger_manager import get_logger # 导入日志管理器
from src.config.config import global_config # 导入全局配置
from .nickname_mapper import analyze_chat_for_nicknames # 导入绰号分析函数
from src.common.database import db # 导入数据库初始化和关闭函数
logger = get_logger("nickname_processor")
logger = get_logger("nickname_processor") # 获取日志记录器实例
# --- 运行时状态 (用于安全停止进程) ---
_stop_event = Event()
# --- 恢复导入全局 config ---
try:
from src.config.config import global_config # <--- 直接导入全局配置
except ImportError:
logger.critical("无法导入 global_config")
global_config = None # 设置为 None
# ---------------------------
# --- 数据库连接 ---
mongo_client: Optional[MongoClient] = None # MongoDB 客户端实例
person_info_collection = None # 用户信息集合对象
# 绰号分析函数导入
from .nickname_mapper import analyze_chat_for_nicknames
# --- 数据库更新逻辑 (使用推荐的新结构) ---
# --- 使用 threading.Event ---
_stop_event = threading.Event()
# --------------------------
# --- 数据库更新逻辑 (使用全局 db) ---
async def update_nickname_counts(group_id: str, nickname_map: Dict[str, str]):
"""
更新数据库中用户的群组绰号计数
使用新的数据结构:
{
"user_id": 12345,
"group_nicknames": [ # <--- 字段名统一为 group_nicknames
{
"group_id": "群号1",
"nicknames": [ { "name": "绰号A", "count": 5 }, ... ]
}, ...
]
}
"""
person_info_collection = db.person_info # 获取集合对象
if not nickname_map:
logger.debug("提供的用于更新的绰号映射为空。")
return
logger.info(f"尝试更新群组 '{group_id}' 的绰号计数 (新结构),映射为: {nickname_map}")
for user_id_str, nickname in nickname_map.items(): # user_id 从 map 中取出是 str
if not user_id_str or not nickname:
logger.warning(f"跳过绰号映射中的无效条目: user_id='{user_id_str}', nickname='{nickname}'")
continue
group_id_str = str(group_id) # 确保 group_id 是字符串
"""更新数据库中用户的群组绰号计数 (使用全局 db)"""
person_info_collection = db.person_info
# ... (函数体保持不变, 参考之前的版本) ...
if not nickname_map: logger.debug("提供的用于更新的绰号映射为空。"); return
logger.info(f"尝试更新群组 '{group_id}' 的绰号计数,映射为: {nickname_map}")
for user_id_str, nickname in nickname_map.items():
if not user_id_str or not nickname: logger.warning(f"跳过无效条目: user_id='{user_id_str}', nickname='{nickname}'"); continue
group_id_str = str(group_id)
try: user_id_int = int(user_id_str)
except ValueError: logger.warning(f"无效的用户ID格式: '{user_id_str}',跳过。"); continue
try:
# 假设数据库中存储的用户ID是整数类型如果不是请移除 int()
user_id_int = int(user_id_str)
except ValueError:
logger.warning(f"无效的用户ID格式: '{user_id_str}',跳过。")
continue
try:
# 步骤 1: 确保用户文档存在,且有 group_nicknames 字段 (如果不存在则添加空数组)
# 注意:这里不再使用 $setOnInsert 添加 group_nicknames因为 $addToSet 或 $push 在字段不存在时会自动创建。
# upsert=True 确保用户文档存在。
person_info_collection.update_one(
{"user_id": user_id_int},
{"$setOnInsert": {"user_id": user_id_int}}, # 确保 upsert 时 user_id 被正确设置
upsert=True
)
# 确保 group_nicknames 字段存在且为数组 (如果不存在则创建)
person_info_collection.update_one(
{"user_id": user_id_int, "group_nicknames": {"$exists": False}},
{"$set": {"group_nicknames": []}}
)
person_info_collection.update_one({"user_id": user_id_int},{"$setOnInsert": {"user_id": user_id_int}}, upsert=True)
person_info_collection.update_one({"user_id": user_id_int, "group_nicknames": {"$exists": False}}, {"$set": {"group_nicknames": []}})
update_result = person_info_collection.update_one({"user_id": user_id_int, "group_nicknames": {"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}}}, {"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}}, array_filters=[{"group.group_id": group_id_str}, {"nick.name": nickname}])
if update_result.modified_count > 0: continue
update_result = person_info_collection.update_one({"user_id": user_id_int, "group_nicknames.group_id": group_id_str}, {"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}}, array_filters=[{"group.group_id": group_id_str}])
if update_result.modified_count > 0: continue
update_result = person_info_collection.update_one({"user_id": user_id_int, "group_nicknames.group_id": {"$ne": group_id_str}}, {"$push": {"group_nicknames": {"group_id": group_id_str, "nicknames": [{"name": nickname, "count": 1}]}}})
except OperationFailure as op_err: logger.exception(f"数据库操作失败: 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}")
except Exception as e: logger.exception(f"更新用户 {user_id_str} 的绰号 {nickname} 时发生意外错误")
# 步骤 2: 尝试直接增加现有绰号的计数
# 条件:用户存在,且 group_nicknames 数组中存在一个元素其 group_id 匹配,且该元素的 nicknames 数组中存在一个元素的 name 匹配
update_result = person_info_collection.update_one(
{
"user_id": user_id_int,
"group_nicknames": { # <--- 确保使用 group_nicknames
"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}
}
},
{ # <--- 确保使用 group_nicknames
"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}
},
array_filters=[
{"group.group_id": group_id_str},
{"nick.name": nickname}
]
)
# --- 使用 queue.Queue ---
# --- 修改:直接使用 global_config ---
queue_max_size = getattr(global_config, 'NICKNAME_QUEUE_MAX_SIZE', 100) if global_config else 100
# --------------------------------
nickname_queue: queue.Queue = queue.Queue(maxsize=queue_max_size)
# ----------------------
if update_result.modified_count > 0:
logger.debug(f"用户 '{user_id_str}' 在群组 '{group_id_str}' 中的绰号 '{nickname}' 计数已增加。")
continue # 处理完成,进行下一次循环
# 步骤 3: 如果步骤 2 未修改任何内容,尝试将新绰号添加到现有群组的 nicknames 数组中
# 条件:用户存在,且 group_nicknames 数组中存在一个元素其 group_id 匹配
update_result = person_info_collection.update_one(
{
"user_id": user_id_int,
"group_nicknames.group_id": group_id_str # <--- 确保使用 group_nicknames
},
{ # <--- 确保使用 group_nicknames
"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}
},
array_filters=[
{"group.group_id": group_id_str}
]
)
if update_result.modified_count > 0:
logger.debug(f"为用户 '{user_id_str}' 在群组 '{group_id_str}' 中添加了新绰号 '{nickname}',计数为 1。")
continue # 处理完成,进行下一次循环
# 步骤 4: 如果步骤 2 和 3 都未修改任何内容,说明群组条目本身可能不存在于 group_nicknames 数组中,尝试添加新的群组条目
# 条件:用户存在,且 group_nicknames 数组中 *不包含* 指定 group_id 的元素
update_result = person_info_collection.update_one(
{
"user_id": user_id_int,
"group_nicknames.group_id": {"$ne": group_id_str} # <--- 检查 group_id 是否不存在
},
{
"$push": { # <--- 确保使用 group_nicknames
"group_nicknames": {
"group_id": group_id_str,
"nicknames": [{"name": nickname, "count": 1}]
}
}
}
# 注意:这里不需要 upsert=True因为步骤1已确保用户存在。
# 如果字段 group_nicknames 不存在,$push 会自动创建它。
)
# 记录日志(无论修改与否,因为可能是因为组已存在但无匹配导致没修改)
if update_result.modified_count > 0:
logger.debug(f"为用户 '{user_id_str}' 添加了新群组 '{group_id_str}' 条目和绰号 '{nickname}'")
else:
# 到这里还没成功,可能意味着群组已存在但之前的步骤意外失败,或者有并发问题
logger.warning(f"未能为用户 '{user_id_str}' 更新或添加群组 '{group_id_str}' 的绰号 '{nickname}'。可能群组已存在但前面的步骤未成功修改。UpdateResult: {update_result.raw_result}")
except OperationFailure as op_err:
# 使用 logger.exception 来记录数据库操作错误,自动包含 traceback
logger.exception(f"数据库操作失败: 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}") # <--- 修改了日志记录方式
except Exception as e:
# 记录其他意外错误
logger.exception(f"更新用户 {user_id_str} 的绰号 {nickname} 时发生意外错误") # <--- 修改了日志记录方式
# --- 队列和进程 ---
nickname_queue: mpQueue = mpQueue(maxsize=global_config.NICKNAME_QUEUE_MAX_SIZE)
_nickname_process: Optional[Process] = None
_nickname_thread: Optional[threading.Thread] = None
# --- add_to_nickname_queue (使用全局 config) ---
async def add_to_nickname_queue(
chat_history_str: str,
bot_reply: str,
@ -157,111 +70,122 @@ async def add_to_nickname_queue(
user_name_map: Dict[str, str]
):
"""将需要分析的数据放入队列。"""
if not global_config.ENABLE_NICKNAME_MAPPING:
# --- 修改:使用全局 config ---
if not global_config or not global_config.ENABLE_NICKNAME_MAPPING:
# ---------------------------
return
if group_id is None:
logger.debug("私聊跳过绰号映射。")
return
if group_id is None: logger.debug("私聊跳过绰号映射。"); return
try:
item = (chat_history_str, bot_reply, str(group_id), user_name_map)
nickname_queue.put_nowait(item)
logger.debug(f"已将项目添加到群组 {group_id} 的绰号队列。")
except Exception as e:
logger.warning(f"无法将项目添加到绰号队列(可能已满): {e}", exc_info=True)
logger.debug(f"已将项目添加到群组 {group_id} 的绰号队列。当前大小: {nickname_queue.qsize()}")
except queue.Full: logger.warning(f"无法将项目添加到绰号队列:队列已满 (maxsize={nickname_queue.maxsize})。")
except Exception as e: logger.warning(f"无法将项目添加到绰号队列: {e}", exc_info=True)
async def _nickname_processing_loop(queue: mpQueue, stop_event):
"""独立进程中的主循环,处理队列任务。"""
logger.info("绰号处理循环已启动。")
# --- _nickname_processing_loop (使用全局 config) ---
async def _nickname_processing_loop(q: queue.Queue, stop_event: threading.Event):
"""独立线程中的主循环,处理队列任务 (使用全局 db 和 config)。"""
thread_id = threading.get_ident()
logger.info(f"绰号处理循环已启动 (线程 ID: {thread_id})。")
# --- 修改:使用全局 config ---
sleep_interval = getattr(global_config, 'NICKNAME_PROCESS_SLEEP_INTERVAL', 0.5) if global_config else 0.5
# ---------------------------
while not stop_event.is_set():
try:
if not queue.empty():
item = queue.get()
if isinstance(item, tuple) and len(item) == 4:
chat_history_str, bot_reply, group_id, user_name_map = item
logger.debug(f"正在处理群组 {group_id} 的绰号映射任务...")
analysis_result = await analyze_chat_for_nicknames(chat_history_str, bot_reply, user_name_map)
if analysis_result.get("is_exist") and analysis_result.get("data"):
await update_nickname_counts(group_id, analysis_result["data"])
else:
logger.warning(f"从队列接收到意外的项目类型: {type(item)}")
await asyncio.sleep(5)
item = q.get(block=True, timeout=sleep_interval)
if isinstance(item, tuple) and len(item) == 4:
chat_history_str, bot_reply, group_id, user_name_map = item
logger.debug(f"(线程 ID: {thread_id}) 正在处理群组 {group_id} 的绰号映射任务...")
# analyze_chat_for_nicknames 内部也应使用 global_config
analysis_result = await analyze_chat_for_nicknames(chat_history_str, bot_reply, user_name_map)
if analysis_result.get("is_exist") and analysis_result.get("data"):
await update_nickname_counts(group_id, analysis_result["data"])
else:
await asyncio.sleep(global_config.NICKNAME_PROCESS_SLEEP_INTERVAL)
except asyncio.CancelledError:
logger.info("绰号处理循环已取消。")
break
except Exception as e:
logger.error(f"绰号处理循环出错: {e}\n{traceback.format_exc()}")
await asyncio.sleep(5)
logger.info("绰号处理循环已结束。")
logger.warning(f"(线程 ID: {thread_id}) 从队列接收到意外的项目类型: {type(item)}")
q.task_done()
except queue.Empty: continue
except asyncio.CancelledError: logger.info(f"绰号处理循环已取消 (线程 ID: {thread_id})。"); break
except Exception as e: logger.error(f"(线程 ID: {thread_id}) 绰号处理循环出错: {e}\n{traceback.format_exc()}"); await asyncio.sleep(5)
logger.info(f"绰号处理循环已结束 (线程 ID: {thread_id})。")
def _run_processor_process(queue: mpQueue, stop_event):
"""进程启动函数,运行异步循环。"""
# --- _run_processor_thread (保持不变,不处理 db 或 config) ---
def _run_processor_thread(q: queue.Queue, stop_event: threading.Event):
"""线程启动函数,运行异步循环。"""
loop = None
thread_id = threading.get_ident()
logger.info(f"Nickname processor thread starting (Thread ID: {thread_id})...")
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(_nickname_processing_loop(queue, stop_event))
loop.close()
except Exception as e:
logger.error(f"运行绰号处理器进程时出错: {e}", exc_info=True)
logger.info(f"(Thread ID: {thread_id}) Asyncio event loop created and set.")
loop.run_until_complete(_nickname_processing_loop(q, stop_event))
except Exception as e: logger.error(f"(Thread ID: {thread_id}) Error running nickname processor thread: {e}", exc_info=True)
finally:
if loop:
try:
if loop.is_running():
all_tasks = asyncio.all_tasks(loop)
if all_tasks:
logger.info(f"(Thread ID: {thread_id}) Cancelling {len(all_tasks)} tasks...")
for task in all_tasks: task.cancel()
loop.run_until_complete(asyncio.gather(*all_tasks, return_exceptions=True))
loop.stop()
loop.close()
logger.info(f"(Thread ID: {thread_id}) Asyncio loop closed.")
except Exception as loop_close_err: logger.error(f"(Thread ID: {thread_id}) Error closing loop: {loop_close_err}", exc_info=True)
logger.info(f"Nickname processor thread finished (Thread ID: {thread_id}).")
# --- start_nickname_processor (使用全局 config) ---
def start_nickname_processor():
"""启动绰号映射处理进程。"""
global _nickname_process
if not global_config.ENABLE_NICKNAME_MAPPING:
logger.info("绰号映射功能已禁用。处理器未启动。")
"""启动绰号映射处理线程。"""
global _nickname_thread
# --- 修改:使用全局 config ---
if not global_config or not global_config.ENABLE_NICKNAME_MAPPING:
# ---------------------------
logger.info("绰号映射功能已禁用或无法获取配置。处理器未启动。")
return
if _nickname_process is None or not _nickname_process.is_alive():
logger.info("正在启动绰号处理器进程...")
if _nickname_thread is None or not _nickname_thread.is_alive():
logger.info("正在启动绰号处理器线程...")
stop_event = get_stop_event()
stop_event.clear()
_nickname_process = Process(target=_run_processor_process, args=(nickname_queue, stop_event), daemon=True)
_nickname_process.start()
logger.info(f"绰号处理器进程已启动PID: {_nickname_process.pid}")
_nickname_thread = threading.Thread(
target=_run_processor_thread,
args=(nickname_queue, stop_event),
daemon=True
)
_nickname_thread.start()
logger.info(f"绰号处理器线程已启动 (Thread ID: {_nickname_thread.ident})")
else:
logger.warning("绰号处理器进程已在运行中。")
logger.warning("绰号处理器线程已在运行中。")
# --- stop_nickname_processor (保持不变) ---
def stop_nickname_processor():
"""停止绰号映射处理程。"""
global _nickname_process
if _nickname_process and _nickname_process.is_alive():
logger.info("正在停止绰号处理器程...")
set_stop_event() # 发送停止信号
"""停止绰号映射处理线程。"""
global _nickname_thread
if _nickname_thread and _nickname_thread.is_alive():
logger.info("正在停止绰号处理器线程...")
set_stop_event()
try:
_nickname_process.join(timeout=10)
if _nickname_process.is_alive():
logger.warning("绰号处理器进程在 10 秒后未优雅停止。正在终止...")
_nickname_process.terminate()
_nickname_process.join(timeout=5)
except Exception as e:
logger.error(f"停止绰号处理器进程时出错: {e}", exc_info=True)
_nickname_thread.join(timeout=10)
if _nickname_thread.is_alive(): logger.warning("绰号处理器线程在 10 秒后未结束。")
except Exception as e: logger.error(f"停止绰号处理器线程时出错: {e}", exc_info=True)
finally:
if _nickname_process and not _nickname_process.is_alive():
logger.info("绰号处理器进程已成功停止。")
else:
logger.error("未能停止绰号处理器进程。")
_nickname_process = None
if _nickname_thread and not _nickname_thread.is_alive(): logger.info("绰号处理器线程已成功停止。")
else: logger.warning("停止绰号处理器线程:线程可能仍在运行。")
_nickname_thread = None
else:
logger.info("绰号处理器进程未在运行")
logger.info("绰号处理器线程未在运行或已被清理。")
# 可以在应用启动时调用 start_nickname_processor()
# 可以在应用关闭时调用 stop_nickname_processor()
def get_stop_event():
# --- Event 控制函数 (保持不变) ---
def get_stop_event() -> threading.Event:
"""获取全局停止事件"""
return _stop_event
def set_stop_event():
"""设置全局停止事件,通知子程退出"""
_stop_event.set()
"""设置全局停止事件,通知子线程退出"""
_stop_event.set()