重构部分代码以使用class封装,调成默认休眠间隔为1分钟,调整线程启动顺序

pull/914/head
Bakadax 2025-05-03 14:03:13 +08:00
parent 56ff5fb923
commit 612d4b1a7e
12 changed files with 825 additions and 752 deletions

27
bot.py
View File

@ -14,10 +14,7 @@ from src.common.logger_manager import get_logger
from src.common.crash_logger import install_crash_handler
from src.main import MainSystem
from rich.traceback import install
from src.plugins.group_nickname.nickname_processor import (
start_nickname_processor,
stop_nickname_processor,
)
from src.plugins.group_nickname.nickname_manager import nickname_manager
import atexit
install(extra_lines=3)
@ -226,6 +223,19 @@ def raw_main():
env_config = {key: os.getenv(key) for key in os.environ}
scan_provider(env_config)
# 确保 NicknameManager 单例实例存在并已初始化
# (单例模式下,导入时或第一次调用时会自动初始化)
_ = nickname_manager # 显式引用一次
# 启动 NicknameManager 的后台处理器线程
logger.info("准备启动绰号处理管理器...")
nickname_manager.start_processor() # 调用实例的方法
logger.info("已调用启动绰号处理管理器。")
# 注册 NicknameManager 的停止方法到 atexit确保程序退出时线程能被清理
atexit.register(nickname_manager.stop_processor) # 注册实例的方法
logger.info("已注册绰号处理管理器的退出处理程序。")
# 返回MainSystem实例
return MainSystem()
@ -235,15 +245,6 @@ if __name__ == "__main__":
# 获取MainSystem实例
main_system = raw_main()
# 在这里启动绰号处理进程
logger.info("准备启动绰号处理线程...")
start_nickname_processor() # <--- 添加启动调用
logger.info("已调用启动绰号处理线程。")
# 注册退出处理函数 (确保进程能被关闭)
atexit.register(stop_nickname_processor) # <--- 在这里注册停止函数
logger.info("已注册绰号处理线程的退出处理程序。")
# 创建事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

View File

@ -279,7 +279,7 @@ class BotConfig:
MAX_NICKNAMES_IN_PROMPT: int = 10 # Prompt 中最多注入的绰号数量
NICKNAME_PROBABILITY_SMOOTHING: int = 1 # 绰号加权随机选择的平滑因子
NICKNAME_QUEUE_MAX_SIZE: int = 100 # 绰号处理队列最大容量
NICKNAME_PROCESS_SLEEP_INTERVAL: float = 5 # 绰号处理进程休眠间隔(秒)
NICKNAME_PROCESS_SLEEP_INTERVAL: float = 60 # 绰号处理进程休眠间隔(秒)
# 模型配置
llm_reasoning: dict[str, str] = field(default_factory=lambda: {})

View File

