mirror of https://github.com/Mai-with-u/MaiBot.git
过滤自身与无变更
parent
f645d350c1
commit
8bf7095800
|
|
@ -29,12 +29,14 @@ if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
|
|||
llm_mapper = None
|
||||
|
||||
def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str:
|
||||
"""构建用于 LLM 绰号映射的 Prompt"""
|
||||
# user_name_map 包含了 user_id 到 person_name (或 fallback nickname) 的映射
|
||||
user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items()])
|
||||
print(f"\n\n\n{user_list_str}\n\n\n\n")
|
||||
# print(f"\n\n\nKnown User Info for LLM:\n{user_list_str}\n\n\n\n") # Debugging print
|
||||
prompt = f"""
|
||||
任务:分析以下聊天记录和你的最新回复,判断其中是否包含用户绰号,并确定绰号与用户 ID 之间是否存在明确的一一对应关系。
|
||||
|
||||
已知用户信息:
|
||||
已知用户信息(ID: 名称):
|
||||
{user_list_str}
|
||||
|
||||
聊天记录:
|
||||
|
|
@ -56,12 +58,12 @@ def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map:
|
|||
"用户B数字id": "绰号_B"
|
||||
}}
|
||||
}}
|
||||
其中 "data" 字段的键是用户的 ID,值是对应的绰号。只包含你能确认映射关系的绰号。
|
||||
其中 "data" 字段的键是用户的 ID (字符串形式),值是对应的绰号。只包含你能确认映射关系的绰号。
|
||||
4. 如果无法建立任何可靠的一一映射关系(例如,绰号指代不明、没有出现绰号、或无法确认绰号与用户的关联),请输出 JSON 对象:
|
||||
{{
|
||||
"is_exist": false
|
||||
}}
|
||||
5. 你的昵称后面包含"(你)",不需要输出你自身的绰号。
|
||||
5. 在“已知用户信息”列表中,你的昵称后面可能包含"(你)",这表示是你自己,不需要输出你自身的绰号映射。请确保不要将你自己的ID和任何词语映射为绰号。
|
||||
6. 请严格按照 JSON 格式输出,不要包含任何额外的解释或文本。
|
||||
|
||||
输出:
|
||||
|
|
@ -72,14 +74,12 @@ def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map:
|
|||
async def analyze_chat_for_nicknames(
|
||||
chat_history_str: str,
|
||||
bot_reply: str,
|
||||
user_name_map: Dict[str, str]
|
||||
user_name_map: Dict[str, str] # 这个 map 包含了 user_id -> person_name 的信息
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
调用 LLM 分析聊天记录和 Bot 回复,提取可靠的 用户ID-绰号 映射。
|
||||
调用 LLM 分析聊天记录和 Bot 回复,提取可靠的 用户ID-绰号 映射,并进行过滤。
|
||||
"""
|
||||
# --- [修改] 使用全局配置开关 ---
|
||||
if not global_config.ENABLE_NICKNAME_MAPPING:
|
||||
# --- 结束修改 ---
|
||||
logger.debug("绰号映射功能已禁用。")
|
||||
return {"is_exist": False}
|
||||
|
||||
|
|
@ -99,6 +99,7 @@ async def analyze_chat_for_nicknames(
|
|||
logger.warning("LLM 返回了空的绰号映射内容。")
|
||||
return {"is_exist": False}
|
||||
|
||||
# 清理可能的 Markdown 代码块标记
|
||||
response_content = response_content.strip()
|
||||
if response_content.startswith("```json"):
|
||||
response_content = response_content[7:]
|
||||
|
|
@ -110,14 +111,51 @@ async def analyze_chat_for_nicknames(
|
|||
result = json.loads(response_content)
|
||||
if isinstance(result, dict) and "is_exist" in result:
|
||||
if result["is_exist"] is True:
|
||||
if "data" in result and isinstance(result["data"], dict):
|
||||
if not result["data"]:
|
||||
logger.debug("LLM 指示 is_exist=True 但 data 为空。视为 False 处理。")
|
||||
original_data = result.get("data") # 使用 .get() 更安全
|
||||
if isinstance(original_data, dict) and original_data: # 确保 data 是非空字典
|
||||
logger.info(f"LLM 找到的原始绰号映射: {original_data}")
|
||||
|
||||
# --- 开始过滤 ---
|
||||
filtered_data = {}
|
||||
bot_qq_str = str(global_config.BOT_QQ) # 将机器人QQ转为字符串以便比较
|
||||
|
||||
for user_id, nickname in original_data.items():
|
||||
# 检查 user_id 是否是字符串,以防万一
|
||||
if not isinstance(user_id, str):
|
||||
logger.warning(f"LLM 返回的 user_id '{user_id}' 不是字符串,跳过。")
|
||||
continue
|
||||
|
||||
# 条件 1: 排除机器人自身
|
||||
if user_id == bot_qq_str:
|
||||
logger.debug(f"过滤掉机器人自身的映射: ID {user_id}")
|
||||
continue
|
||||
|
||||
# 条件 2: 排除 nickname 与 person_name 相同的情况
|
||||
person_name = user_name_map.get(user_id) # 从传入的映射中查找 person_name
|
||||
if person_name and person_name == nickname:
|
||||
logger.debug(f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。")
|
||||
continue
|
||||
|
||||
# 如果通过所有过滤条件,则保留
|
||||
filtered_data[user_id] = nickname
|
||||
# --- 结束过滤 ---
|
||||
|
||||
# 检查过滤后是否还有数据
|
||||
if not filtered_data:
|
||||
logger.info("所有找到的绰号映射都被过滤掉了。")
|
||||
return {"is_exist": False}
|
||||
logger.info(f"找到绰号映射: {result['data']}")
|
||||
return {"is_exist": True, "data": result["data"]}
|
||||
else:
|
||||
logger.info(f"过滤后的绰号映射: {filtered_data}")
|
||||
return {"is_exist": True, "data": filtered_data} # 返回过滤后的数据
|
||||
|
||||
else:
|
||||
logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 缺失或不是字典。")
|
||||
# is_exist 为 True 但 data 缺失、不是字典或为空
|
||||
if "data" not in result:
|
||||
logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 键缺失。")
|
||||
elif not isinstance(result.get("data"), dict):
|
||||
logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 不是字典。")
|
||||
else: # data 为空字典
|
||||
logger.debug("LLM 指示 is_exist=True 但 data 为空字典。视为 False 处理。")
|
||||
return {"is_exist": False}
|
||||
elif result["is_exist"] is False:
|
||||
logger.info("LLM 未找到可靠的绰号映射。")
|
||||
|
|
@ -135,4 +173,3 @@ async def analyze_chat_for_nicknames(
|
|||
except Exception as e:
|
||||
logger.error(f"绰号映射 LLM 调用或处理过程中出错: {e}", exc_info=True)
|
||||
return {"is_exist": False}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
# nickname_processor.py (多线程版本 - 使用全局 config)
|
||||
# nickname_processor.py (多线程版本 - 使用全局 config - 修复 Race Condition on person_id)
|
||||
|
||||
import asyncio
|
||||
import traceback
|
||||
|
|
@ -8,7 +8,7 @@ from typing import Dict, Optional, Any
|
|||
|
||||
# 数据库和日志导入
|
||||
from pymongo import MongoClient
|
||||
from pymongo.errors import OperationFailure
|
||||
from pymongo.errors import OperationFailure, DuplicateKeyError # 引入 DuplicateKeyError
|
||||
from src.common.logger_manager import get_logger
|
||||
from src.common.database import db # 使用全局 db
|
||||
|
||||
|
|
@ -19,7 +19,12 @@ try:
|
|||
from src.config.config import global_config # <--- 直接导入全局配置
|
||||
except ImportError:
|
||||
logger.critical("无法导入 global_config!")
|
||||
global_config = None # 设置为 None
|
||||
# 提供一个默认的回退配置对象,如果 global_config 导入失败
|
||||
class FallbackConfig:
|
||||
ENABLE_NICKNAME_MAPPING = False
|
||||
NICKNAME_QUEUE_MAX_SIZE = 100
|
||||
NICKNAME_PROCESS_SLEEP_INTERVAL = 0.5
|
||||
global_config = FallbackConfig()
|
||||
# ---------------------------
|
||||
|
||||
# 绰号分析函数导入
|
||||
|
|
@ -29,89 +34,206 @@ from .nickname_mapper import analyze_chat_for_nicknames
|
|||
_stop_event = threading.Event()
|
||||
# --------------------------
|
||||
|
||||
# --- 数据库更新逻辑 (使用全局 db) ---
|
||||
async def update_nickname_counts(group_id: str, nickname_map: Dict[str, str]):
|
||||
"""更新数据库中用户的群组绰号计数 (使用全局 db)"""
|
||||
# --- 数据库更新逻辑 (使用全局 db) - 修复 Race Condition 版 ---
|
||||
async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dict[str, str]):
|
||||
"""
|
||||
更新数据库中用户的群组绰号计数 (使用全局 db)。
|
||||
通过首先基于 person_id 进行 upsert 来处理潜在的 race condition。
|
||||
|
||||
Args:
|
||||
platform (str): 平台名称 (e.g., 'qq')。
|
||||
group_id (str): 群组 ID。
|
||||
nickname_map (Dict[str, str]): 用户 ID (字符串) 到绰号的映射。
|
||||
"""
|
||||
# 尝试导入 person_info_manager (放在函数内部以减少潜在的导入问题)
|
||||
try:
|
||||
# 假设 person_info 在 group_nickname 的上一级目录
|
||||
from ..person_info.person_info import person_info_manager
|
||||
except ImportError:
|
||||
logger.error("无法导入 person_info_manager,无法生成 person_id!")
|
||||
return # 无法继续,因为需要 person_id
|
||||
|
||||
person_info_collection = db.person_info
|
||||
# ... (函数体保持不变, 参考之前的版本) ...
|
||||
if not nickname_map: logger.debug("提供的用于更新的绰号映射为空。"); return
|
||||
logger.info(f"尝试更新群组 '{group_id}' 的绰号计数,映射为: {nickname_map}")
|
||||
|
||||
if not nickname_map:
|
||||
logger.debug("提供的用于更新的绰号映射为空。")
|
||||
return
|
||||
|
||||
logger.info(f"尝试更新平台 '{platform}' 群组 '{group_id}' 的绰号计数,映射为: {nickname_map}")
|
||||
|
||||
for user_id_str, nickname in nickname_map.items():
|
||||
if not user_id_str or not nickname: logger.warning(f"跳过无效条目: user_id='{user_id_str}', nickname='{nickname}'"); continue
|
||||
# --- 基本验证 ---
|
||||
if not user_id_str or not nickname:
|
||||
logger.warning(f"跳过无效条目: user_id='{user_id_str}', nickname='{nickname}'")
|
||||
continue
|
||||
group_id_str = str(group_id)
|
||||
try: user_id_int = int(user_id_str)
|
||||
except ValueError: logger.warning(f"无效的用户ID格式: '{user_id_str}',跳过。"); continue
|
||||
try:
|
||||
person_info_collection.update_one({"user_id": user_id_int},{"$setOnInsert": {"user_id": user_id_int}}, upsert=True)
|
||||
person_info_collection.update_one({"user_id": user_id_int, "group_nicknames": {"$exists": False}}, {"$set": {"group_nicknames": []}})
|
||||
update_result = person_info_collection.update_one({"user_id": user_id_int, "group_nicknames": {"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}}}, {"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}}, array_filters=[{"group.group_id": group_id_str}, {"nick.name": nickname}])
|
||||
if update_result.modified_count > 0: continue
|
||||
update_result = person_info_collection.update_one({"user_id": user_id_int, "group_nicknames.group_id": group_id_str}, {"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}}, array_filters=[{"group.group_id": group_id_str}])
|
||||
if update_result.modified_count > 0: continue
|
||||
update_result = person_info_collection.update_one({"user_id": user_id_int, "group_nicknames.group_id": {"$ne": group_id_str}}, {"$push": {"group_nicknames": {"group_id": group_id_str, "nicknames": [{"name": nickname, "count": 1}]}}})
|
||||
except OperationFailure as op_err: logger.exception(f"数据库操作失败: 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}")
|
||||
except Exception as e: logger.exception(f"更新用户 {user_id_str} 的绰号 {nickname} 时发生意外错误")
|
||||
user_id_int = int(user_id_str)
|
||||
except ValueError:
|
||||
logger.warning(f"无效的用户ID格式: '{user_id_str}',跳过。")
|
||||
continue
|
||||
# --- 结束验证 ---
|
||||
|
||||
try:
|
||||
# --- 步骤 1: 生成 person_id ---
|
||||
person_id = person_info_manager.get_person_id(platform, user_id_str)
|
||||
if not person_id:
|
||||
logger.error(f"无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id,跳过此用户。")
|
||||
continue
|
||||
|
||||
# --- 步骤 2: 基于 person_id 执行 Upsert ---
|
||||
# 这是关键步骤,用于原子性地确保文档存在,避免 person_id 冲突。
|
||||
# 如果文档不存在,它会被创建,并设置 $setOnInsert 中的字段。
|
||||
# 如果文档已存在,此操作不会修改任何内容(因为没有 $set 操作符)。
|
||||
upsert_result = person_info_collection.update_one(
|
||||
{"person_id": person_id}, # Filter by the unique key
|
||||
{
|
||||
"$setOnInsert": {
|
||||
"person_id": person_id,
|
||||
"user_id": user_id_int,
|
||||
"platform": platform,
|
||||
"group_nicknames": [] # 初始化 group_nicknames 数组
|
||||
}
|
||||
},
|
||||
upsert=True
|
||||
)
|
||||
|
||||
# 可选日志:记录是否创建了新文档
|
||||
if upsert_result.upserted_id:
|
||||
logger.debug(f"Upsert on person_id created new document: {person_id}")
|
||||
# else:
|
||||
# logger.debug(f"Upsert on person_id found existing document: {person_id}")
|
||||
|
||||
# --- 步骤 3: 更新群组绰号 ---
|
||||
# 现在我们确信具有此 person_id 的文档存在,可以安全地更新其 group_nicknames。
|
||||
|
||||
# 3a. 尝试增加现有群组中现有绰号的计数
|
||||
update_result_inc = person_info_collection.update_one(
|
||||
{
|
||||
"person_id": person_id, # 明确目标文档
|
||||
"group_nicknames": { # 检查数组中是否有匹配项
|
||||
"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}
|
||||
}
|
||||
},
|
||||
{"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}}, # 增加计数
|
||||
array_filters=[ # 指定要更新的数组元素
|
||||
{"group.group_id": group_id_str},
|
||||
{"nick.name": nickname}
|
||||
]
|
||||
)
|
||||
|
||||
# 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组
|
||||
if update_result_inc.modified_count == 0:
|
||||
update_result_push_nick = person_info_collection.update_one(
|
||||
{
|
||||
"person_id": person_id, # 明确目标文档
|
||||
"group_nicknames.group_id": group_id_str # 检查群组是否存在
|
||||
},
|
||||
# 将新绰号添加到匹配群组的 nicknames 数组中
|
||||
{"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}},
|
||||
array_filters=[{"group.group_id": group_id_str}] # 指定要推送到的群组
|
||||
)
|
||||
|
||||
# 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号
|
||||
if update_result_push_nick.modified_count == 0:
|
||||
# 确保 group_nicknames 数组存在 (如果 $setOnInsert 失败或数据不一致时的保险措施)
|
||||
person_info_collection.update_one(
|
||||
{"person_id": person_id, "group_nicknames": {"$exists": False}},
|
||||
{"$set": {"group_nicknames": []}}
|
||||
)
|
||||
# 推送新的群组对象到 group_nicknames 数组
|
||||
update_result_push_group = person_info_collection.update_one(
|
||||
{
|
||||
"person_id": person_id, # 明确目标文档
|
||||
"group_nicknames.group_id": {"$ne": group_id_str} # 确保该群组 ID 尚未存在
|
||||
},
|
||||
{
|
||||
"$push": { # 添加新的群组条目
|
||||
"group_nicknames": {
|
||||
"group_id": group_id_str,
|
||||
"nicknames": [{"name": nickname, "count": 1}] # 初始化绰号列表
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
if update_result_push_group.modified_count > 0:
|
||||
logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'")
|
||||
|
||||
except DuplicateKeyError as dk_err:
|
||||
# 这个错误理论上不应该再由步骤 2 的 upsert 触发。
|
||||
# 如果仍然出现,可能指示 person_id 生成逻辑问题或非常罕见的 MongoDB 内部情况。
|
||||
logger.error(f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。")
|
||||
except OperationFailure as op_err:
|
||||
logger.exception(f"数据库操作失败 (OperationFailure): 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}")
|
||||
except Exception as e:
|
||||
logger.exception(f"更新用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误")
|
||||
|
||||
|
||||
# --- 使用 queue.Queue ---
|
||||
# --- 修改:直接使用 global_config ---
|
||||
queue_max_size = getattr(global_config, 'NICKNAME_QUEUE_MAX_SIZE', 100) if global_config else 100
|
||||
# --------------------------------
|
||||
queue_max_size = getattr(global_config, 'NICKNAME_QUEUE_MAX_SIZE', 100)
|
||||
nickname_queue: queue.Queue = queue.Queue(maxsize=queue_max_size)
|
||||
# ----------------------
|
||||
|
||||
_nickname_thread: Optional[threading.Thread] = None
|
||||
|
||||
# --- add_to_nickname_queue (使用全局 config) ---
|
||||
# --- add_to_nickname_queue (保持不变,已包含 platform) ---
|
||||
async def add_to_nickname_queue(
|
||||
chat_history_str: str,
|
||||
bot_reply: str,
|
||||
platform: str,
|
||||
group_id: Optional[str],
|
||||
user_name_map: Dict[str, str]
|
||||
):
|
||||
"""将需要分析的数据放入队列。"""
|
||||
# --- 修改:使用全局 config ---
|
||||
if not global_config or not global_config.ENABLE_NICKNAME_MAPPING:
|
||||
# ---------------------------
|
||||
return
|
||||
if group_id is None: logger.debug("私聊跳过绰号映射。"); return
|
||||
if group_id is None:
|
||||
logger.debug("私聊跳过绰号映射。")
|
||||
return
|
||||
try:
|
||||
item = (chat_history_str, bot_reply, str(group_id), user_name_map)
|
||||
item = (chat_history_str, bot_reply, platform, str(group_id), user_name_map)
|
||||
nickname_queue.put_nowait(item)
|
||||
logger.debug(f"已将项目添加到群组 {group_id} 的绰号队列。当前大小: {nickname_queue.qsize()}")
|
||||
except queue.Full: logger.warning(f"无法将项目添加到绰号队列:队列已满 (maxsize={nickname_queue.maxsize})。")
|
||||
except Exception as e: logger.warning(f"无法将项目添加到绰号队列: {e}", exc_info=True)
|
||||
logger.debug(f"已将项目添加到平台 '{platform}' 群组 '{group_id}' 的绰号队列。当前大小: {nickname_queue.qsize()}")
|
||||
except queue.Full:
|
||||
logger.warning(f"无法将项目添加到绰号队列:队列已满 (maxsize={nickname_queue.maxsize})。")
|
||||
except Exception as e:
|
||||
logger.warning(f"无法将项目添加到绰号队列: {e}", exc_info=True)
|
||||
|
||||
|
||||
# --- _nickname_processing_loop (使用全局 config) ---
|
||||
# --- _nickname_processing_loop (保持不变,已包含 platform) ---
|
||||
async def _nickname_processing_loop(q: queue.Queue, stop_event: threading.Event):
|
||||
"""独立线程中的主循环,处理队列任务 (使用全局 db 和 config)。"""
|
||||
thread_id = threading.get_ident()
|
||||
logger.info(f"绰号处理循环已启动 (线程 ID: {thread_id})。")
|
||||
# --- 修改:使用全局 config ---
|
||||
sleep_interval = getattr(global_config, 'NICKNAME_PROCESS_SLEEP_INTERVAL', 0.5) if global_config else 0.5
|
||||
# ---------------------------
|
||||
sleep_interval = getattr(global_config, 'NICKNAME_PROCESS_SLEEP_INTERVAL', 0.5)
|
||||
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
item = q.get(block=True, timeout=sleep_interval)
|
||||
if isinstance(item, tuple) and len(item) == 4:
|
||||
chat_history_str, bot_reply, group_id, user_name_map = item
|
||||
logger.debug(f"(线程 ID: {thread_id}) 正在处理群组 {group_id} 的绰号映射任务...")
|
||||
# analyze_chat_for_nicknames 内部也应使用 global_config
|
||||
|
||||
if isinstance(item, tuple) and len(item) == 5:
|
||||
chat_history_str, bot_reply, platform, group_id, user_name_map = item
|
||||
logger.debug(f"(线程 ID: {thread_id}) 正在处理平台 '{platform}' 群组 '{group_id}' 的绰号映射任务...")
|
||||
analysis_result = await analyze_chat_for_nicknames(chat_history_str, bot_reply, user_name_map)
|
||||
if analysis_result.get("is_exist") and analysis_result.get("data"):
|
||||
await update_nickname_counts(group_id, analysis_result["data"])
|
||||
await update_nickname_counts(platform, group_id, analysis_result["data"])
|
||||
else:
|
||||
logger.warning(f"(线程 ID: {thread_id}) 从队列接收到意外的项目类型: {type(item)}")
|
||||
logger.warning(f"(线程 ID: {thread_id}) 从队列接收到意外的项目类型或长度: {type(item)}, 内容: {item}")
|
||||
|
||||
q.task_done()
|
||||
except queue.Empty: continue
|
||||
except asyncio.CancelledError: logger.info(f"绰号处理循环已取消 (线程 ID: {thread_id})。"); break
|
||||
except Exception as e: logger.error(f"(线程 ID: {thread_id}) 绰号处理循环出错: {e}\n{traceback.format_exc()}"); await asyncio.sleep(5)
|
||||
|
||||
except queue.Empty:
|
||||
continue
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"绰号处理循环已取消 (线程 ID: {thread_id})。")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"(线程 ID: {thread_id}) 绰号处理循环出错: {e}\n{traceback.format_exc()}")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
logger.info(f"绰号处理循环已结束 (线程 ID: {thread_id})。")
|
||||
|
||||
|
||||
# --- _run_processor_thread (保持不变,不处理 db 或 config) ---
|
||||
# --- _run_processor_thread (保持不变) ---
|
||||
def _run_processor_thread(q: queue.Queue, stop_event: threading.Event):
|
||||
"""线程启动函数,运行异步循环。"""
|
||||
loop = None
|
||||
|
|
@ -122,30 +244,34 @@ def _run_processor_thread(q: queue.Queue, stop_event: threading.Event):
|
|||
asyncio.set_event_loop(loop)
|
||||
logger.info(f"(Thread ID: {thread_id}) Asyncio event loop created and set.")
|
||||
loop.run_until_complete(_nickname_processing_loop(q, stop_event))
|
||||
except Exception as e: logger.error(f"(Thread ID: {thread_id}) Error running nickname processor thread: {e}", exc_info=True)
|
||||
except Exception as e:
|
||||
logger.error(f"(Thread ID: {thread_id}) Error running nickname processor thread: {e}", exc_info=True)
|
||||
finally:
|
||||
if loop:
|
||||
try:
|
||||
if loop.is_running():
|
||||
logger.info(f"(Thread ID: {thread_id}) Stopping the asyncio loop...")
|
||||
all_tasks = asyncio.all_tasks(loop)
|
||||
if all_tasks:
|
||||
logger.info(f"(Thread ID: {thread_id}) Cancelling {len(all_tasks)} tasks...")
|
||||
logger.info(f"(Thread ID: {thread_id}) Cancelling {len(all_tasks)} running tasks...")
|
||||
for task in all_tasks: task.cancel()
|
||||
loop.run_until_complete(asyncio.gather(*all_tasks, return_exceptions=True))
|
||||
logger.info(f"(Thread ID: {thread_id}) All tasks cancelled.")
|
||||
loop.stop()
|
||||
loop.close()
|
||||
logger.info(f"(Thread ID: {thread_id}) Asyncio loop closed.")
|
||||
except Exception as loop_close_err: logger.error(f"(Thread ID: {thread_id}) Error closing loop: {loop_close_err}", exc_info=True)
|
||||
logger.info(f"(Thread ID: {thread_id}) Loop stopped.")
|
||||
if not loop.is_closed():
|
||||
loop.close()
|
||||
logger.info(f"(Thread ID: {thread_id}) Asyncio loop closed.")
|
||||
except Exception as loop_close_err:
|
||||
logger.error(f"(Thread ID: {thread_id}) Error closing loop: {loop_close_err}", exc_info=True)
|
||||
logger.info(f"Nickname processor thread finished (Thread ID: {thread_id}).")
|
||||
|
||||
|
||||
# --- start_nickname_processor (使用全局 config) ---
|
||||
# --- start_nickname_processor (保持不变) ---
|
||||
def start_nickname_processor():
|
||||
"""启动绰号映射处理线程。"""
|
||||
global _nickname_thread
|
||||
# --- 修改:使用全局 config ---
|
||||
if not global_config or not global_config.ENABLE_NICKNAME_MAPPING:
|
||||
# ---------------------------
|
||||
logger.info("绰号映射功能已禁用或无法获取配置。处理器未启动。")
|
||||
return
|
||||
|
||||
|
|
@ -172,11 +298,15 @@ def stop_nickname_processor():
|
|||
set_stop_event()
|
||||
try:
|
||||
_nickname_thread.join(timeout=10)
|
||||
if _nickname_thread.is_alive(): logger.warning("绰号处理器线程在 10 秒后未结束。")
|
||||
except Exception as e: logger.error(f"停止绰号处理器线程时出错: {e}", exc_info=True)
|
||||
if _nickname_thread.is_alive():
|
||||
logger.warning("绰号处理器线程在 10 秒后未结束。")
|
||||
except Exception as e:
|
||||
logger.error(f"停止绰号处理器线程时出错: {e}", exc_info=True)
|
||||
finally:
|
||||
if _nickname_thread and not _nickname_thread.is_alive(): logger.info("绰号处理器线程已成功停止。")
|
||||
else: logger.warning("停止绰号处理器线程:线程可能仍在运行。")
|
||||
if _nickname_thread and not _nickname_thread.is_alive():
|
||||
logger.info("绰号处理器线程已成功停止。")
|
||||
else:
|
||||
logger.warning("停止绰号处理器线程:线程可能仍在运行或未正确清理。")
|
||||
_nickname_thread = None
|
||||
else:
|
||||
logger.info("绰号处理器线程未在运行或已被清理。")
|
||||
|
|
|
|||
|
|
@ -764,7 +764,7 @@ class HeartFChatting:
|
|||
user_name_map[user_id] = f"未知({user_id})"
|
||||
|
||||
# 5. 添加到队列
|
||||
await add_to_nickname_queue(chat_history_str, bot_reply_str, group_id, user_name_map)
|
||||
await add_to_nickname_queue(chat_history_str, bot_reply_str,platform, group_id, user_name_map)
|
||||
logger.debug(f"{self.log_prefix} Triggered nickname analysis for group {group_id}.")
|
||||
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -99,11 +99,25 @@ class RelationshipManager:
|
|||
{"_id": 0, "person_id": 1, "user_id": 1, "person_name": 1} # 只查询需要的字段
|
||||
)
|
||||
|
||||
for doc in cursor:
|
||||
original_user_id = doc.get("user_id", "").split("_", 1)[-1]
|
||||
for doc in cursor:
|
||||
user_id_val = doc.get("user_id") # 获取原始值
|
||||
original_user_id = None # 初始化
|
||||
|
||||
if isinstance(user_id_val, (int, float)): # 检查是否是数字类型
|
||||
original_user_id = str(user_id_val) # 直接转换为字符串
|
||||
elif isinstance(user_id_val, str): # 检查是否是字符串
|
||||
if "_" in user_id_val: # 如果包含下划线,则分割
|
||||
original_user_id = user_id_val.split("_", 1)[-1]
|
||||
else: # 如果不包含下划线,则直接使用该字符串
|
||||
original_user_id = user_id_val
|
||||
# else: # 其他类型或 None,original_user_id 保持为 None
|
||||
|
||||
person_name = doc.get("person_name")
|
||||
|
||||
# 确保 original_user_id 和 person_name 都有效
|
||||
if original_user_id and person_name:
|
||||
names_map[original_user_id] = person_name
|
||||
|
||||
logger.debug(f"批量获取 {len(user_ids)} 个用户的 person_name,找到 {len(names_map)} 个。")
|
||||
except AttributeError as e:
|
||||
# 如果 db 对象没有 person_info 属性,或者 find 方法不存在
|
||||
|
|
@ -111,7 +125,6 @@ class RelationshipManager:
|
|||
except Exception as e:
|
||||
logger.error(f"批量获取 person_name 时出错: {e}", exc_info=True)
|
||||
return names_map
|
||||
# --- 结束修改 ---
|
||||
|
||||
@staticmethod
|
||||
async def get_users_group_nicknames(platform: str, user_ids: List[str], group_id: str) -> Dict[str, List[Dict[str, int]]]:
|
||||
|
|
|
|||
Loading…
Reference in New Issue