🤖 自动格式化代码 [skip ci]

pull/914/head
github-actions[bot] 2025-05-03 06:28:48 +00:00
parent 9890d76c43
commit 0b1c3a2196
1 changed files with 55 additions and 41 deletions

View File

@ -19,7 +19,8 @@ from src.plugins.chat.message import MessageRecv
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
logger = get_logger("NicknameManager")
logger_helper = get_logger("AsyncLoopHelper") # 为辅助函数创建单独的 logger
logger_helper = get_logger("AsyncLoopHelper") # 为辅助函数创建单独的 logger
def run_async_loop(loop: asyncio.AbstractEventLoop, coro):
"""
@ -44,7 +45,9 @@ def run_async_loop(loop: asyncio.AbstractEventLoop, coro):
# 1. 取消所有剩余任务
all_tasks = asyncio.all_tasks(loop)
current_task = asyncio.current_task(loop)
tasks_to_cancel = [task for task in all_tasks if task is not current_task] # 避免取消 run_until_complete 本身
tasks_to_cancel = [
task for task in all_tasks if task is not current_task
] # 避免取消 run_until_complete 本身
if tasks_to_cancel:
logger_helper.info(f"Cancelling {len(tasks_to_cancel)} outstanding tasks in loop {id(loop)}...")
for task in tasks_to_cancel:
@ -61,17 +64,19 @@ def run_async_loop(loop: asyncio.AbstractEventLoop, coro):
# 3. 关闭循环 (如果未关闭)
if not loop.is_closed():
# 在关闭前再运行一次以处理挂起的关闭回调
loop.run_until_complete(loop.shutdown_asyncgens()) # 关闭异步生成器
loop.run_until_complete(loop.shutdown_asyncgens()) # 关闭异步生成器
loop.close()
logger_helper.info(f"Asyncio loop {id(loop)} closed.")
except Exception as close_err:
logger_helper.error(f"Error during asyncio loop cleanup for loop {id(loop)}: {close_err}", exc_info=True)
class NicknameManager:
"""
管理群组绰号分析处理存储和使用的单例类
封装了 LLM 调用后台处理线程和数据库交互
"""
_instance = None
_lock = threading.Lock()
@ -89,11 +94,11 @@ class NicknameManager:
初始化 NicknameManager
使用锁和标志确保实际初始化只执行一次
"""
if hasattr(self, '_initialized') and self._initialized:
if hasattr(self, "_initialized") and self._initialized:
return
with self._lock:
if hasattr(self, '_initialized') and self._initialized:
if hasattr(self, "_initialized") and self._initialized:
return
logger.info("正在初始化 NicknameManager 组件...")
@ -101,7 +106,7 @@ class NicknameManager:
self.is_enabled = self.config.ENABLE_NICKNAME_MAPPING
# 数据库处理器
person_info_collection = getattr(db, 'person_info', None)
person_info_collection = getattr(db, "person_info", None)
self.db_handler = NicknameDB(person_info_collection)
if not self.db_handler.is_available():
logger.error("数据库处理器初始化失败NicknameManager 功能受限。")
@ -136,9 +141,9 @@ class NicknameManager:
self.queue_max_size = getattr(self.config, "NICKNAME_QUEUE_MAX_SIZE", 100)
# 使用 asyncio.Queue
self.nickname_queue: asyncio.Queue = asyncio.Queue(maxsize=self.queue_max_size)
self._stop_event = threading.Event() # stop_event 仍然使用 threading.Event因为它是由另一个线程设置的
self._stop_event = threading.Event() # stop_event 仍然使用 threading.Event因为它是由另一个线程设置的
self._nickname_thread: Optional[threading.Thread] = None
self.sleep_interval = getattr(self.config, "NICKNAME_PROCESS_SLEEP_INTERVAL", 0.5) # 超时时间
self.sleep_interval = getattr(self.config, "NICKNAME_PROCESS_SLEEP_INTERVAL", 0.5) # 超时时间
self._initialized = True
logger.info("NicknameManager 初始化完成。")
@ -152,8 +157,8 @@ class NicknameManager:
logger.info("正在启动绰号处理器线程...")
self._stop_event.clear()
self._nickname_thread = threading.Thread(
target=self._run_processor_in_thread, # 线程目标函数不变
daemon=True
target=self._run_processor_in_thread, # 线程目标函数不变
daemon=True,
)
self._nickname_thread.start()
logger.info(f"绰号处理器线程已启动 (ID: {self._nickname_thread.ident})")
@ -164,10 +169,10 @@ class NicknameManager:
"""停止后台处理线程。"""
if self._nickname_thread and self._nickname_thread.is_alive():
logger.info("正在停止绰号处理器线程...")
self._stop_event.set() # 设置停止事件_processing_loop 会检测到
self._stop_event.set() # 设置停止事件_processing_loop 会检测到
try:
# 不需要清空 asyncio.Queue让循环自然结束或被取消
self._nickname_thread.join(timeout=10) # 等待线程结束
self._nickname_thread.join(timeout=10) # 等待线程结束
if self._nickname_thread.is_alive():
logger.warning("绰号处理器线程在超时后仍未停止。")
except Exception as e:
@ -209,8 +214,11 @@ class NicknameManager:
# 格式化历史记录
chat_history_str = await build_readable_messages(
messages=history_messages,
replace_bot_name=True, merge_messages=False, timestamp_mode="relative",
read_mark=0.0, truncate=False,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
truncate=False,
)
# 2. 获取 Bot 回复
bot_reply_str = " ".join(bot_reply) if bot_reply else ""
@ -218,7 +226,9 @@ class NicknameManager:
group_id = str(current_chat_stream.group_info.group_id)
platform = current_chat_stream.platform
# 4. 构建用户 ID 到名称的映射 (user_name_map)
user_ids_in_history = {str(msg["user_info"]["user_id"]) for msg in history_messages if msg.get("user_info", {}).get("user_id")}
user_ids_in_history = {
str(msg["user_info"]["user_id"]) for msg in history_messages if msg.get("user_info", {}).get("user_id")
}
user_name_map = {}
if user_ids_in_history:
try:
@ -231,14 +241,15 @@ class NicknameManager:
user_name_map[user_id] = names_data[user_id]
else:
latest_nickname = next(
(m["user_info"].get("user_nickname")
for m in reversed(history_messages)
if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname")),
(
m["user_info"].get("user_nickname")
for m in reversed(history_messages)
if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname")
),
None,
)
user_name_map[user_id] = latest_nickname or f"未知({user_id})"
item = (chat_history_str, bot_reply_str, platform, group_id, user_name_map)
await self._add_to_queue(item, platform, group_id)
@ -256,7 +267,11 @@ class NicknameManager:
try:
group_id = str(chat_stream.group_info.group_id)
platform = chat_stream.platform
user_ids_in_context = {str(msg["user_info"]["user_id"]) for msg in message_list_before_now if msg.get("user_info", {}).get("user_id")}
user_ids_in_context = {
str(msg["user_info"]["user_id"])
for msg in message_list_before_now
if msg.get("user_info", {}).get("user_id")
}
if not user_ids_in_context:
recent_speakers = chat_stream.get_recent_speakers(limit=5)
@ -283,7 +298,6 @@ class NicknameManager:
logger.error(f"{log_prefix} 获取绰号注入时出错: {e}", exc_info=True)
return ""
# 私有/内部方法
async def _add_to_queue(self, item: tuple, platform: str, group_id: str):
@ -291,15 +305,18 @@ class NicknameManager:
try:
# 使用 await put(),如果队列满则异步等待
await self.nickname_queue.put(item)
logger.debug(f"已将项目添加到平台 '{platform}' 群组 '{group_id}' 的绰号队列。当前大小: {self.nickname_queue.qsize()}")
logger.debug(
f"已将项目添加到平台 '{platform}' 群组 '{group_id}' 的绰号队列。当前大小: {self.nickname_queue.qsize()}"
)
except asyncio.QueueFull:
# 理论上 await put() 不会直接抛 QueueFull除非 maxsize=0
# 但保留以防万一或未来修改
logger.warning(f"绰号队列已满 (最大={self.queue_max_size})。平台 '{platform}' 群组 '{group_id}' 的项目被丢弃。")
logger.warning(
f"绰号队列已满 (最大={self.queue_max_size})。平台 '{platform}' 群组 '{group_id}' 的项目被丢弃。"
)
except Exception as e:
logger.error(f"将项目添加到绰号队列时出错: {e}", exc_info=True)
async def _analyze_and_update_nicknames(self, item: tuple):
"""处理单个队列项目:调用 LLM 分析并更新数据库。"""
if not isinstance(item, tuple) or len(item) != 5:
@ -309,7 +326,7 @@ class NicknameManager:
chat_history_str, bot_reply, platform, group_id, user_name_map = item
# 使用 asyncio.get_running_loop().call_soon(threading.get_ident) 可能不准确线程ID是同步概念
# 可以考虑移除线程ID日志或寻找异步安全的获取标识符的方式
log_prefix = f"[{platform}:{group_id}]" # 简化日志前缀
log_prefix = f"[{platform}:{group_id}]" # 简化日志前缀
logger.debug(f"{log_prefix} 开始处理绰号分析任务...")
if not self.llm_mapper:
@ -340,7 +357,9 @@ class NicknameManager:
try:
person_id = person_info_manager.get_person_id(platform, user_id_str)
if not person_id:
logger.error(f"{log_prefix} 无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id跳过此用户。")
logger.error(
f"{log_prefix} 无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id跳过此用户。"
)
continue
self.db_handler.upsert_person(person_id, user_id_int, platform)
self.db_handler.update_group_nickname_count(person_id, group_id, nickname)
@ -353,7 +372,6 @@ class NicknameManager:
else:
logger.debug(f"{log_prefix} LLM 未找到可靠的绰号映射或分析失败。")
async def _call_llm_for_analysis(
self,
chat_history_str: str,
@ -387,7 +405,7 @@ class NicknameManager:
elif response_content.startswith("{") and response_content.endswith("}"):
pass # 可能是纯 JSON
else:
json_match = re.search(r'\{.*\}', response_content, re.DOTALL)
json_match = re.search(r"\{.*\}", response_content, re.DOTALL)
if json_match:
response_content = json_match.group(0)
else:
@ -430,11 +448,10 @@ class NicknameManager:
logger.error(f"绰号映射 LLM 调用或处理过程中发生意外错误: {e}", exc_info=True)
return {"is_exist": False}
def _filter_llm_results(self, original_data: Dict[str, str], user_name_map: Dict[str, str]) -> Dict[str, str]:
"""过滤 LLM 返回的绰号映射结果。"""
filtered_data = {}
bot_qq_str = str(self.config.BOT_QQ) if hasattr(self.config, 'BOT_QQ') else None
bot_qq_str = str(self.config.BOT_QQ) if hasattr(self.config, "BOT_QQ") else None
for user_id, nickname in original_data.items():
if not isinstance(user_id, str):
@ -454,23 +471,22 @@ class NicknameManager:
return filtered_data
# 线程相关
# 修改:使用 run_async_loop 辅助函数
def _run_processor_in_thread(self):
"""后台线程入口函数,使用辅助函数管理 asyncio 事件循环。"""
thread_id = threading.get_ident() # 获取线程ID用于日志
thread_id = threading.get_ident() # 获取线程ID用于日志
logger.info(f"绰号处理器线程启动 (线程 ID: {thread_id})...")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # 为当前线程设置事件循环
asyncio.set_event_loop(loop) # 为当前线程设置事件循环
logger.info(f"(线程 ID: {thread_id}) Asyncio 事件循环已创建并设置。")
# 调用辅助函数来运行主处理协程并管理循环生命周期
run_async_loop(loop, self._processing_loop())
logger.info(f"绰号处理器线程结束 (线程 ID: {thread_id}).")
# 结束修改
# 结束修改
# 修改:使用 asyncio.Queue 和 wait_for
async def _processing_loop(self):
@ -478,18 +494,15 @@ class NicknameManager:
# 移除线程ID日志因为它在异步上下文中不一定准确
logger.info("绰号异步处理循环已启动。")
while not self._stop_event.is_set(): # 仍然检查同步的停止事件
while not self._stop_event.is_set(): # 仍然检查同步的停止事件
try:
# 使用 asyncio.wait_for 从异步队列获取项目,并设置超时
item = await asyncio.wait_for(
self.nickname_queue.get(),
timeout=self.sleep_interval
)
item = await asyncio.wait_for(self.nickname_queue.get(), timeout=self.sleep_interval)
# 处理获取到的项目 (调用异步方法)
await self._analyze_and_update_nicknames(item)
self.nickname_queue.task_done() # 标记任务完成
self.nickname_queue.task_done() # 标记任务完成
except asyncio.TimeoutError:
# 等待超时,相当于之前 queue.Empty继续循环检查停止事件
@ -497,7 +510,7 @@ class NicknameManager:
except asyncio.CancelledError:
# 协程被取消 (通常在 stop_processor 中发生)
logger.info("绰号处理循环被取消。")
break # 退出循环
break # 退出循环
except Exception as e:
# 捕获处理单个项目时可能发生的其他异常
logger.error(f"绰号处理循环出错: {e}", exc_info=True)
@ -507,6 +520,7 @@ class NicknameManager:
logger.info("绰号异步处理循环已结束。")
# 可以在这里添加清理逻辑,比如确保队列为空或处理剩余项目
# 例如await self.nickname_queue.join() # 等待所有任务完成 (如果需要)
# 结束修改