@ -0,0 +1,156 @@
from pymongo.collection import Collection
from pymongo.errors import OperationFailure, DuplicateKeyError
from src.common.logger_manager import get_logger
from typing import Optional
logger = get_logger("nickname_db")
class NicknameDB:
"""
处理与群组绰号相关的数据库操作 (MongoDB)
封装了对 'person_info' 集合的读写操作
"""
def __init__(self, person_info_collection: Optional[Collection]):
"""
初始化 NicknameDB 处理器
Args:
person_info_collection: MongoDB 'person_info' 集合对象
如果为 None则数据库操作将被禁用
"""
if person_info_collection is None:
logger.error("未提供 person_info 集合NicknameDB 操作将被禁用。")
self.person_info_collection = None
else:
self.person_info_collection = person_info_collection
logger.info("NicknameDB 初始化成功。")
def is_available(self) -> bool:
"""检查数据库集合是否可用。"""
return self.person_info_collection is not None
def upsert_person(self, person_id: str, user_id_int: int, platform: str):
"""
确保数据库中存在指定 person_id 的文档 (Upsert)
如果文档不存在则使用提供的用户信息创建它
Args:
person_id: 要查找或创建的 person_id
user_id_int: 用户的整数 ID
platform: 平台名称
Returns:
UpdateResult None: MongoDB 更新操作的结果如果数据库不可用则返回 None
Raises:
DuplicateKeyError: 如果发生重复键错误 (理论上不应由 upsert 触发)
Exception: 其他数据库操作错误
"""
if not self.is_available():
logger.error("数据库集合不可用,无法执行 upsert_person。")
return None
try:
# 关键步骤:基于 person_id 执行 Upsert
result = self.person_info_collection.update_one(
{"person_id": person_id},
{
"$setOnInsert": {
"person_id": person_id,
"user_id": user_id_int,
"platform": platform,
"group_nicknames": [], # 初始化 group_nicknames 数组
}
},
upsert=True,
)
if result.upserted_id:
logger.debug(f"Upsert 创建了新的 person 文档: {person_id}")
return result
except DuplicateKeyError as dk_err:
# 这个错误理论上不应该再由 upsert 触发。
logger.error(
f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。"
)
raise # 将异常向上抛出
except Exception as e:
logger.exception(f"对 person_id {person_id} 执行 Upsert 时失败: {e}")
raise # 将异常向上抛出
def update_group_nickname_count(self, person_id: str, group_id_str: str, nickname: str):
"""
尝试更新 person_id 文档中特定群组的绰号计数或添加新条目
按顺序尝试增加计数 -> 添加绰号 -> 添加群组
Args:
person_id: 目标文档的 person_id
group_id_str: 目标群组的 ID (字符串)
nickname: 要更新或添加的绰号
"""
if not self.is_available():
logger.error("数据库集合不可用,无法执行 update_group_nickname_count。")
return
try:
# 3a. 尝试增加现有群组中现有绰号的计数
result_inc = self.person_info_collection.update_one(
{
"person_id": person_id,
"group_nicknames": {"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}},
},
{"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}},
array_filters=[
{"group.group_id": group_id_str},
{"nick.name": nickname},
],
)
if result_inc.modified_count > 0:
# logger.debug(f"成功增加 person_id {person_id} 在群组 {group_id_str} 中绰号 '{nickname}' 的计数。")
return # 成功增加计数,操作完成
# 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组
result_push_nick = self.person_info_collection.update_one(
{
"person_id": person_id,
"group_nicknames.group_id": group_id_str, # 检查群组是否存在
},
{"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}},
array_filters=[{"group.group_id": group_id_str}],
)
if result_push_nick.modified_count > 0:
logger.debug(f"成功为 person_id {person_id} 在现有群组 {group_id_str} 中添加新绰号 '{nickname}'")
return # 成功添加绰号,操作完成
# 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号
# 确保 group_nicknames 数组存在 (作为保险措施)
self.person_info_collection.update_one(
{"person_id": person_id, "group_nicknames": {"$exists": False}},
{"$set": {"group_nicknames": []}},
)
# 推送新的群组对象到 group_nicknames 数组
result_push_group = self.person_info_collection.update_one(
{
"person_id": person_id,
"group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在
},
{
"$push": {
"group_nicknames": {
"group_id": group_id_str,
"nicknames": [{"name": nickname, "count": 1}],
}
}
},
)
if result_push_group.modified_count > 0:
logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'")
# else:
# logger.warning(f"尝试为 person_id {person_id} 添加新群组 {group_id_str} 失败,可能群组已存在但结构不符合预期。")
except (OperationFailure, DuplicateKeyError) as db_err:
logger.exception(
f"数据库操作失败 ({type(db_err).__name__}): person_id {person_id}, 群组 {group_id_str}, 绰号 {nickname}. 错误: {db_err}"
)
# 根据需要决定是否向上抛出 raise db_err
except Exception as e:
logger.exception(f"更新群组绰号计数时发生意外错误: person_id {person_id}, group {group_id_str}, nick {nickname}. Error: {e}")
# 根据需要决定是否向上抛出 raise e

View File

@ -0,0 +1,534 @@
import asyncio
import threading
import queue
import traceback
import time
import json
import re
from typing import Dict, Optional, List, Any
from pymongo.errors import OperationFailure, DuplicateKeyError
from src.common.logger_manager import get_logger
from src.common.database import db
from src.config.config import global_config
from src.plugins.models.utils_model import LLMRequest
from .nickname_db import NicknameDB
from .nickname_mapper import _build_mapping_prompt
from .nickname_utils import select_nicknames_for_prompt, format_nickname_prompt_injection
# 依赖于 person_info_manager 来生成 person_id
from ..person_info.person_info import person_info_manager
# 依赖于 relationship_manager 来获取用户名称和现有绰号
from ..person_info.relationship_manager import relationship_manager
# 导入消息和聊天流相关的类型和工具
from src.plugins.chat.chat_stream import ChatStream
from src.plugins.chat.message import MessageRecv
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
logger = get_logger("NicknameManager")
class NicknameManager:
"""
管理群组绰号分析处理存储和使用的单例类
封装了 LLM 调用后台处理线程和数据库交互
"""
_instance = None
_lock = threading.Lock()
# Singleton Implementation
def __new__(cls, *args, **kwargs):
if not cls._instance:
with cls._lock:
# 再次检查,防止多线程并发创建实例
if not cls._instance:
logger.info("正在创建 NicknameManager 单例实例...")
cls._instance = super(NicknameManager, cls).__new__(cls)
cls._instance._initialized = False # 添加初始化标志
return cls._instance
def __init__(self):
"""
初始化 NicknameManager
使用锁和标志确保实际初始化只执行一次
"""
if self._initialized: # 如果已初始化,直接返回
return
with self._lock:
# 再次检查初始化标志,防止重复初始化
if self._initialized:
return
logger.info("正在初始化 NicknameManager 组件...")
self.config = global_config
self.is_enabled = self.config.ENABLE_NICKNAME_MAPPING
# 数据库处理器
person_info_collection = getattr(db, 'person_info', None)
self.db_handler = NicknameDB(person_info_collection)
if not self.db_handler.is_available():
logger.error("数据库处理器初始化失败NicknameManager 功能受限。")
self.is_enabled = False # 如果数据库不可用,禁用功能
# LLM 映射器
self.llm_mapper: Optional[LLMRequest] = None
if self.is_enabled:
try:
model_config = self.config.llm_nickname_mapping
if model_config and model_config.get("name"):
self.llm_mapper = LLMRequest(
model=model_config,
temperature=model_config.get("temp", 0.5), # 使用 get 获取并提供默认值
max_tokens=model_config.get("max_tokens", 256), # 使用 get 获取并提供默认值
request_type="nickname_mapping",
)
logger.info("绰号映射 LLM 映射器初始化成功。")
else:
logger.warning("绰号映射 LLM 配置无效或缺失 'name',功能禁用。")
self.is_enabled = False
except KeyError as ke:
logger.error(f"初始化绰号映射 LLM 时缺少配置项: {ke},功能禁用。", exc_info=True)
self.llm_mapper = None
self.is_enabled = False
except Exception as e:
logger.error(f"初始化绰号映射 LLM 映射器失败: {e},功能禁用。", exc_info=True)
self.llm_mapper = None
self.is_enabled = False
# 队列和线程
self.queue_max_size = getattr(self.config, "NICKNAME_QUEUE_MAX_SIZE", 100)
self.nickname_queue: queue.Queue = queue.Queue(maxsize=self.queue_max_size)
self._stop_event = threading.Event()
self._nickname_thread: Optional[threading.Thread] = None
self.sleep_interval = getattr(self.config, "NICKNAME_PROCESS_SLEEP_INTERVAL", 0.5)
self._initialized = True # 标记为已初始化
logger.info("NicknameManager 初始化完成。")
# 公共方法
def start_processor(self):
"""启动后台处理线程(如果已启用且未运行)。"""
if not self.is_enabled:
logger.info("绰号处理功能已禁用,处理器未启动。")
return
if self._nickname_thread is None or not self._nickname_thread.is_alive():
logger.info("正在启动绰号处理器线程...")
self._stop_event.clear() # 清除停止事件标志
self._nickname_thread = threading.Thread(
target=self._run_processor_in_thread, # 线程执行的入口函数
daemon=True # 设置为守护线程,主程序退出时自动结束
)
self._nickname_thread.start()
logger.info(f"绰号处理器线程已启动 (ID: {self._nickname_thread.ident})")
else:
logger.warning("绰号处理器线程已在运行中。")
def stop_processor(self):
"""停止后台处理线程。"""
if self._nickname_thread and self._nickname_thread.is_alive():
logger.info("正在停止绰号处理器线程...")
self._stop_event.set() # 设置停止事件标志
try:
# 可选:尝试清空队列,避免丢失未处理的任务
# while not self.nickname_queue.empty():
# try:
# self.nickname_queue.get_nowait()
# self.nickname_queue.task_done()
# except queue.Empty:
# break
# logger.info("绰号处理队列已清空。")
self._nickname_thread.join(timeout=10) # 等待线程结束,设置超时
if self._nickname_thread.is_alive():
logger.warning("绰号处理器线程在超时后仍未停止。")
except Exception as e:
logger.error(f"停止绰号处理器线程时出错: {e}", exc_info=True)
finally:
if self._nickname_thread and not self._nickname_thread.is_alive():
logger.info("绰号处理器线程已成功停止。")
self._nickname_thread = None # 清理线程对象引用
else:
logger.info("绰号处理器线程未在运行或已被清理。")
async def trigger_nickname_analysis(
self,
anchor_message: MessageRecv,
bot_reply: List[str],
chat_stream: Optional[ChatStream] = None,
):
"""
准备数据并将其排队等待绰号分析如果满足条件
取代了旧的 trigger_nickname_analysis_if_needed 函数
"""
if not self.is_enabled:
return # 功能禁用则直接返回
current_chat_stream = chat_stream or anchor_message.chat_stream
if not current_chat_stream or not current_chat_stream.group_info:
logger.debug("跳过绰号分析:非群聊或无效的聊天流。")
return
log_prefix = f"[{current_chat_stream.stream_id}]"
try:
# 1. 获取历史记录
history_limit = getattr(self.config, "NICKNAME_ANALYSIS_HISTORY_LIMIT", 30)
history_messages = get_raw_msg_before_timestamp_with_chat(
chat_id=current_chat_stream.stream_id,
timestamp=time.time(),
limit=history_limit,
)
# 格式化历史记录
chat_history_str = await build_readable_messages(
messages=history_messages,
replace_bot_name=True, merge_messages=False, timestamp_mode="relative",
read_mark=0.0, truncate=False,
)
# 2. 获取 Bot 回复
bot_reply_str = " ".join(bot_reply) if bot_reply else ""
# 3. 获取群组和平台信息
group_id = str(current_chat_stream.group_info.group_id)
platform = current_chat_stream.platform
# 4. 构建用户 ID 到名称的映射 (user_name_map)
user_ids_in_history = {str(msg["user_info"]["user_id"]) for msg in history_messages if msg.get("user_info", {}).get("user_id")}
user_name_map = {}
if user_ids_in_history:
try:
# 使用 relationship_manager 批量获取名称
names_data = await relationship_manager.get_person_names_batch(platform, list(user_ids_in_history))
except Exception as e:
logger.error(f"{log_prefix} 批量获取 person_name 时出错: {e}", exc_info=True)
names_data = {}
# 填充 user_name_map
for user_id in user_ids_in_history:
if user_id in names_data:
user_name_map[user_id] = names_data[user_id]
else:
# 回退查找历史记录中的 nickname
latest_nickname = next(
(m["user_info"].get("user_nickname")
for m in reversed(history_messages)
if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname")),
None,
)
user_name_map[user_id] = latest_nickname or f"未知({user_id})"
# 5. 添加到内部处理队列
item = (chat_history_str, bot_reply_str, platform, group_id, user_name_map)
self._add_to_queue(item, platform, group_id) # 调用私有方法入队
except Exception as e:
logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True)
async def get_nickname_prompt_injection(self, chat_stream: ChatStream, message_list_before_now: List[Dict]) -> str:
"""
获取并格式化用于 Prompt 注入的绰号信息字符串
取代了旧的 get_nickname_injection_for_prompt 函数
"""
if not self.is_enabled or not chat_stream or not chat_stream.group_info:
return "" # 功能禁用或非群聊则返回空
log_prefix = f"[{chat_stream.stream_id}]"
try:
group_id = str(chat_stream.group_info.group_id)
platform = chat_stream.platform
# 确定上下文中的用户 ID
user_ids_in_context = {str(msg["user_info"]["user_id"]) for msg in message_list_before_now if msg.get("user_info", {}).get("user_id")}
# 如果消息列表为空,尝试获取最近发言者
if not user_ids_in_context:
recent_speakers = chat_stream.get_recent_speakers(limit=5)
user_ids_in_context.update(str(speaker["user_id"]) for speaker in recent_speakers)
if not user_ids_in_context:
logger.warning(f"{log_prefix} 未找到上下文用户用于绰号注入。")
return ""
# 使用 relationship_manager 批量获取这些用户的群组绰号
all_nicknames_data = await relationship_manager.get_users_group_nicknames(
platform, list(user_ids_in_context), group_id
)
if all_nicknames_data:
# 使用 nickname_utils 中的工具函数进行选择和格式化
selected_nicknames = select_nicknames_for_prompt(all_nicknames_data)
injection_str = format_nickname_prompt_injection(selected_nicknames)
if injection_str:
logger.debug(f"{log_prefix} 生成的绰号 Prompt 注入:\n{injection_str}")
return injection_str
else:
return "" # 没有获取到绰号数据
except Exception as e:
logger.error(f"{log_prefix} 获取绰号注入时出错: {e}", exc_info=True)
return "" # 出错时返回空
# 私有/内部方法
def _add_to_queue(self, item: tuple, platform: str, group_id: str):
"""将项目添加到内部处理队列。"""
try:
self.nickname_queue.put_nowait(item)
logger.debug(f"已将项目添加到平台 '{platform}' 群组 '{group_id}' 的绰号队列。当前大小: {self.nickname_queue.qsize()}")
except queue.Full:
logger.warning(f"绰号队列已满 (最大={self.queue_max_size})。平台 '{platform}' 群组 '{group_id}' 的项目被丢弃。")
except Exception as e:
logger.error(f"将项目添加到绰号队列时出错: {e}", exc_info=True)
async def _analyze_and_update_nicknames(self, item: tuple):
"""处理单个队列项目:调用 LLM 分析并更新数据库。"""
if not isinstance(item, tuple) or len(item) != 5:
logger.warning(f"从队列接收到无效项目: {type(item)}")
return
chat_history_str, bot_reply, platform, group_id, user_name_map = item
thread_id = threading.get_ident()
log_prefix = f"[线程 {thread_id}][{platform}:{group_id}]"
logger.debug(f"{log_prefix} 开始处理绰号分析任务...")
if not self.llm_mapper:
logger.error(f"{log_prefix} LLM 映射器不可用,无法执行分析。")
return
if not self.db_handler.is_available():
logger.error(f"{log_prefix} 数据库处理器不可用,无法更新计数。")
return
# 1. 调用 LLM 分析 (逻辑从 nickname_mapper 移入)
analysis_result = await self._call_llm_for_analysis(chat_history_str, bot_reply, user_name_map)
# 2. 如果分析成功且找到映射,则更新数据库
if analysis_result.get("is_exist") and analysis_result.get("data"):
nickname_map_to_update = analysis_result["data"]
logger.info(f"{log_prefix} LLM 找到绰号映射,准备更新数据库: {nickname_map_to_update}")
for user_id_str, nickname in nickname_map_to_update.items():
# 基本验证
if not user_id_str or not nickname:
logger.warning(f"{log_prefix} 跳过无效条目: user_id='{user_id_str}', nickname='{nickname}'")
continue
if not user_id_str.isdigit():
logger.warning(f"{log_prefix} 无效的用户ID格式 (非纯数字): '{user_id_str}',跳过。")
continue
user_id_int = int(user_id_str)
# 结束验证
try:
# 步骤 1: 生成 person_id
person_id = person_info_manager.get_person_id(platform, user_id_str)
if not person_id:
logger.error(f"{log_prefix} 无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id跳过此用户。")
continue
# 步骤 2: 确保 Person 文档存在 (调用 DB Handler)
self.db_handler.upsert_person(person_id, user_id_int, platform)
# 步骤 3: 更新群组绰号 (调用 DB Handler)
self.db_handler.update_group_nickname_count(person_id, group_id, nickname)
except (OperationFailure, DuplicateKeyError) as db_err: # 捕获特定的数据库错误
logger.exception(
f"{log_prefix} 数据库操作失败 ({type(db_err).__name__}): 用户 {user_id_str}, 绰号 {nickname}. 错误: {db_err}"
)
except Exception as e:
logger.exception(f"{log_prefix} 处理用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误:{e}")
else:
logger.debug(f"{log_prefix} LLM 未找到可靠的绰号映射或分析失败。")
async def _call_llm_for_analysis(
self,
chat_history_str: str,
bot_reply: str,
user_name_map: Dict[str, str],
) -> Dict[str, Any]:
"""
内部方法调用 LLM 分析聊天记录和 Bot 回复提取可靠的 用户ID-绰号 映射
(逻辑从 analyze_chat_for_nicknames 移入)
"""
if not self.llm_mapper: # 再次检查 LLM 映射器
logger.error("LLM 映射器未初始化,无法执行分析。")
return {"is_exist": False}
prompt = _build_mapping_prompt(chat_history_str, bot_reply, user_name_map)
logger.debug(f"构建的绰号映射 Prompt:\n{prompt[:500]}...") # 截断日志输出
try:
# 调用 LLM
response_content, _, _ = await self.llm_mapper.generate_response(prompt)
logger.debug(f"LLM 原始响应 (绰号映射): {response_content}")
if not response_content:
logger.warning("LLM 返回了空的绰号映射内容。")
return {"is_exist": False}
# 清理可能的 Markdown 代码块标记
response_content = response_content.strip()
markdown_code_regex = re.compile(r"^```(?:\w+)?\s*\n(.*?)\n\s*```$", re.DOTALL | re.IGNORECASE)
match = markdown_code_regex.match(response_content)
if match:
response_content = match.group(1).strip()
# 尝试直接解析 JSON即使没有代码块标记
elif response_content.startswith("{") and response_content.endswith("}"):
pass # 可能是纯 JSON
else:
# 尝试在文本中查找 JSON 对象
json_match = re.search(r'\{.*\}', response_content, re.DOTALL)
if json_match:
response_content = json_match.group(0)
else:
logger.warning(f"LLM 响应似乎不包含有效的 JSON 对象。响应: {response_content}")
return {"is_exist": False}
# 解析 JSON
result = json.loads(response_content)
# 结果验证和过滤
if not isinstance(result, dict):
logger.warning(f"LLM 响应不是一个有效的 JSON 对象 (字典类型)。响应内容: {response_content}")
return {"is_exist": False}
is_exist = result.get("is_exist")
if is_exist is True:
original_data = result.get("data")
if isinstance(original_data, dict) and original_data:
logger.info(f"LLM 找到的原始绰号映射: {original_data}")
filtered_data = self._filter_llm_results(original_data, user_name_map) # 调用过滤函数
if not filtered_data:
logger.info("所有找到的绰号映射都被过滤掉了。")
return {"is_exist": False}
else:
logger.info(f"过滤后的绰号映射: {filtered_data}")
return {"is_exist": True, "data": filtered_data}
else:
# is_exist 为 True 但 data 缺失、不是字典或为空
logger.warning(f"LLM 响应格式错误: is_exist=True 但 data 无效。原始 data: {original_data}")
return {"is_exist": False}
elif is_exist is False:
logger.info("LLM 明确指示未找到可靠的绰号映射 (is_exist=False)。")
return {"is_exist": False}
else: # is_exist 不是 True 或 False (包括 None)
logger.warning(f"LLM 响应格式错误: 'is_exist' 的值 '{is_exist}' 无效。")
return {"is_exist": False}
except json.JSONDecodeError as json_err:
logger.error(f"解析 LLM 响应 JSON 失败: {json_err}\n原始响应: {response_content}")
return {"is_exist": False}
except Exception as e:
logger.error(f"绰号映射 LLM 调用或处理过程中发生意外错误: {e}", exc_info=True)
return {"is_exist": False}
def _filter_llm_results(self, original_data: Dict[str, str], user_name_map: Dict[str, str]) -> Dict[str, str]:
"""过滤 LLM 返回的绰号映射结果。"""
filtered_data = {}
bot_qq_str = str(self.config.BOT_QQ) if hasattr(self.config, 'BOT_QQ') else None
for user_id, nickname in original_data.items():
# 过滤条件 1: user_id 必须是字符串
if not isinstance(user_id, str):
logger.warning(f"过滤掉非字符串 user_id: {user_id}")
continue
# 过滤条件 2: 排除机器人自身
if bot_qq_str and user_id == bot_qq_str:
logger.debug(f"过滤掉机器人自身的映射: ID {user_id}")
continue
# 过滤条件 3: 排除 nickname 为空或仅包含空白的情况
if not nickname or nickname.isspace():
logger.debug(f"过滤掉用户 {user_id} 的空绰号。")
continue
# 过滤条件 4 (可选,根据 Prompt 效果决定是否保留): 排除 nickname 与已知名称相同的情况
# person_name = user_name_map.get(user_id)
# if person_name and person_name == nickname:
# logger.debug(f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。")
# continue
# 如果通过所有过滤条件,则保留
filtered_data[user_id] = nickname.strip() # 保留时去除首尾空白
return filtered_data
# 线程相关
def _run_processor_in_thread(self):
"""后台线程的入口函数,负责创建和运行 asyncio 事件循环。"""
loop = None
thread_id = threading.get_ident()
logger.info(f"绰号处理器线程启动 (线程 ID: {thread_id})...")
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logger.info(f"(线程 ID: {thread_id}) Asyncio 事件循环已创建并设置。")
# 运行主处理循环直到停止事件被设置
loop.run_until_complete(self._processing_loop())
except Exception as e:
logger.error(f"(线程 ID: {thread_id}) 运行绰号处理器线程时出错: {e}", exc_info=True)
finally:
# 确保循环被正确关闭
if loop:
try:
if loop.is_running():
logger.info(f"(线程 ID: {thread_id}) 正在停止 asyncio 循环...")
all_tasks = asyncio.all_tasks(loop)
if all_tasks:
logger.info(f"(线程 ID: {thread_id}) 正在取消 {len(all_tasks)} 个运行中的任务...")
for task in all_tasks:
task.cancel()
# 等待任务取消完成
loop.run_until_complete(asyncio.gather(*all_tasks, return_exceptions=True))
logger.info(f"(线程 ID: {thread_id}) 所有任务已取消。")
loop.stop()
logger.info(f"(线程 ID: {thread_id}) 循环已停止。")
if not loop.is_closed():
loop.close()
logger.info(f"(线程 ID: {thread_id}) Asyncio 循环已关闭。")
except Exception as loop_close_err:
logger.error(f"(线程 ID: {thread_id}) 关闭循环时出错: {loop_close_err}", exc_info=True)
logger.info(f"绰号处理器线程结束 (线程 ID: {thread_id}).")
async def _processing_loop(self):
"""后台线程中运行的异步处理循环。"""
thread_id = threading.get_ident()
logger.info(f"绰号处理循环已启动 (线程 ID: {thread_id})。")
while not self._stop_event.is_set():
try:
# 从队列中获取项目,设置超时以允许检查停止事件
item = self.nickname_queue.get(block=True, timeout=self.sleep_interval)
# 处理获取到的项目
await self._analyze_and_update_nicknames(item)
self.nickname_queue.task_done() # 标记任务完成
except queue.Empty:
# 超时,队列为空,继续循环检查停止事件
continue
except asyncio.CancelledError:
logger.info(f"绰号处理循环被取消 (线程 ID: {thread_id})。")
break # 任务被取消,退出循环
except Exception as e:
# 捕获处理单个项目时可能发生的其他异常
logger.error(f"(线程 ID: {thread_id}) 绰号处理循环出错: {e}\n{traceback.format_exc()}")
# 可以在这里添加错误处理逻辑,例如将失败的任务放回队列或记录到错误日志
# 短暂休眠避免快速连续失败
await asyncio.sleep(5)
logger.info(f"绰号处理循环已结束 (线程 ID: {thread_id})。")
# 在模块级别创建单例实例
# 这使得其他模块可以通过 `from .nickname_manager import nickname_manager` 来导入和使用
nickname_manager = NicknameManager()

View File

@ -1,44 +1,37 @@
import re
import json
from typing import Dict, Any, Optional
# src/plugins/group_nickname/nickname_mapper.py
from typing import Dict
from src.common.logger_manager import get_logger
from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config
# 这个文件现在只负责构建 PromptLLM 的初始化和调用移至 NicknameManager
logger = get_logger("nickname_mapper")
llm_mapper: Optional[LLMRequest] = None
if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
try:
# 从全局配置获取模型设置
model_config = global_config.llm_nickname_mapping
if not model_config or not model_config.get("name"):
logger.error("在全局配置中未找到有效的 'llm_nickname_mapping' 配置或缺少 'name' 字段。")
else:
llm_mapper = LLMRequest( # <-- LLM 初始化
model=global_config.llm_nickname_mapping,
temperature=global_config.llm_nickname_mapping["temp"],
max_tokens=256,
request_type="nickname_mapping",
)
logger.info("绰号映射 LLM 初始化成功 (使用全局配置)。")
except Exception as e:
logger.error(f"使用全局配置初始化绰号映射 LLM 失败: {e}", exc_info=True)
llm_mapper = None
# LLMRequest 实例和 analyze_chat_for_nicknames 函数已被移除
def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str:
"""构建用于 LLM 绰号映射的 Prompt"""
# user_name_map 包含了 user_id 到 person_name (或 fallback nickname) 的映射
user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items()])
# print(f"\n\n\nKnown User Info for LLM:\n{user_list_str}\n\n\n\n") # Debugging print
"""
构建用于 LLM 进行绰号映射分析的 Prompt
Args:
chat_history_str: 格式化后的聊天历史记录字符串
bot_reply: Bot 的最新回复字符串
user_name_map: 用户 ID 到已知名称person_name fallback nickname的映射
Returns:
str: 构建好的 Prompt 字符串
"""
# 将 user_name_map 格式化为列表字符串
user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items() if uid and name])
if not user_list_str:
user_list_str = "" # 如果映射为空,明确告知
# 核心 Prompt 内容
prompt = f"""
任务分析以下聊天记录和你的最新回复判断其中是否包含用户绰号并确定绰号与用户 ID 之间是否存在明确的一一对应关系
任务仔细分析以下聊天记录和你的最新回复判断其中是否明确提到了某个用户的绰号并且这个绰号可以清晰地与一个特定的用户 ID 对应起来
已知用户信息ID: 名称
{user_list_str}
*注意名称后面带有"(你)"表示是你自己*
聊天记录
---
@ -48,139 +41,36 @@ def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map:
你的最新回复
{bot_reply}
分析要求
1. 识别聊天记录和你发言中出现的可能是用户绰号的词语
2. 判断这些绰号是否能明确地指向某个特定的用户 ID一个绰号必须在上下文中清晰地与某个发言人或被提及的人关联起来
3. 如果能建立可靠的一一映射关系请输出一个 JSON 对象格式如下
{{
"is_exist": true,
"data": {{
"用户A数字id": "绰号_A",
"用户B数字id": "绰号_B"
分析要求与输出格式
1. 找出聊天记录和你的最新回复中可能是用户绰号的词语
2. 判断这些绰号是否在上下文中**清晰无歧义**地指向了已知用户信息列表中的**某一个特定用户 ID**必须是强关联避免猜测
3. **不要**输出你自己名称后带"(你)"的用户的绰号映射
**不要**输出与用户已知名称完全相同的词语作为绰号
**不要**将在你的最新回复中你对他人使用的称呼或绰号进行映射只分析聊天记录中他人对用户的称呼
**不要**输出指代不明或过于通用的词语大佬兄弟那个谁除非上下文能非常明确地指向特定用户
4. 如果找到了**至少一个**满足上述所有条件的**明确**的用户 ID 到绰号的映射关系请输出 JSON 对象
```json
{{
"is_exist": true,
"data": {{
"用户A数字id": "绰号_A",
"用户B数字id": "绰号_B"
}}
}}
}}
其中 "data" 字段的键是用户的 ID (字符串形式)值是对应的绰号只包含你能确认映射关系的绰号
4. 如果无法建立任何可靠的一一映射关系例如绰号指代不明没有出现绰号或无法确认绰号与用户的关联请输出 JSON 对象
{{
"is_exist": false
}}
5. 已知用户信息列表中你的昵称后面可能包含"(你)"这表示是你自己不需要输出你自身的绰号映射请确保不要将你自己的ID和任何词语映射为绰号
6. 不要输出与用户名称相同的绰号不要输出你发言中对他人的绰号映射
7. 请严格按照 JSON 格式输出不要包含任何额外的解释或文本
```
- `"data"` 字段的键必须是用户的**数字 ID (字符串形式)**值是对应的**绰号 (字符串形式)**
- 只包含你能**百分百确认**映射关系的条目宁缺毋滥
如果**无法找到任何一个**满足条件的明确映射关系请输出 JSON 对象
```json
{{
"is_exist": false
}}
```
5. ****输出 JSON 对象不要包含任何额外的解释注释或代码块标记之外的文本
输出
"""
# logger.debug(f"构建的绰号映射 Prompt (部分):\n{prompt[:500]}...") # 可以在 NicknameManager 中记录
return prompt
async def analyze_chat_for_nicknames(
chat_history_str: str,
bot_reply: str,
user_name_map: Dict[str, str], # 这个 map 包含了 user_id -> person_name 的信息
) -> Dict[str, Any]:
"""
调用 LLM 分析聊天记录和 Bot 回复提取可靠的 用户ID-绰号 映射并进行过滤
"""
if not global_config.ENABLE_NICKNAME_MAPPING:
logger.debug("绰号映射功能已禁用。")
return {"is_exist": False}
if llm_mapper is None:
logger.error("绰号映射 LLM 未初始化。无法执行分析。")
return {"is_exist": False}
prompt = _build_mapping_prompt(chat_history_str, bot_reply, user_name_map)
logger.debug(f"构建的绰号映射 Prompt:\n{prompt}")
try:
# 调用 LLM
response_content, _, _ = await llm_mapper.generate_response(prompt)
logger.debug(f"LLM 原始响应 (绰号映射): {response_content}")
if not response_content:
logger.warning("LLM 返回了空的绰号映射内容。")
return {"is_exist": False}
# 清理可能的 Markdown 代码块标记
response_content = response_content.strip()
markdown_code_regex = re.compile(r"^```(?:\w+)?\s*\n(.*?)\n\s*```$", re.DOTALL)
match = markdown_code_regex.match(response_content)
if match:
response_content = match.group(1).strip()
# 解析 JSON
result = json.loads(response_content) # 可能抛出 json.JSONDecodeError
# 检查 result 是否为字典
if not isinstance(result, dict):
logger.warning(f"LLM 响应不是一个有效的 JSON 对象 (字典类型)。响应内容: {response_content}")
return {"is_exist": False}
# 使用 get 获取 is_exist避免 KeyError
is_exist = result.get("is_exist") # 如果 result 不是字典,下面 get 会在 except AttributeError 中捕获
if is_exist is True:
original_data = result.get("data")
if isinstance(original_data, dict) and original_data: # 确保 data 是非空字典
logger.info(f"LLM 找到的原始绰号映射: {original_data}")
# --- 开始过滤 ---
filtered_data = {}
bot_qq_str = str(global_config.BOT_QQ)
for user_id, nickname in original_data.items():
if not isinstance(user_id, str):
logger.warning(f"LLM 返回的 user_id '{user_id}' 不是字符串,跳过。")
continue
if user_id == bot_qq_str:
logger.debug(f"过滤掉机器人自身的映射: ID {user_id}")
continue
# 有了改名工具后,该过滤器已不适合了,尝试通过修改 prompt 获得更好的结果
# # 条件 2: 排除 nickname 与 person_name 相同的情况
# person_name = user_name_map.get(user_id) # 从传入的映射中查找 person_name
# if person_name and person_name == nickname:
# logger.debug(f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。")
# continue
# 如果通过所有过滤条件,则保留
filtered_data[user_id] = nickname
# 检查过滤后是否还有数据
if not filtered_data:
logger.info("所有找到的绰号映射都被过滤掉了。")
return {"is_exist": False}
else:
logger.info(f"过滤后的绰号映射: {filtered_data}")
return {"is_exist": True, "data": filtered_data}
else:
# is_exist 为 True 但 data 缺失、不是字典或为空
if "data" not in result:
logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 键缺失。")
elif not isinstance(original_data, dict):
logger.warning(
f"LLM 响应格式错误: is_exist 为 True 但 'data' 不是字典。 原始 data: {original_data}"
)
else: # data 为空字典
logger.debug("LLM 指示 is_exist=True 但 data 为空字典。视为 False 处理。")
return {"is_exist": False}
elif is_exist is False:
logger.info("LLM 未找到可靠的绰号映射。")
return {"is_exist": False}
elif is_exist is None: # 处理 is_exist 键存在但值为 null/None 的情况
logger.warning("LLM 响应格式错误: 'is_exist' 键的值为 None。")
return {"is_exist": False}
else: # 处理 is_exist 存在但值不是 True/False/None 的情况
logger.warning(f"LLM 响应格式错误: 'is_exist' 的值 '{is_exist}' 不是预期的布尔值或 None。")
return {"is_exist": False}
except json.JSONDecodeError as json_err:
logger.error(f"解析 LLM 响应 JSON 失败: {json_err}\n原始响应: {response_content}")
return {"is_exist": False}
except Exception as e:
# 捕获其他所有未预料到的异常
logger.error(f"绰号映射 LLM 调用或处理过程中发生未预料的错误: {e}", exc_info=True)
return {"is_exist": False}
# analyze_chat_for_nicknames 函数已被移除,其逻辑移至 NicknameManager._call_llm_for_analysis

View File

@ -1,346 +0,0 @@
import asyncio
import traceback
import threading
import queue
from typing import Dict, Optional
from pymongo.collection import Collection
from pymongo.errors import OperationFailure, DuplicateKeyError # 引入 DuplicateKeyError
from src.common.logger_manager import get_logger
from src.common.database import db # 使用全局 db
from .nickname_mapper import analyze_chat_for_nicknames
from src.config.config import global_config
from ..person_info.person_info import person_info_manager
logger = get_logger("nickname_processor")
_stop_event = threading.Event()
def _upsert_person(collection: Collection, person_id: str, user_id_int: int, platform: str):
"""
确保数据库中存在指定 person_id 的文档 (Upsert)
如果文档不存在则使用提供的用户信息创建它
Args:
collection: MongoDB 集合对象 (person_info)
person_id: 要查找或创建的 person_id
user_id_int: 用户的整数 ID
platform: 平台名称
Returns:
UpdateResult: MongoDB 更新操作的结果
Raises:
DuplicateKeyError: 如果发生重复键错误 (理论上不应由 upsert 触发)
Exception: 其他数据库操作错误
"""
try:
# 关键步骤:基于 person_id 执行 Upsert
# 如果文档不存在,它会被创建,并设置 $setOnInsert 中的字段。
# 如果文档已存在,此操作不会修改任何内容。
result = collection.update_one(
{"person_id": person_id},
{
"$setOnInsert": {
"person_id": person_id,
"user_id": user_id_int, # 确保这里使用传入的 user_id_int
"platform": platform,
"group_nicknames": [], # 初始化 group_nicknames 数组
}
},
upsert=True,
)
if result.upserted_id:
logger.debug(f"Upsert on person_id created new document: {person_id}")
# else:
# logger.debug(f"Upsert on person_id found existing document: {person_id}")
return result
except DuplicateKeyError as dk_err:
# 这个错误理论上不应该再由 upsert 触发。
# 如果仍然出现,可能指示 person_id 生成逻辑问题或非常罕见的 MongoDB 内部情况。
logger.error(
f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。"
)
raise # 将异常向上抛出,让调用者处理
except Exception as e:
logger.exception(f"对 person_id {person_id} 执行 Upsert 时失败: {e}")
raise # 将异常向上抛出
def _update_group_nickname(collection: Collection, person_id: str, group_id_str: str, nickname: str):
"""
尝试更新 person_id 文档中特定群组的绰号计数或添加新条目
按顺序尝试增加计数 -> 添加绰号 -> 添加群组
Args:
collection: MongoDB 集合对象 (person_info)
person_id: 目标文档的 person_id
group_id_str: 目标群组的 ID (字符串)
nickname: 要更新或添加的绰号
"""
# 3a. 尝试增加现有群组中现有绰号的计数
result_inc = collection.update_one(
{
"person_id": person_id,
"group_nicknames": {"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}},
},
{"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}},
array_filters=[
{"group.group_id": group_id_str},
{"nick.name": nickname},
],
)
if result_inc.modified_count > 0:
# logger.debug(f"成功增加 person_id {person_id} 在群组 {group_id_str} 中绰号 '{nickname}' 的计数。")
return # 成功增加计数,操作完成
# 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组
result_push_nick = collection.update_one(
{
"person_id": person_id,
"group_nicknames.group_id": group_id_str, # 检查群组是否存在
},
{"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}},
array_filters=[{"group.group_id": group_id_str}],
)
if result_push_nick.modified_count > 0:
logger.debug(f"成功为 person_id {person_id} 在现有群组 {group_id_str} 中添加新绰号 '{nickname}'")
return # 成功添加绰号,操作完成
# 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号
# 确保 group_nicknames 数组存在 (作为保险措施)
collection.update_one(
{"person_id": person_id, "group_nicknames": {"$exists": False}},
{"$set": {"group_nicknames": []}},
)
# 推送新的群组对象到 group_nicknames 数组
result_push_group = collection.update_one(
{
"person_id": person_id,
"group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在
},
{
"$push": {
"group_nicknames": {
"group_id": group_id_str,
"nicknames": [{"name": nickname, "count": 1}],
}
}
},
)
if result_push_group.modified_count > 0:
logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'")
# else:
# 如果连添加群组也失败 (例如 group_id 已存在但之前的步骤都未匹配,理论上不太可能)
# 可能需要进一步的日志或错误处理,但这通常意味着数据状态异常。
# logger.warning(f"尝试为 person_id {person_id} 添加新群组 {group_id_str} 失败,可能群组已存在但结构不符合预期。")
async def update_nickname_counts(platform: str, group_id: str, nickname_map: Dict[str, str]):
"""
更新数据库中用户的群组绰号计数 (使用全局 db)
通过调用辅助函数来处理 person 文档的 upsert 和绰号更新
Args:
platform (str): 平台名称 (e.g., 'qq')
group_id (str): 群组 ID
nickname_map (Dict[str, str]): 用户 ID (字符串) 到绰号的映射
"""
person_info_collection = db.person_info
if not nickname_map:
logger.debug("提供的用于更新的绰号映射为空。")
return
logger.info(f"尝试更新平台 '{platform}' 群组 '{group_id}' 的绰号计数,映射为: {nickname_map}")
for user_id_str, nickname in nickname_map.items():
# --- 基本验证 ---
if not user_id_str or not nickname:
logger.warning(f"跳过无效条目: user_id='{user_id_str}', nickname='{nickname}'")
continue
group_id_str = str(group_id)
# 使用 isdigit() 检查 user_id_str 是否为纯数字字符串
if not user_id_str.isdigit():
# isdigit() 会对空字符串返回 False并且不识别负号、小数点等
logger.warning(f"无效的用户ID格式 (非纯数字): '{user_id_str}',跳过。")
continue
user_id_int = int(user_id_str)
# --- 结束验证 ---
try:
# --- 步骤 1: 生成 person_id ---
person_id = person_info_manager.get_person_id(platform, user_id_str)
if not person_id:
logger.error(f"无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id跳过此用户。")
continue
# --- 步骤 2: 确保 Person 文档存在 (调用辅助函数) ---
_upsert_person(person_info_collection, person_id, user_id_int, platform)
# --- 步骤 3: 更新群组绰号 (调用辅助函数) ---
_update_group_nickname(person_info_collection, person_id, group_id_str, nickname)
# --- 统一处理数据库操作可能抛出的异常 ---
except (OperationFailure, DuplicateKeyError) as db_err: # 捕获特定的数据库错误
logger.exception(
f"数据库操作失败 ({type(db_err).__name__}): 用户 {user_id_str}, 群组 {group_id_str}, 绰号 {nickname}. 错误: {db_err}"
)
except Exception as e:
# 捕获其他所有可能的错误 (例如 person_id 生成、辅助函数内部未捕获的错误等)
logger.exception(f"处理用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误:{e}")
# --- 使用 queue.Queue ---
queue_max_size = getattr(global_config, "NICKNAME_QUEUE_MAX_SIZE", 100)
nickname_queue: queue.Queue = queue.Queue(maxsize=queue_max_size)
_nickname_thread: Optional[threading.Thread] = None
# --- add_to_nickname_queue (保持不变,已包含 platform) ---
async def add_to_nickname_queue(
chat_history_str: str, bot_reply: str, platform: str, group_id: Optional[str], user_name_map: Dict[str, str]
):
"""将需要分析的数据放入队列。"""
if not global_config or not global_config.ENABLE_NICKNAME_MAPPING:
return
if group_id is None:
logger.debug("私聊跳过绰号映射。")
return
try:
item = (chat_history_str, bot_reply, platform, str(group_id), user_name_map)
nickname_queue.put_nowait(item)
logger.debug(
f"已将项目添加到平台 '{platform}' 群组 '{group_id}' 的绰号队列。当前大小: {nickname_queue.qsize()}"
)
except queue.Full:
logger.warning(f"无法将项目添加到绰号队列:队列已满 (maxsize={nickname_queue.maxsize})。")
except Exception as e:
logger.warning(f"无法将项目添加到绰号队列: {e}", exc_info=True)
# --- _nickname_processing_loop (保持不变,已包含 platform) ---
async def _nickname_processing_loop(q: queue.Queue, stop_event: threading.Event):
"""独立线程中的主循环,处理队列任务 (使用全局 db 和 config)。"""
thread_id = threading.get_ident()
logger.info(f"绰号处理循环已启动 (线程 ID: {thread_id})。")
sleep_interval = getattr(global_config, "NICKNAME_PROCESS_SLEEP_INTERVAL", 0.5)
while not stop_event.is_set():
try:
item = q.get(block=True, timeout=sleep_interval)
if isinstance(item, tuple) and len(item) == 5:
chat_history_str, bot_reply, platform, group_id, user_name_map = item
logger.debug(f"(线程 ID: {thread_id}) 正在处理平台 '{platform}' 群组 '{group_id}' 的绰号映射任务...")
analysis_result = await analyze_chat_for_nicknames(chat_history_str, bot_reply, user_name_map)
if analysis_result.get("is_exist") and analysis_result.get("data"):
await update_nickname_counts(platform, group_id, analysis_result["data"])
else:
logger.warning(f"(线程 ID: {thread_id}) 从队列接收到意外的项目类型或长度: {type(item)}, 内容: {item}")
q.task_done()
except queue.Empty:
continue
except asyncio.CancelledError:
logger.info(f"绰号处理循环已取消 (线程 ID: {thread_id})。")
break
except Exception as e:
logger.error(f"(线程 ID: {thread_id}) 绰号处理循环出错: {e}\n{traceback.format_exc()}")
await asyncio.sleep(5)
logger.info(f"绰号处理循环已结束 (线程 ID: {thread_id})。")
# --- _run_processor_thread (保持不变) ---
def _run_processor_thread(q: queue.Queue, stop_event: threading.Event):
"""线程启动函数,运行异步循环。"""
loop = None
thread_id = threading.get_ident()
logger.info(f"Nickname processor thread starting (Thread ID: {thread_id})...")
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logger.info(f"(Thread ID: {thread_id}) Asyncio event loop created and set.")
loop.run_until_complete(_nickname_processing_loop(q, stop_event))
except Exception as e:
logger.error(f"(Thread ID: {thread_id}) Error running nickname processor thread: {e}", exc_info=True)
finally:
if loop:
try:
if loop.is_running():
logger.info(f"(Thread ID: {thread_id}) Stopping the asyncio loop...")
all_tasks = asyncio.all_tasks(loop)
if all_tasks:
logger.info(f"(Thread ID: {thread_id}) Cancelling {len(all_tasks)} running tasks...")
for task in all_tasks:
task.cancel()
loop.run_until_complete(asyncio.gather(*all_tasks, return_exceptions=True))
logger.info(f"(Thread ID: {thread_id}) All tasks cancelled.")
loop.stop()
logger.info(f"(Thread ID: {thread_id}) Loop stopped.")
if not loop.is_closed():
loop.close()
logger.info(f"(Thread ID: {thread_id}) Asyncio loop closed.")
except Exception as loop_close_err:
logger.error(f"(Thread ID: {thread_id}) Error closing loop: {loop_close_err}", exc_info=True)
logger.info(f"Nickname processor thread finished (Thread ID: {thread_id}).")
# --- start_nickname_processor (保持不变) ---
def start_nickname_processor():
"""启动绰号映射处理线程。"""
global _nickname_thread
if not global_config or not global_config.ENABLE_NICKNAME_MAPPING:
logger.info("绰号映射功能已禁用或无法获取配置。处理器未启动。")
return
if _nickname_thread is None or not _nickname_thread.is_alive():
logger.info("正在启动绰号处理器线程...")
stop_event = get_stop_event()
stop_event.clear()
_nickname_thread = threading.Thread(
target=_run_processor_thread, args=(nickname_queue, stop_event), daemon=True
)
_nickname_thread.start()
logger.info(f"绰号处理器线程已启动 (Thread ID: {_nickname_thread.ident})")
else:
logger.warning("绰号处理器线程已在运行中。")
# --- stop_nickname_processor (保持不变) ---
def stop_nickname_processor():
"""停止绰号映射处理线程。"""
global _nickname_thread
if _nickname_thread and _nickname_thread.is_alive():
logger.info("正在停止绰号处理器线程...")
set_stop_event()
try:
_nickname_thread.join(timeout=10)
if _nickname_thread.is_alive():
logger.warning("绰号处理器线程在 10 秒后未结束。")
except Exception as e:
logger.error(f"停止绰号处理器线程时出错: {e}", exc_info=True)
finally:
if _nickname_thread and not _nickname_thread.is_alive():
logger.info("绰号处理器线程已成功停止。")
else:
logger.warning("停止绰号处理器线程:线程可能仍在运行或未正确清理。")
_nickname_thread = None
else:
logger.info("绰号处理器线程未在运行或已被清理。")
# --- Event 控制函数 (保持不变) ---
def get_stop_event() -> threading.Event:
"""获取全局停止事件"""
return _stop_event
def set_stop_event():
"""设置全局停止事件,通知子线程退出"""
_stop_event.set()

View File

@ -1,26 +1,21 @@
import random
import time
from typing import List, Dict, Tuple, Optional
from src.common.logger_manager import get_logger
from src.config.config import global_config
from src.plugins.person_info.relationship_manager import relationship_manager
from src.plugins.chat.chat_stream import ChatStream
from src.plugins.chat.message import MessageRecv
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
from .nickname_processor import add_to_nickname_queue
# 这个文件现在只包含纯粹的工具函数,与状态和流程无关
# 获取日志记录器,命名为 "绰号工具"
logger = get_logger("nickname_utils")
def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int]]]) -> List[Tuple[str, str, int]]:
"""
从给定的绰号信息中根据映射次数加权随机选择最多 N 个绰号
从给定的绰号信息中根据映射次数加权随机选择最多 N 个绰号用于 Prompt
Args:
all_nicknames_info: 包含用户及其绰号信息的字典格式为
{ "用户名1": [{"绰号A": 次数}, {"绰号B": 次数}], ... }
注意这里的用户名是 person_name
Returns:
List[Tuple[str, str, int]]: 选中的绰号列表每个元素为 (用户名, 绰号, 次数)
@ -29,17 +24,21 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int
if not all_nicknames_info:
return []
candidates = []
candidates = [] # 存储 (用户名, 绰号, 次数, 权重)
smoothing_factor = getattr(global_config, "NICKNAME_PROBABILITY_SMOOTHING", 1.0) # 平滑因子避免权重为0
for user_name, nicknames in all_nicknames_info.items():
if nicknames:
if nicknames and isinstance(nicknames, list):
for nickname_entry in nicknames:
# 确保条目是字典且只有一个键值对
if isinstance(nickname_entry, dict) and len(nickname_entry) == 1:
nickname, count = list(nickname_entry.items())[0]
if isinstance(count, int) and count > 0:
weight = count + global_config.NICKNAME_PROBABILITY_SMOOTHING
# 确保次数是正整数
if isinstance(count, int) and count > 0 and isinstance(nickname, str) and nickname:
weight = count + smoothing_factor # 计算权重
candidates.append((user_name, nickname, count, weight))
else:
logger.warning(f"用户 '{user_name}' 的绰号 '{nickname}' 次数无效: {count}。已跳过。")
logger.warning(f"用户 '{user_name}' 的绰号条目无效: {nickname_entry} (次数非正整数或绰号为空)。已跳过。")
else:
logger.warning(f"用户 '{user_name}' 的绰号条目格式无效: {nickname_entry}。已跳过。")
@ -47,23 +46,24 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int
return []
# 确定需要选择的数量
num_to_select = min(global_config.MAX_NICKNAMES_IN_PROMPT, len(candidates))
max_nicknames = getattr(global_config, "MAX_NICKNAMES_IN_PROMPT", 5)
num_to_select = min(max_nicknames, len(candidates))
try:
# 调用新的辅助函数进行不重复加权抽样
# 调用加权随机抽样(不重复)
selected_candidates_with_weight = weighted_sample_without_replacement(candidates, num_to_select)
# 如果抽样结果数量不足(例如权重问题导致提前退出),可以考虑是否需要补充
if len(selected_candidates_with_weight) < num_to_select:
logger.debug(
f"加权随机选择后数量不足 ({len(selected_candidates_with_weight)}/{num_to_select})补充选择次数最多的。"
f"加权随机选择后数量不足 ({len(selected_candidates_with_weight)}/{num_to_select})尝试补充选择次数最多的。"
)
# 筛选出未被选中的候选
selected_ids = set(
(c[0], c[1]) for c in selected_candidates_with_weight
) # 使用 (用户名, 绰号) 作为唯一标识
) # 使用 (用户名, 绰号) 作为唯一标识
remaining_candidates = [c for c in candidates if (c[0], c[1]) not in selected_ids]
remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
needed = num_to_select - len(selected_candidates_with_weight)
selected_candidates_with_weight.extend(remaining_candidates[:needed])
@ -71,14 +71,14 @@ def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int
# 日志:记录加权随机选择时发生的错误,并回退到简单选择
logger.error(f"绰号加权随机选择时出错: {e}。将回退到选择次数最多的 Top N。", exc_info=True)
# 出错时回退到选择次数最多的 N 个
candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
# 注意:这里需要选择包含权重的元组,或者调整后续处理
candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
selected_candidates_with_weight = candidates[:num_to_select]
# 格式化输出结果为 (用户名, 绰号, 次数),移除权重
result = [(user, nick, count) for user, nick, count, _weight in selected_candidates_with_weight]
result.sort(key=lambda x: x[2], reverse=True) # 按次数降序
# 按次数降序排序最终结果
result.sort(key=lambda x: x[2], reverse=True)
logger.debug(f"为 Prompt 选择的绰号: {result}")
return result
@ -95,13 +95,13 @@ def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, in
str: 格式化后的字符串如果列表为空则返回空字符串
"""
if not selected_nicknames:
# 如果没有选中的绰号,返回空字符串
return ""
# Prompt 注入部分的标题
prompt_lines = [
"以下是聊天记录中一些成员在本群的绰号信息(按常用度排序),如果有需要提及对方,用你认为合适的方式提及"
] # 注入部分的标题
grouped_by_user: Dict[str, List[str]] = {} # 用于按用户分组
"以下是聊天记录中一些成员在本群的绰号信息(按常用度排序),供你参考"
]
grouped_by_user: Dict[str, List[str]] = {} # 用于按用户分组
# 按用户分组绰号
for user_name, nickname, _count in selected_nicknames:
@ -112,8 +112,9 @@ def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, in
# 构建每个用户的绰号字符串
for user_name, nicknames in grouped_by_user.items():
nicknames_str = "".join(nicknames) # 使用中文顿号连接
prompt_lines.append(f"- {user_name}ta被群友称为{nicknames_str}") # 格式化输出
nicknames_str = "".join(nicknames) # 使用中文顿号连接
# 格式化输出,例如: "- 张三ta 可能被称为:“三儿”、“张哥”"
prompt_lines.append(f"- {user_name}ta 可能被称为:{nicknames_str}")
# 如果只有标题行,返回空字符串,避免注入无意义的标题
if len(prompt_lines) > 1:
@ -123,213 +124,51 @@ def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, in
return ""
async def get_nickname_injection_for_prompt(chat_stream: ChatStream, message_list_before_now: List[Dict]) -> str:
"""
获取并格式化用于 Prompt 注入的绰号信息字符串
这是一个封装函数整合了获取选择和格式化的逻辑
Args:
chat_stream: 当前的 ChatStream 对象
message_list_before_now: 用于确定上下文中用户的消息列表
Returns:
str: 格式化后的绰号信息字符串如果无法获取或格式化则返回空字符串
"""
nickname_injection_str = ""
# 仅在群聊且功能开启时执行
if global_config.ENABLE_NICKNAME_MAPPING and chat_stream and chat_stream.group_info:
try:
group_id = str(chat_stream.group_info.group_id)
user_ids_in_context = set() # 存储上下文中出现的用户ID
# 从消息列表中提取用户ID
if message_list_before_now:
for msg in message_list_before_now:
sender_id = msg["user_info"].get("user_id")
if sender_id:
user_ids_in_context.add(str(sender_id))
else:
# 如果消息列表为空,尝试获取最近发言者作为上下文用户
recent_speakers = chat_stream.get_recent_speakers(limit=5) # 获取最近5个发言者
for speaker in recent_speakers:
user_ids_in_context.add(str(speaker["user_id"]))
if not user_ids_in_context:
# 日志:记录未找到上下文用户
logger.warning(f"[{chat_stream.stream_id}] 未找到消息或最近发言者用于绰号注入。")
# 如果找到了上下文用户
if user_ids_in_context:
platform = chat_stream.platform
# --- 调用批量获取群组绰号的方法 ---
# 使用 relationship_manager 从数据库获取数据
all_nicknames_data = await relationship_manager.get_users_group_nicknames(
platform, list(user_ids_in_context), group_id
)
# 如果获取到了绰号数据
if all_nicknames_data:
# 调用选择和格式化函数
selected_nicknames = select_nicknames_for_prompt(all_nicknames_data)
nickname_injection_str = format_nickname_prompt_injection(selected_nicknames)
if nickname_injection_str:
# 日志:记录生成的用于 Prompt 的绰号信息
logger.debug(
f"[{chat_stream.stream_id}] 已生成用于 Prompt 的绰号信息:\n{nickname_injection_str}"
)
except Exception as e:
# 日志:记录获取或格式化绰号信息时发生的错误
logger.error(f"[{chat_stream.stream_id}] 获取或格式化 Prompt 绰号信息时出错: {e}", exc_info=True)
nickname_injection_str = "" # 出错时确保返回空字符串
# 返回最终生成的字符串(可能为空)
return nickname_injection_str
async def trigger_nickname_analysis_if_needed(
anchor_message: MessageRecv,
bot_reply: List[str],
chat_stream: Optional[ChatStream] = None, # 允许传入 chat_stream 或从 anchor_message 获取
):
"""
如果满足条件群聊功能开启则准备数据并触发绰号分析任务
将相关信息放入处理队列 nickname_processor 处理
Args:
anchor_message: 触发回复的原始消息对象
bot_reply: Bot 生成的回复内容列表
chat_stream: 可选的 ChatStream 对象
"""
# 检查功能是否开启
if not global_config.ENABLE_NICKNAME_MAPPING:
return # 如果功能禁用,直接返回
# 确定使用的 chat_stream
current_chat_stream = chat_stream or anchor_message.chat_stream
# 检查是否是群聊且 chat_stream 有效
if not current_chat_stream or not current_chat_stream.group_info:
# 日志:记录跳过分析的原因(非群聊或无效流)
logger.debug(
f"[{current_chat_stream.stream_id if current_chat_stream else '未知流'}] 跳过绰号分析:非群聊或无效聊天流。"
)
return
log_prefix = f"[{current_chat_stream.stream_id}]" # 用于日志的前缀
try:
# 1. 获取历史记录
history_limit = 30 # 定义获取历史记录的数量限制
history_messages = get_raw_msg_before_timestamp_with_chat(
chat_id=current_chat_stream.stream_id,
timestamp=time.time(), # 获取当前时间之前的记录
limit=history_limit,
)
# 格式化历史记录为可读字符串
chat_history_str = await build_readable_messages(
messages=history_messages,
replace_bot_name=True, # 替换机器人名称,以便 LLM 分析
merge_messages=False, # 不合并消息,保留原始对话结构
timestamp_mode="relative", # 使用相对时间戳
read_mark=0.0, # 不需要已读标记
truncate=False, # 获取完整内容进行分析
)
# 2. 获取 Bot 回复字符串
bot_reply_str = " ".join(bot_reply) if bot_reply else "" # 处理空回复列表
# 3. 获取群号和平台信息
group_id = str(current_chat_stream.group_info.group_id)
platform = current_chat_stream.platform
# 4. 构建用户 ID 到名称的映射 (user_name_map)
user_ids_in_history = set() # 存储历史记录中出现的用户ID
for msg in history_messages:
sender_id = msg["user_info"].get("user_id")
if sender_id:
user_ids_in_history.add(str(sender_id))
user_name_map = {} # 初始化映射字典
if user_ids_in_history:
try:
# 批量从数据库获取这些用户的 person_name
names_data = await relationship_manager.get_person_names_batch(platform, list(user_ids_in_history))
except Exception as e:
# 日志:记录获取 person_name 时发生的错误
logger.error(f"{log_prefix} 批量获取 person_name 时出错: {e}", exc_info=True)
names_data = {} # 出错时使用空字典
# 填充 user_name_map
for user_id in user_ids_in_history:
if user_id in names_data:
# 如果数据库中有 person_name则使用它
user_name_map[user_id] = names_data[user_id]
else:
# 如果数据库中没有,则回退查找用户在历史记录中最近使用的 nickname
latest_nickname = next(
(
m["user_info"].get("user_nickname") # 从 user_info 获取 nickname
for m in reversed(history_messages) # 从后往前找
# 确保消息的用户ID匹配且 nickname 存在
if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname")
),
None, # 如果找不到,返回 None
)
# 如果找到了 nickname 则使用,否则使用 "未知(ID)"
user_name_map[user_id] = latest_nickname or f"未知({user_id})"
# 5. 将准备好的数据添加到绰号处理队列
await add_to_nickname_queue(chat_history_str, bot_reply_str, platform, group_id, user_name_map)
# 日志:记录已成功触发分析任务
logger.debug(f"{log_prefix} 已为群组 {group_id} 触发绰号分析任务。")
except Exception as e:
# 日志:记录触发分析过程中发生的任何其他错误
logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True)
def weighted_sample_without_replacement(
candidates: List[Tuple[str, str, int, float]], k: int
) -> List[Tuple[str, str, int, float]]:
"""
执行不重复的加权随机抽样
执行不重复的加权随机抽样使用 A-ExpJ 算法思想的简化实现
Args:
candidates: 候选列表每个元素为 (用户名, 绰号, 次数, 权重)
k: 需要选择的数量
Returns:
List[Tuple[str, str, int, float]]: 选中的元素列表
List[Tuple[str, str, int, float]]: 选中的元素列表包含权重
"""
if k <= 0:
return []
if k >= len(candidates):
# 如果需要选择的数量大于或等于候选数量,直接返回所有候选
return candidates[:] # 返回副本以避免修改原始列表
n = len(candidates)
if k >= n:
return candidates[:] # 返回副本
pool = candidates[:] # 创建候选列表的副本进行操作
selected = []
# 注意:原评论代码中计算 total_weight 但未使用,这里也省略。
# random.choices 内部会处理权重的归一化。
for _ in range(min(k, len(pool))): # 确保迭代次数不超过池中剩余元素
if not pool: # 如果池已空,提前结束
break
weights = [c[3] for c in pool] # 获取当前池中所有元素的权重
# 检查权重是否有效
if sum(weights) <= 0:
# 如果所有剩余权重无效,随机选择一个(或根据需要采取其他策略)
logger.warning("加权抽样池中剩余权重总和为0或负数随机选择一个。")
chosen_index = random.randrange(len(pool))
chosen = pool.pop(chosen_index)
# 计算每个元素的 key = U^(1/weight),其中 U 是 (0, 1) 之间的随机数
# 为了数值稳定性,计算 log(key) = log(U) / weight
# log(U) 可以用 -Exponential(1) 来生成
weighted_keys = []
for i in range(n):
weight = candidates[i][3]
if weight <= 0:
# 处理权重为0或负数的情况赋予一个极小的概率或极大负数的log_key
log_key = float('-inf') # 或者一个非常大的负数
logger.warning(f"候选者 {candidates[i][:2]} 的权重为非正数 ({weight}),抽中概率极低。")
else:
# 使用 random.choices 进行加权抽样,选择 1 个
# random.choices 返回一个列表,所以取第一个元素 [0]
chosen = random.choices(pool, weights=weights, k=1)[0]
pool.remove(chosen) # 从池中移除选中的元素,实现不重复抽样
log_u = -random.expovariate(1.0) # 生成 -Exponential(1) 随机数
log_key = log_u / weight
weighted_keys.append((log_key, i)) # 存储 (log_key, 原始索引)
selected.append(chosen)
# 按 log_key 降序排序 (相当于按 key 升序排序)
weighted_keys.sort(key=lambda x: x[0], reverse=True)
return selected
# 选择 log_key 最大的 k 个元素的原始索引
selected_indices = [index for _log_key, index in weighted_keys[:k]]
# 根据选中的索引从原始 candidates 列表中获取元素
selected_items = [candidates[i] for i in selected_indices]
return selected_items
# 移除旧的流程函数
# get_nickname_injection_for_prompt 和 trigger_nickname_analysis_if_needed
# 的逻辑现在由 NicknameManager 处理

View File

@ -28,9 +28,8 @@ from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from src.plugins.moods.moods import MoodManager
from src.heart_flow.utils_chat import get_chat_type_and_target_info
from rich.traceback import install
from src.plugins.group_nickname.nickname_utils import trigger_nickname_analysis_if_needed
from src.plugins.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat
from src.plugins.group_nickname.nickname_utils import get_nickname_injection_for_prompt
from src.plugins.group_nickname.nickname_manager import nickname_manager
install(extra_lines=3)
@ -605,7 +604,7 @@ class HeartFChatting:
)
# 调用工具函数触发绰号分析
await trigger_nickname_analysis_if_needed(anchor_message, reply, self.chat_stream)
await nickname_manager.trigger_nickname_analysis(anchor_message, reply, self.chat_stream)
return True, thinking_id
@ -874,7 +873,7 @@ class HeartFChatting:
limit=global_config.observation_context_size, # 使用与 prompt 构建一致的 limit
)
# 调用工具函数获取格式化后的绰号字符串
nickname_injection_str = await get_nickname_injection_for_prompt(self.chat_stream, message_list_before_now)
nickname_injection_str = await nickname_manager.get_nickname_prompt_injection(self.chat_stream, message_list_before_now)
# --- 构建提示词 (调用修改后的 PromptBuilder 方法) ---
prompt = await prompt_builder.build_planner_prompt(

View File

@ -14,7 +14,7 @@ from ..moods.moods import MoodManager
from ..memory_system.Hippocampus import HippocampusManager
from ..schedule.schedule_generator import bot_schedule
from ..knowledge.knowledge_lib import qa_manager
from src.plugins.group_nickname.nickname_utils import get_nickname_injection_for_prompt
from src.plugins.group_nickname.nickname_manager import nickname_manager
import traceback
from .heartFC_Cycleinfo import CycleInfo
@ -255,7 +255,7 @@ async def _build_prompt_focus(reason, current_mind_info, structured_info, chat_s
chat_target_2 = await global_prompt_manager.get_prompt_async("chat_target_group2")
# 调用新的工具函数获取绰号信息
nickname_injection_str = await get_nickname_injection_for_prompt(chat_stream, message_list_before_now)
nickname_injection_str = await nickname_manager.get_nickname_prompt_injection(chat_stream, message_list_before_now)
prompt = await global_prompt_manager.format_prompt(
template_name,
@ -451,7 +451,7 @@ class PromptBuilder:
chat_target_2 = await global_prompt_manager.get_prompt_async("chat_target_group2")
# 调用新的工具函数获取绰号信息
nickname_injection_str = await get_nickname_injection_for_prompt(chat_stream, message_list_before_now)
nickname_injection_str = await nickname_manager.get_nickname_prompt_injection(chat_stream, message_list_before_now)
prompt = await global_prompt_manager.format_prompt(
template_name,

View File

@ -20,7 +20,7 @@ from src.plugins.person_info.relationship_manager import relationship_manager
from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from src.plugins.utils.timer_calculator import Timer
from src.heart_flow.utils_chat import get_chat_type_and_target_info
from src.plugins.group_nickname.nickname_utils import trigger_nickname_analysis_if_needed
from src.plugins.group_nickname.nickname_manager import nickname_manager
logger = get_logger("chat")
@ -317,7 +317,7 @@ class NormalChat:
# 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况)
if first_bot_msg:
info_catcher.catch_after_response(timing_results["消息发送"], response_set, first_bot_msg)
await trigger_nickname_analysis_if_needed(message, response_set, self.chat_stream)
await nickname_manager.trigger_nickname_analysis(message, response_set, self.chat_stream)
else:
logger.warning(f"[{self.stream_name}] 思考消息 {thinking_id} 在发送前丢失,无法记录 info_catcher")

View File

@ -128,7 +128,7 @@ enable_nickname_mapping = false # 绰号映射功能总开关(默认关闭,
max_nicknames_in_prompt = 10 # Prompt 中最多注入的绰号数量防止token数量爆炸
nickname_probability_smoothing = 1 # 绰号加权随机选择的平滑因子
nickname_queue_max_size = 100 # 绰号处理队列最大容量
nickname_process_sleep_interval = 5 # 绰号处理进程休眠间隔(秒)
nickname_process_sleep_interval = 60 # 绰号处理进程休眠间隔(秒)
[memory]
build_memory_interval = 2000 # 记忆构建间隔 单位秒 间隔越低,麦麦学习越多,但是冗余信息也会增多

View File

@ -54,7 +54,7 @@ res_top_k = 3 # 最终提供的文段TopK
[persistence]
# 持久化配置(存储中间数据,防止重复计算)
data_root_path = "data" # 数据根目录
raw_data_path = "data/imported_lpmm_data" # 原始数据路径
openie_data_path = "data/openie" # OpenIE数据路径
raw_data_path = "data/import.json" # 原始数据路径
openie_data_path = "data/openie.json" # OpenIE数据路径
embedding_data_dir = "data/embedding" # 嵌入数据目录
rag_data_dir = "data/rag" # RAG数据目录