Merge branch 'groupnickname' of https://github.com/Dax233/MaiMBot into G-Test

pull/937/head
Bakadax 2025-05-03 22:55:18 +08:00
commit 429eaaae4f
12 changed files with 1185 additions and 14 deletions

15
bot.py
View File

@ -14,6 +14,8 @@ from src.common.logger_manager import get_logger
from src.common.crash_logger import install_crash_handler
from src.main import MainSystem
from rich.traceback import install
from src.plugins.group_nickname.nickname_manager import nickname_manager
import atexit
install(extra_lines=3)
@ -221,6 +223,19 @@ def raw_main():
env_config = {key: os.getenv(key) for key in os.environ}
scan_provider(env_config)
# 确保 NicknameManager 单例实例存在并已初始化
# (单例模式下,导入时或第一次调用时会自动初始化)
_ = nickname_manager # 显式引用一次
# 启动 NicknameManager 的后台处理器线程
logger.info("准备启动绰号处理管理器...")
nickname_manager.start_processor() # 调用实例的方法
logger.info("已调用启动绰号处理管理器。")
# 注册 NicknameManager 的停止方法到 atexit确保程序退出时线程能被清理
atexit.register(nickname_manager.stop_processor) # 注册实例的方法
logger.info("已注册绰号处理管理器的退出处理程序。")
# 返回MainSystem实例
return MainSystem()

View File

@ -1,7 +1,7 @@
import os
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Any
from dateutil import tz
import tomli
@ -274,6 +274,13 @@ class BotConfig:
talk_allowed_private = set()
enable_pfc_chatting: bool = False # 是否启用PFC聊天
# Group Nickname
ENABLE_NICKNAME_MAPPING: bool = False # 绰号映射功能总开关
MAX_NICKNAMES_IN_PROMPT: int = 10 # Prompt 中最多注入的绰号数量
NICKNAME_PROBABILITY_SMOOTHING: int = 1 # 绰号加权随机选择的平滑因子
NICKNAME_QUEUE_MAX_SIZE: int = 100 # 绰号处理队列最大容量
NICKNAME_PROCESS_SLEEP_INTERVAL: float = 60 # 绰号处理进程休眠间隔(秒)
# 模型配置
llm_reasoning: dict[str, str] = field(default_factory=lambda: {})
# llm_reasoning_minor: dict[str, str] = field(default_factory=lambda: {})
@ -289,6 +296,7 @@ class BotConfig:
llm_heartflow: Dict[str, str] = field(default_factory=lambda: {})
llm_tool_use: Dict[str, str] = field(default_factory=lambda: {})
llm_plan: Dict[str, str] = field(default_factory=lambda: {})
llm_nickname_mapping: Dict[str, Any] = field(default_factory=dict)
api_urls: Dict[str, str] = field(default_factory=lambda: {})
@ -402,6 +410,25 @@ class BotConfig:
config.save_emoji = emoji_config.get("save_emoji", config.save_emoji)
config.steal_emoji = emoji_config.get("steal_emoji", config.steal_emoji)
def group_nickname(parent: dict):
if config.INNER_VERSION in SpecifierSet(">=1.6.2"):
gn_config = parent.get("group_nickname", {})
config.ENABLE_NICKNAME_MAPPING = gn_config.get(
"enable_nickname_mapping", config.ENABLE_NICKNAME_MAPPING
)
config.MAX_NICKNAMES_IN_PROMPT = gn_config.get(
"max_nicknames_in_prompt", config.MAX_NICKNAMES_IN_PROMPT
)
config.NICKNAME_PROBABILITY_SMOOTHING = gn_config.get(
"nickname_probability_smoothing", config.NICKNAME_PROBABILITY_SMOOTHING
)
config.NICKNAME_QUEUE_MAX_SIZE = gn_config.get(
"nickname_queue_max_size", config.NICKNAME_QUEUE_MAX_SIZE
)
config.NICKNAME_PROCESS_SLEEP_INTERVAL = gn_config.get(
"nickname_process_sleep_interval", config.NICKNAME_PROCESS_SLEEP_INTERVAL
)
def bot(parent: dict):
# 机器人基础配置
bot_config = parent["bot"]
@ -487,6 +514,7 @@ class BotConfig:
"llm_PFC_action_planner",
"llm_PFC_chat",
"llm_PFC_reply_checker",
"llm_nickname_mapping",
]
for item in config_list:
@ -692,6 +720,7 @@ class BotConfig:
"chat": {"func": chat, "support": ">=1.6.0", "necessary": False},
"normal_chat": {"func": normal_chat, "support": ">=1.6.0", "necessary": False},
"focus_chat": {"func": focus_chat, "support": ">=1.6.0", "necessary": False},
"group_nickname": {"func": group_nickname, "support": ">=0.6.3", "necessary": False},
}
# 原地修改,将 字符串版本表达式 转换成 版本对象

View File

@ -0,0 +1,160 @@
from pymongo.collection import Collection
from pymongo.errors import OperationFailure, DuplicateKeyError
from src.common.logger_manager import get_logger
from typing import Optional
logger = get_logger("nickname_db")
class NicknameDB:
"""
处理与群组绰号相关的数据库操作 (MongoDB)
封装了对 'person_info' 集合的读写操作
"""
def __init__(self, person_info_collection: Optional[Collection]):
"""
初始化 NicknameDB 处理器
Args:
person_info_collection: MongoDB 'person_info' 集合对象
如果为 None则数据库操作将被禁用
"""
if person_info_collection is None:
logger.error("未提供 person_info 集合NicknameDB 操作将被禁用。")
self.person_info_collection = None
else:
self.person_info_collection = person_info_collection
logger.info("NicknameDB 初始化成功。")
def is_available(self) -> bool:
"""检查数据库集合是否可用。"""
return self.person_info_collection is not None
def upsert_person(self, person_id: str, user_id_int: int, platform: str):
"""
确保数据库中存在指定 person_id 的文档 (Upsert)
如果文档不存在则使用提供的用户信息创建它
Args:
person_id: 要查找或创建的 person_id
user_id_int: 用户的整数 ID
platform: 平台名称
Returns:
UpdateResult None: MongoDB 更新操作的结果如果数据库不可用则返回 None
Raises:
DuplicateKeyError: 如果发生重复键错误 (理论上不应由 upsert 触发)
Exception: 其他数据库操作错误
"""
if not self.is_available():
logger.error("数据库集合不可用,无法执行 upsert_person。")
return None
try:
# 关键步骤:基于 person_id 执行 Upsert
result = self.person_info_collection.update_one(
{"person_id": person_id},
{
"$setOnInsert": {
"person_id": person_id,
"user_id": user_id_int,
"platform": platform,
"group_nicknames": [], # 初始化 group_nicknames 数组
}
},
upsert=True,
)
if result.upserted_id:
logger.debug(f"Upsert 创建了新的 person 文档: {person_id}")
return result
except DuplicateKeyError as dk_err:
# 这个错误理论上不应该再由 upsert 触发。
logger.error(
f"数据库操作失败 (DuplicateKeyError): person_id {person_id}. 错误: {dk_err}. 这不应该发生,请检查 person_id 生成逻辑和数据库状态。"
)
raise # 将异常向上抛出
except Exception as e:
logger.exception(f"对 person_id {person_id} 执行 Upsert 时失败: {e}")
raise # 将异常向上抛出
def update_group_nickname_count(self, person_id: str, group_id_str: str, nickname: str):
"""
尝试更新 person_id 文档中特定群组的绰号计数或添加新条目
按顺序尝试增加计数 -> 添加绰号 -> 添加群组
Args:
person_id: 目标文档的 person_id
group_id_str: 目标群组的 ID (字符串)
nickname: 要更新或添加的绰号
"""
if not self.is_available():
logger.error("数据库集合不可用,无法执行 update_group_nickname_count。")
return
try:
# 3a. 尝试增加现有群组中现有绰号的计数
result_inc = self.person_info_collection.update_one(
{
"person_id": person_id,
"group_nicknames": {"$elemMatch": {"group_id": group_id_str, "nicknames.name": nickname}},
},
{"$inc": {"group_nicknames.$[group].nicknames.$[nick].count": 1}},
array_filters=[
{"group.group_id": group_id_str},
{"nick.name": nickname},
],
)
if result_inc.modified_count > 0:
# logger.debug(f"成功增加 person_id {person_id} 在群组 {group_id_str} 中绰号 '{nickname}' 的计数。")
return # 成功增加计数,操作完成
# 3b. 如果上一步未修改 (绰号不存在于该群组),尝试将新绰号添加到现有群组
result_push_nick = self.person_info_collection.update_one(
{
"person_id": person_id,
"group_nicknames.group_id": group_id_str, # 检查群组是否存在
},
{"$push": {"group_nicknames.$[group].nicknames": {"name": nickname, "count": 1}}},
array_filters=[{"group.group_id": group_id_str}],
)
if result_push_nick.modified_count > 0:
logger.debug(f"成功为 person_id {person_id} 在现有群组 {group_id_str} 中添加新绰号 '{nickname}'")
return # 成功添加绰号,操作完成
# 3c. 如果上一步也未修改 (群组条目本身不存在),则添加新的群组条目和绰号
# 确保 group_nicknames 数组存在 (作为保险措施)
self.person_info_collection.update_one(
{"person_id": person_id, "group_nicknames": {"$exists": False}},
{"$set": {"group_nicknames": []}},
)
# 推送新的群组对象到 group_nicknames 数组
result_push_group = self.person_info_collection.update_one(
{
"person_id": person_id,
"group_nicknames.group_id": {"$ne": group_id_str}, # 确保该群组 ID 尚未存在
},
{
"$push": {
"group_nicknames": {
"group_id": group_id_str,
"nicknames": [{"name": nickname, "count": 1}],
}
}
},
)
if result_push_group.modified_count > 0:
logger.debug(f"为 person_id {person_id} 添加了新的群组 {group_id_str} 和绰号 '{nickname}'")
# else:
# logger.warning(f"尝试为 person_id {person_id} 添加新群组 {group_id_str} 失败,可能群组已存在但结构不符合预期。")
except (OperationFailure, DuplicateKeyError) as db_err:
logger.exception(
f"数据库操作失败 ({type(db_err).__name__}): person_id {person_id}, 群组 {group_id_str}, 绰号 {nickname}. 错误: {db_err}"
)
# 根据需要决定是否向上抛出 raise db_err
except Exception as e:
logger.exception(
f"更新群组绰号计数时发生意外错误: person_id {person_id}, group {group_id_str}, nick {nickname}. Error: {e}"
)
# 根据需要决定是否向上抛出 raise e

View File

@ -0,0 +1,528 @@
import asyncio
import threading
import time
import json
import re
from typing import Dict, Optional, List, Any
from pymongo.errors import OperationFailure, DuplicateKeyError
from src.common.logger_manager import get_logger
from src.common.database import db
from src.config.config import global_config
from src.plugins.models.utils_model import LLMRequest
from .nickname_db import NicknameDB
from .nickname_mapper import _build_mapping_prompt
from .nickname_utils import select_nicknames_for_prompt, format_nickname_prompt_injection
from ..person_info.person_info import person_info_manager
from ..person_info.relationship_manager import relationship_manager
from src.plugins.chat.chat_stream import ChatStream
from src.plugins.chat.message import MessageRecv
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
logger = get_logger("NicknameManager")
logger_helper = get_logger("AsyncLoopHelper") # 为辅助函数创建单独的 logger
def run_async_loop(loop: asyncio.AbstractEventLoop, coro):
"""
运行给定的协程直到完成并确保循环最终关闭
Args:
loop: 要使用的 asyncio 事件循环
coro: 要在循环中运行的主协程
"""
try:
logger_helper.debug(f"Running coroutine in loop {id(loop)}...")
result = loop.run_until_complete(coro)
logger_helper.debug(f"Coroutine completed in loop {id(loop)}.")
return result
except asyncio.CancelledError:
logger_helper.info(f"Coroutine in loop {id(loop)} was cancelled.")
# 取消是预期行为,不视为错误
except Exception as e:
logger_helper.error(f"Error in async loop {id(loop)}: {e}", exc_info=True)
finally:
try:
# 1. 取消所有剩余任务
all_tasks = asyncio.all_tasks(loop)
current_task = asyncio.current_task(loop)
tasks_to_cancel = [
task for task in all_tasks if task is not current_task
] # 避免取消 run_until_complete 本身
if tasks_to_cancel:
logger_helper.info(f"Cancelling {len(tasks_to_cancel)} outstanding tasks in loop {id(loop)}...")
for task in tasks_to_cancel:
task.cancel()
# 等待取消完成
loop.run_until_complete(asyncio.gather(*tasks_to_cancel, return_exceptions=True))
logger_helper.info(f"Outstanding tasks cancelled in loop {id(loop)}.")
# 2. 停止循环 (如果仍在运行)
if loop.is_running():
loop.stop()
logger_helper.info(f"Asyncio loop {id(loop)} stopped.")
# 3. 关闭循环 (如果未关闭)
if not loop.is_closed():
# 在关闭前再运行一次以处理挂起的关闭回调
loop.run_until_complete(loop.shutdown_asyncgens()) # 关闭异步生成器
loop.close()
logger_helper.info(f"Asyncio loop {id(loop)} closed.")
except Exception as close_err:
logger_helper.error(f"Error during asyncio loop cleanup for loop {id(loop)}: {close_err}", exc_info=True)
class NicknameManager:
"""
管理群组绰号分析处理存储和使用的单例类
封装了 LLM 调用后台处理线程和数据库交互
"""
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if not cls._instance:
with cls._lock:
if not cls._instance:
logger.info("正在创建 NicknameManager 单例实例...")
cls._instance = super(NicknameManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
"""
初始化 NicknameManager
使用锁和标志确保实际初始化只执行一次
"""
if hasattr(self, "_initialized") and self._initialized:
return
with self._lock:
if hasattr(self, "_initialized") and self._initialized:
return
logger.info("正在初始化 NicknameManager 组件...")
self.config = global_config
self.is_enabled = self.config.ENABLE_NICKNAME_MAPPING
# 数据库处理器
person_info_collection = getattr(db, "person_info", None)
self.db_handler = NicknameDB(person_info_collection)
if not self.db_handler.is_available():
logger.error("数据库处理器初始化失败NicknameManager 功能受限。")
self.is_enabled = False
# LLM 映射器
self.llm_mapper: Optional[LLMRequest] = None
if self.is_enabled:
try:
model_config = self.config.llm_nickname_mapping
if model_config and model_config.get("name"):
self.llm_mapper = LLMRequest(
model=model_config,
temperature=model_config.get("temp", 0.5),
max_tokens=model_config.get("max_tokens", 256),
request_type="nickname_mapping",
)
logger.info("绰号映射 LLM 映射器初始化成功。")
else:
logger.warning("绰号映射 LLM 配置无效或缺失 'name',功能禁用。")
self.is_enabled = False
except KeyError as ke:
logger.error(f"初始化绰号映射 LLM 时缺少配置项: {ke},功能禁用。", exc_info=True)
self.llm_mapper = None
self.is_enabled = False
except Exception as e:
logger.error(f"初始化绰号映射 LLM 映射器失败: {e},功能禁用。", exc_info=True)
self.llm_mapper = None
self.is_enabled = False
# 队列和线程
self.queue_max_size = getattr(self.config, "NICKNAME_QUEUE_MAX_SIZE", 100)
# 使用 asyncio.Queue
self.nickname_queue: asyncio.Queue = asyncio.Queue(maxsize=self.queue_max_size)
self._stop_event = threading.Event() # stop_event 仍然使用 threading.Event因为它是由另一个线程设置的
self._nickname_thread: Optional[threading.Thread] = None
self.sleep_interval = getattr(self.config, "NICKNAME_PROCESS_SLEEP_INTERVAL", 0.5) # 超时时间
self._initialized = True
logger.info("NicknameManager 初始化完成。")
def start_processor(self):
"""启动后台处理线程(如果已启用且未运行)。"""
if not self.is_enabled:
logger.info("绰号处理功能已禁用,处理器未启动。")
return
if self._nickname_thread is None or not self._nickname_thread.is_alive():
logger.info("正在启动绰号处理器线程...")
self._stop_event.clear()
self._nickname_thread = threading.Thread(
target=self._run_processor_in_thread, # 线程目标函数不变
daemon=True,
)
self._nickname_thread.start()
logger.info(f"绰号处理器线程已启动 (ID: {self._nickname_thread.ident})")
else:
logger.warning("绰号处理器线程已在运行中。")
def stop_processor(self):
"""停止后台处理线程。"""
if self._nickname_thread and self._nickname_thread.is_alive():
logger.info("正在停止绰号处理器线程...")
self._stop_event.set() # 设置停止事件_processing_loop 会检测到
try:
# 不需要清空 asyncio.Queue让循环自然结束或被取消
self._nickname_thread.join(timeout=10) # 等待线程结束
if self._nickname_thread.is_alive():
logger.warning("绰号处理器线程在超时后仍未停止。")
except Exception as e:
logger.error(f"停止绰号处理器线程时出错: {e}", exc_info=True)
finally:
if self._nickname_thread and not self._nickname_thread.is_alive():
logger.info("绰号处理器线程已成功停止。")
self._nickname_thread = None
else:
logger.info("绰号处理器线程未在运行或已被清理。")
async def trigger_nickname_analysis(
self,
anchor_message: MessageRecv,
bot_reply: List[str],
chat_stream: Optional[ChatStream] = None,
):
"""
准备数据并将其排队等待绰号分析如果满足条件
(现在调用异步的 _add_to_queue)
"""
if not self.is_enabled:
return
current_chat_stream = chat_stream or anchor_message.chat_stream
if not current_chat_stream or not current_chat_stream.group_info:
logger.debug("跳过绰号分析:非群聊或无效的聊天流。")
return
log_prefix = f"[{current_chat_stream.stream_id}]"
try:
# 1. 获取历史记录
history_limit = getattr(self.config, "NICKNAME_ANALYSIS_HISTORY_LIMIT", 30)
history_messages = get_raw_msg_before_timestamp_with_chat(
chat_id=current_chat_stream.stream_id,
timestamp=time.time(),
limit=history_limit,
)
# 格式化历史记录
chat_history_str = await build_readable_messages(
messages=history_messages,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
truncate=False,
)
# 2. 获取 Bot 回复
bot_reply_str = " ".join(bot_reply) if bot_reply else ""
# 3. 获取群组和平台信息
group_id = str(current_chat_stream.group_info.group_id)
platform = current_chat_stream.platform
# 4. 构建用户 ID 到名称的映射 (user_name_map)
user_ids_in_history = {
str(msg["user_info"]["user_id"]) for msg in history_messages if msg.get("user_info", {}).get("user_id")
}
user_name_map = {}
if user_ids_in_history:
try:
names_data = await relationship_manager.get_person_names_batch(platform, list(user_ids_in_history))
except Exception as e:
logger.error(f"{log_prefix} 批量获取 person_name 时出错: {e}", exc_info=True)
names_data = {}
for user_id in user_ids_in_history:
if user_id in names_data:
user_name_map[user_id] = names_data[user_id]
else:
latest_nickname = next(
(
m["user_info"].get("user_nickname")
for m in reversed(history_messages)
if str(m["user_info"].get("user_id")) == user_id and m["user_info"].get("user_nickname")
),
None,
)
user_name_map[user_id] = latest_nickname or f"未知({user_id})"
item = (chat_history_str, bot_reply_str, platform, group_id, user_name_map)
await self._add_to_queue(item, platform, group_id)
except Exception as e:
logger.error(f"{log_prefix} 触发绰号分析时出错: {e}", exc_info=True)
async def get_nickname_prompt_injection(self, chat_stream: ChatStream, message_list_before_now: List[Dict]) -> str:
"""
获取并格式化用于 Prompt 注入的绰号信息字符串
"""
if not self.is_enabled or not chat_stream or not chat_stream.group_info:
return ""
log_prefix = f"[{chat_stream.stream_id}]"
try:
group_id = str(chat_stream.group_info.group_id)
platform = chat_stream.platform
user_ids_in_context = {
str(msg["user_info"]["user_id"])
for msg in message_list_before_now
if msg.get("user_info", {}).get("user_id")
}
if not user_ids_in_context:
recent_speakers = chat_stream.get_recent_speakers(limit=5)
user_ids_in_context.update(str(speaker["user_id"]) for speaker in recent_speakers)
if not user_ids_in_context:
logger.warning(f"{log_prefix} 未找到上下文用户用于绰号注入。")
return ""
all_nicknames_data = await relationship_manager.get_users_group_nicknames(
platform, list(user_ids_in_context), group_id
)
if all_nicknames_data:
selected_nicknames = select_nicknames_for_prompt(all_nicknames_data)
injection_str = format_nickname_prompt_injection(selected_nicknames)
if injection_str:
logger.debug(f"{log_prefix} 生成的绰号 Prompt 注入:\n{injection_str}")
return injection_str
else:
return ""
except Exception as e:
logger.error(f"{log_prefix} 获取绰号注入时出错: {e}", exc_info=True)
return ""
# 私有/内部方法
async def _add_to_queue(self, item: tuple, platform: str, group_id: str):
"""将项目异步添加到内部处理队列 (asyncio.Queue)。"""
try:
# 使用 await put(),如果队列满则异步等待
await self.nickname_queue.put(item)
logger.debug(
f"已将项目添加到平台 '{platform}' 群组 '{group_id}' 的绰号队列。当前大小: {self.nickname_queue.qsize()}"
)
except asyncio.QueueFull:
# 理论上 await put() 不会直接抛 QueueFull除非 maxsize=0
# 但保留以防万一或未来修改
logger.warning(
f"绰号队列已满 (最大={self.queue_max_size})。平台 '{platform}' 群组 '{group_id}' 的项目被丢弃。"
)
except Exception as e:
logger.error(f"将项目添加到绰号队列时出错: {e}", exc_info=True)
async def _analyze_and_update_nicknames(self, item: tuple):
"""处理单个队列项目:调用 LLM 分析并更新数据库。"""
if not isinstance(item, tuple) or len(item) != 5:
logger.warning(f"从队列接收到无效项目: {type(item)}")
return
chat_history_str, bot_reply, platform, group_id, user_name_map = item
# 使用 asyncio.get_running_loop().call_soon(threading.get_ident) 可能不准确线程ID是同步概念
# 可以考虑移除线程ID日志或寻找异步安全的获取标识符的方式
log_prefix = f"[{platform}:{group_id}]" # 简化日志前缀
logger.debug(f"{log_prefix} 开始处理绰号分析任务...")
if not self.llm_mapper:
logger.error(f"{log_prefix} LLM 映射器不可用,无法执行分析。")
return
if not self.db_handler.is_available():
logger.error(f"{log_prefix} 数据库处理器不可用,无法更新计数。")
return
# 1. 调用 LLM 分析 (内部逻辑不变)
analysis_result = await self._call_llm_for_analysis(chat_history_str, bot_reply, user_name_map)
# 2. 如果分析成功且找到映射,则更新数据库 (内部逻辑不变)
if analysis_result.get("is_exist") and analysis_result.get("data"):
nickname_map_to_update = analysis_result["data"]
logger.info(f"{log_prefix} LLM 找到绰号映射,准备更新数据库: {nickname_map_to_update}")
for user_id_str, nickname in nickname_map_to_update.items():
# ... (验证和数据库更新逻辑保持不变) ...
if not user_id_str or not nickname:
logger.warning(f"{log_prefix} 跳过无效条目: user_id='{user_id_str}', nickname='{nickname}'")
continue
if not user_id_str.isdigit():
logger.warning(f"{log_prefix} 无效的用户ID格式 (非纯数字): '{user_id_str}',跳过。")
continue
user_id_int = int(user_id_str)
try:
person_id = person_info_manager.get_person_id(platform, user_id_str)
if not person_id:
logger.error(
f"{log_prefix} 无法为 platform='{platform}', user_id='{user_id_str}' 生成 person_id跳过此用户。"
)
continue
self.db_handler.upsert_person(person_id, user_id_int, platform)
self.db_handler.update_group_nickname_count(person_id, group_id, nickname)
except (OperationFailure, DuplicateKeyError) as db_err:
logger.exception(
f"{log_prefix} 数据库操作失败 ({type(db_err).__name__}): 用户 {user_id_str}, 绰号 {nickname}. 错误: {db_err}"
)
except Exception as e:
logger.exception(f"{log_prefix} 处理用户 {user_id_str} 的绰号 '{nickname}' 时发生意外错误:{e}")
else:
logger.debug(f"{log_prefix} LLM 未找到可靠的绰号映射或分析失败。")
async def _call_llm_for_analysis(
self,
chat_history_str: str,
bot_reply: str,
user_name_map: Dict[str, str],
) -> Dict[str, Any]:
"""
内部方法调用 LLM 分析聊天记录和 Bot 回复提取可靠的 用户ID-绰号 映射
"""
# ... (此方法内部逻辑保持不变) ...
if not self.llm_mapper:
logger.error("LLM 映射器未初始化,无法执行分析。")
return {"is_exist": False}
prompt = _build_mapping_prompt(chat_history_str, bot_reply, user_name_map)
logger.debug(f"构建的绰号映射 Prompt:\n{prompt[:500]}...")
try:
response_content, _, _ = await self.llm_mapper.generate_response(prompt)
logger.debug(f"LLM 原始响应 (绰号映射): {response_content}")
if not response_content:
logger.warning("LLM 返回了空的绰号映射内容。")
return {"is_exist": False}
response_content = response_content.strip()
markdown_code_regex = re.compile(r"^```(?:\w+)?\s*\n(.*?)\n\s*```$", re.DOTALL | re.IGNORECASE)
match = markdown_code_regex.match(response_content)
if match:
response_content = match.group(1).strip()
elif response_content.startswith("{") and response_content.endswith("}"):
pass # 可能是纯 JSON
else:
json_match = re.search(r"\{.*\}", response_content, re.DOTALL)
if json_match:
response_content = json_match.group(0)
else:
logger.warning(f"LLM 响应似乎不包含有效的 JSON 对象。响应: {response_content}")
return {"is_exist": False}
result = json.loads(response_content)
if not isinstance(result, dict):
logger.warning(f"LLM 响应不是一个有效的 JSON 对象 (字典类型)。响应内容: {response_content}")
return {"is_exist": False}
is_exist = result.get("is_exist")
if is_exist is True:
original_data = result.get("data")
if isinstance(original_data, dict) and original_data:
logger.info(f"LLM 找到的原始绰号映射: {original_data}")
filtered_data = self._filter_llm_results(original_data, user_name_map)
if not filtered_data:
logger.info("所有找到的绰号映射都被过滤掉了。")
return {"is_exist": False}
else:
logger.info(f"过滤后的绰号映射: {filtered_data}")
return {"is_exist": True, "data": filtered_data}
else:
logger.warning(f"LLM 响应格式错误: is_exist=True 但 data 无效。原始 data: {original_data}")
return {"is_exist": False}
elif is_exist is False:
logger.info("LLM 明确指示未找到可靠的绰号映射 (is_exist=False)。")
return {"is_exist": False}
else:
logger.warning(f"LLM 响应格式错误: 'is_exist' 的值 '{is_exist}' 无效。")
return {"is_exist": False}
except json.JSONDecodeError as json_err:
logger.error(f"解析 LLM 响应 JSON 失败: {json_err}\n原始响应: {response_content}")
return {"is_exist": False}
except Exception as e:
logger.error(f"绰号映射 LLM 调用或处理过程中发生意外错误: {e}", exc_info=True)
return {"is_exist": False}
def _filter_llm_results(self, original_data: Dict[str, str], user_name_map: Dict[str, str]) -> Dict[str, str]:
"""过滤 LLM 返回的绰号映射结果。"""
filtered_data = {}
bot_qq_str = str(self.config.BOT_QQ) if hasattr(self.config, "BOT_QQ") else None
for user_id, nickname in original_data.items():
if not isinstance(user_id, str):
logger.warning(f"过滤掉非字符串 user_id: {user_id}")
continue
if bot_qq_str and user_id == bot_qq_str:
logger.debug(f"过滤掉机器人自身的映射: ID {user_id}")
continue
if not nickname or nickname.isspace():
logger.debug(f"过滤掉用户 {user_id} 的空绰号。")
continue
# person_name = user_name_map.get(user_id)
# if person_name and person_name == nickname:
# logger.debug(f"过滤掉用户 {user_id} 的映射: 绰号 '{nickname}' 与其名称 '{person_name}' 相同。")
# continue
filtered_data[user_id] = nickname.strip()
return filtered_data
# 线程相关
# 修改:使用 run_async_loop 辅助函数
def _run_processor_in_thread(self):
"""后台线程入口函数,使用辅助函数管理 asyncio 事件循环。"""
thread_id = threading.get_ident() # 获取线程ID用于日志
logger.info(f"绰号处理器线程启动 (线程 ID: {thread_id})...")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # 为当前线程设置事件循环
logger.info(f"(线程 ID: {thread_id}) Asyncio 事件循环已创建并设置。")
# 调用辅助函数来运行主处理协程并管理循环生命周期
run_async_loop(loop, self._processing_loop())
logger.info(f"绰号处理器线程结束 (线程 ID: {thread_id}).")
# 结束修改
# 修改:使用 asyncio.Queue 和 wait_for
async def _processing_loop(self):
"""后台线程中运行的异步处理循环 (使用 asyncio.Queue)。"""
# 移除线程ID日志因为它在异步上下文中不一定准确
logger.info("绰号异步处理循环已启动。")
while not self._stop_event.is_set(): # 仍然检查同步的停止事件
try:
# 使用 asyncio.wait_for 从异步队列获取项目,并设置超时
item = await asyncio.wait_for(self.nickname_queue.get(), timeout=self.sleep_interval)
# 处理获取到的项目 (调用异步方法)
await self._analyze_and_update_nicknames(item)
self.nickname_queue.task_done() # 标记任务完成
except asyncio.TimeoutError:
# 等待超时,相当于之前 queue.Empty继续循环检查停止事件
continue
except asyncio.CancelledError:
# 协程被取消 (通常在 stop_processor 中发生)
logger.info("绰号处理循环被取消。")
break # 退出循环
except Exception as e:
# 捕获处理单个项目时可能发生的其他异常
logger.error(f"绰号处理循环出错: {e}", exc_info=True)
# 短暂异步休眠避免快速连续失败
await asyncio.sleep(5)
logger.info("绰号异步处理循环已结束。")
# 可以在这里添加清理逻辑,比如确保队列为空或处理剩余项目
# 例如await self.nickname_queue.join() # 等待所有任务完成 (如果需要)
# 结束修改
# 在模块级别创建单例实例
nickname_manager = NicknameManager()

View File

@ -0,0 +1,78 @@
# src/plugins/group_nickname/nickname_mapper.py
from typing import Dict
from src.common.logger_manager import get_logger
# 这个文件现在只负责构建 PromptLLM 的初始化和调用移至 NicknameManager
logger = get_logger("nickname_mapper")
# LLMRequest 实例和 analyze_chat_for_nicknames 函数已被移除
def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str:
"""
构建用于 LLM 进行绰号映射分析的 Prompt
Args:
chat_history_str: 格式化后的聊天历史记录字符串
bot_reply: Bot 的最新回复字符串
user_name_map: 用户 ID 到已知名称person_name fallback nickname的映射
Returns:
str: 构建好的 Prompt 字符串
"""
# 将 user_name_map 格式化为列表字符串
user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items() if uid and name])
if not user_list_str:
user_list_str = "" # 如果映射为空,明确告知
# 核心 Prompt 内容
prompt = f"""
任务仔细分析以下聊天记录和你的最新回复判断其中是否明确提到了某个用户的绰号并且这个绰号可以清晰地与一个特定的用户 ID 对应起来
已知用户信息ID: 名称
{user_list_str}
*注意名称后面带有"(你)"表示是你自己*
聊天记录
---
{chat_history_str}
---
你的最新回复
{bot_reply}
分析要求与输出格式
1. 找出聊天记录和你的最新回复中可能是用户绰号的词语
2. 判断这些绰号是否在上下文中**清晰无歧义**地指向了已知用户信息列表中的**某一个特定用户 ID**必须是强关联避免猜测
3. **不要**输出你自己名称后带"(你)"的用户的绰号映射
**不要**输出与用户已知名称完全相同的词语作为绰号
**不要**将在你的最新回复中你对他人使用的称呼或绰号进行映射只分析聊天记录中他人对用户的称呼
**不要**输出指代不明或过于通用的词语大佬兄弟那个谁除非上下文能非常明确地指向特定用户
4. 如果找到了**至少一个**满足上述所有条件的**明确**的用户 ID 到绰号的映射关系请输出 JSON 对象
```json
{{
"is_exist": true,
"data": {{
"用户A数字id": "绰号_A",
"用户B数字id": "绰号_B"
}}
}}
```
- `"data"` 字段的键必须是用户的**数字 ID (字符串形式)**值是对应的**绰号 (字符串形式)**
- 只包含你能**百分百确认**映射关系的条目宁缺毋滥
如果**无法找到任何一个**满足条件的明确映射关系请输出 JSON 对象
```json
{{
"is_exist": false
}}
```
5. ****输出 JSON 对象不要包含任何额外的解释注释或代码块标记之外的文本
输出
"""
# logger.debug(f"构建的绰号映射 Prompt (部分):\n{prompt[:500]}...") # 可以在 NicknameManager 中记录
return prompt
# analyze_chat_for_nicknames 函数已被移除,其逻辑移至 NicknameManager._call_llm_for_analysis

View File

@ -0,0 +1,175 @@
import random
from typing import List, Dict, Tuple
from src.common.logger_manager import get_logger
from src.config.config import global_config
# 这个文件现在只包含纯粹的工具函数,与状态和流程无关
logger = get_logger("nickname_utils")
def select_nicknames_for_prompt(all_nicknames_info: Dict[str, List[Dict[str, int]]]) -> List[Tuple[str, str, int]]:
"""
从给定的绰号信息中根据映射次数加权随机选择最多 N 个绰号用于 Prompt
Args:
all_nicknames_info: 包含用户及其绰号信息的字典格式为
{ "用户名1": [{"绰号A": 次数}, {"绰号B": 次数}], ... }
注意这里的用户名是 person_name
Returns:
List[Tuple[str, str, int]]: 选中的绰号列表每个元素为 (用户名, 绰号, 次数)
按次数降序排序
"""
if not all_nicknames_info:
return []
candidates = [] # 存储 (用户名, 绰号, 次数, 权重)
smoothing_factor = getattr(global_config, "NICKNAME_PROBABILITY_SMOOTHING", 1.0) # 平滑因子避免权重为0
for user_name, nicknames in all_nicknames_info.items():
if nicknames and isinstance(nicknames, list):
for nickname_entry in nicknames:
# 确保条目是字典且只有一个键值对
if isinstance(nickname_entry, dict) and len(nickname_entry) == 1:
nickname, count = list(nickname_entry.items())[0]
# 确保次数是正整数
if isinstance(count, int) and count > 0 and isinstance(nickname, str) and nickname:
weight = count + smoothing_factor # 计算权重
candidates.append((user_name, nickname, count, weight))
else:
logger.warning(
f"用户 '{user_name}' 的绰号条目无效: {nickname_entry} (次数非正整数或绰号为空)。已跳过。"
)
else:
logger.warning(f"用户 '{user_name}' 的绰号条目格式无效: {nickname_entry}。已跳过。")
if not candidates:
return []
# 确定需要选择的数量
max_nicknames = getattr(global_config, "MAX_NICKNAMES_IN_PROMPT", 5)
num_to_select = min(max_nicknames, len(candidates))
try:
# 调用加权随机抽样(不重复)
selected_candidates_with_weight = weighted_sample_without_replacement(candidates, num_to_select)
# 如果抽样结果数量不足(例如权重问题导致提前退出),可以考虑是否需要补充
if len(selected_candidates_with_weight) < num_to_select:
logger.debug(
f"加权随机选择后数量不足 ({len(selected_candidates_with_weight)}/{num_to_select}),尝试补充选择次数最多的。"
)
# 筛选出未被选中的候选
selected_ids = set(
(c[0], c[1]) for c in selected_candidates_with_weight
) # 使用 (用户名, 绰号) 作为唯一标识
remaining_candidates = [c for c in candidates if (c[0], c[1]) not in selected_ids]
remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
needed = num_to_select - len(selected_candidates_with_weight)
selected_candidates_with_weight.extend(remaining_candidates[:needed])
except Exception as e:
# 日志:记录加权随机选择时发生的错误,并回退到简单选择
logger.error(f"绰号加权随机选择时出错: {e}。将回退到选择次数最多的 Top N。", exc_info=True)
# 出错时回退到选择次数最多的 N 个
candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
selected_candidates_with_weight = candidates[:num_to_select]
# 格式化输出结果为 (用户名, 绰号, 次数),移除权重
result = [(user, nick, count) for user, nick, count, _weight in selected_candidates_with_weight]
# 按次数降序排序最终结果
result.sort(key=lambda x: x[2], reverse=True)
logger.debug(f"为 Prompt 选择的绰号: {result}")
return result
def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, int]]) -> str:
"""
将选中的绰号信息格式化为注入 Prompt 的字符串
Args:
selected_nicknames: 选中的绰号列表 (用户名, 绰号, 次数)
Returns:
str: 格式化后的字符串如果列表为空则返回空字符串
"""
if not selected_nicknames:
return ""
# Prompt 注入部分的标题
prompt_lines = ["以下是聊天记录中一些成员在本群的绰号信息(按常用度排序),供你参考:"]
grouped_by_user: Dict[str, List[str]] = {} # 用于按用户分组
# 按用户分组绰号
for user_name, nickname, _count in selected_nicknames:
if user_name not in grouped_by_user:
grouped_by_user[user_name] = []
# 添加中文引号以区分绰号
grouped_by_user[user_name].append(f"{nickname}")
# 构建每个用户的绰号字符串
for user_name, nicknames in grouped_by_user.items():
nicknames_str = "".join(nicknames) # 使用中文顿号连接
# 格式化输出,例如: "- 张三ta 可能被称为:“三儿”、“张哥”"
prompt_lines.append(f"- {user_name}ta 可能被称为:{nicknames_str}")
# 如果只有标题行,返回空字符串,避免注入无意义的标题
if len(prompt_lines) > 1:
# 末尾加换行符,以便在 Prompt 中正确分隔
return "\n".join(prompt_lines) + "\n"
else:
return ""
def weighted_sample_without_replacement(
candidates: List[Tuple[str, str, int, float]], k: int
) -> List[Tuple[str, str, int, float]]:
"""
执行不重复的加权随机抽样使用 A-ExpJ 算法思想的简化实现
Args:
candidates: 候选列表每个元素为 (用户名, 绰号, 次数, 权重)
k: 需要选择的数量
Returns:
List[Tuple[str, str, int, float]]: 选中的元素列表包含权重
"""
if k <= 0:
return []
n = len(candidates)
if k >= n:
return candidates[:] # 返回副本
# 计算每个元素的 key = U^(1/weight),其中 U 是 (0, 1) 之间的随机数
# 为了数值稳定性,计算 log(key) = log(U) / weight
# log(U) 可以用 -Exponential(1) 来生成
weighted_keys = []
for i in range(n):
weight = candidates[i][3]
if weight <= 0:
# 处理权重为0或负数的情况赋予一个极小的概率或极大负数的log_key
log_key = float("-inf") # 或者一个非常大的负数
logger.warning(f"候选者 {candidates[i][:2]} 的权重为非正数 ({weight}),抽中概率极低。")
else:
log_u = -random.expovariate(1.0) # 生成 -Exponential(1) 随机数
log_key = log_u / weight
weighted_keys.append((log_key, i)) # 存储 (log_key, 原始索引)
# 按 log_key 降序排序 (相当于按 key 升序排序)
weighted_keys.sort(key=lambda x: x[0], reverse=True)
# 选择 log_key 最大的 k 个元素的原始索引
selected_indices = [index for _log_key, index in weighted_keys[:k]]
# 根据选中的索引从原始 candidates 列表中获取元素
selected_items = [candidates[i] for i in selected_indices]
return selected_items
# 移除旧的流程函数
# get_nickname_injection_for_prompt 和 trigger_nickname_analysis_if_needed
# 的逻辑现在由 NicknameManager 处理

View File

@ -1,20 +1,20 @@
import asyncio
import time
import traceback
import random # <--- 添加导入
import json # <--- 确保导入 json
import random
import json
from typing import List, Optional, Dict, Any, Deque, Callable, Coroutine
from collections import deque
from src.plugins.chat.message import MessageRecv, BaseMessageInfo, MessageThinking, MessageSending
from src.plugins.chat.message import Seg # Local import needed after move
from src.plugins.chat.message import Seg
from src.plugins.chat.chat_stream import ChatStream
from src.plugins.chat.message import UserInfo
from src.plugins.chat.chat_stream import chat_manager
from src.common.logger_manager import get_logger
from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config
from src.plugins.chat.utils_image import image_path_to_base64 # Local import needed after move
from src.plugins.utils.timer_calculator import Timer # <--- Import Timer
from src.plugins.chat.utils_image import image_path_to_base64
from src.plugins.utils.timer_calculator import Timer
from src.plugins.emoji_system.emoji_manager import emoji_manager
from src.heart_flow.sub_mind import SubMind
from src.heart_flow.observation import Observation
@ -28,6 +28,8 @@ from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from src.plugins.moods.moods import MoodManager
from src.heart_flow.utils_chat import get_chat_type_and_target_info
from rich.traceback import install
from src.plugins.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat
from src.plugins.group_nickname.nickname_manager import nickname_manager
install(extra_lines=3)
@ -528,11 +530,17 @@ class HeartFChatting:
try:
if action == "text_reply":
return await handler(reasoning, emoji_query, cycle_timers)
# 调用文本回复处理,它会返回 (bool, thinking_id)
success, thinking_id = await handler(reasoning, emoji_query, cycle_timers)
return success, thinking_id # 直接返回结果
elif action == "emoji_reply":
return await handler(reasoning, emoji_query), ""
# 调用表情回复处理,它只返回 bool
success = await handler(reasoning, emoji_query)
return success, "" # thinking_id 为空字符串
else: # no_reply
return await handler(reasoning, planner_start_db_time, cycle_timers), ""
# 调用不回复处理,它只返回 bool
success = await handler(reasoning, planner_start_db_time, cycle_timers)
return success, "" # thinking_id 为空字符串
except HeartFCError as e:
logger.error(f"{self.log_prefix} 处理{action}时出错: {e}")
# 出错时也重置计数器
@ -549,6 +557,7 @@ class HeartFChatting:
2. 创建思考消息
3. 生成回复
4. 发送消息
5. [新增] 触发绰号分析
参数:
reasoning: 回复原因
@ -572,6 +581,7 @@ class HeartFChatting:
if not thinking_id:
raise PlannerError("无法创建思考消息")
reply = None # 初始化 reply
try:
# 生成回复
with Timer("生成回复", cycle_timers):
@ -585,7 +595,6 @@ class HeartFChatting:
raise ReplierError("回复生成失败")
# 发送消息
with Timer("发送消息", cycle_timers):
await self._sender(
thinking_id=thinking_id,
@ -594,6 +603,9 @@ class HeartFChatting:
send_emoji=emoji_query,
)
# 调用工具函数触发绰号分析
await nickname_manager.trigger_nickname_analysis(anchor_message, reply, self.chat_stream)
return True, thinking_id
except (ReplierError, SenderError) as e:
@ -854,6 +866,17 @@ class HeartFChatting:
f"{self.log_prefix}[Planner] 临时移除的动作: {actions_to_remove_temporarily}, 当前可用: {list(current_available_actions.keys())}"
)
# 需要获取用于上下文的历史消息
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.stream_id,
timestamp=time.time(), # 使用当前时间作为参考点
limit=global_config.observation_context_size, # 使用与 prompt 构建一致的 limit
)
# 调用工具函数获取格式化后的绰号字符串
nickname_injection_str = await nickname_manager.get_nickname_prompt_injection(
self.chat_stream, message_list_before_now
)
# --- 构建提示词 (调用修改后的 PromptBuilder 方法) ---
prompt = await prompt_builder.build_planner_prompt(
is_group_chat=self.is_group_chat, # <-- Pass HFC state
@ -863,6 +886,7 @@ class HeartFChatting:
current_mind=current_mind, # <-- Pass argument
structured_info=self.sub_mind.structured_info_str, # <-- Pass SubMind info
current_available_actions=current_available_actions, # <-- Pass determined actions
nickname_info=nickname_injection_str,
)
# --- 调用 LLM (普通文本生成) ---

View File

@ -1,4 +1,6 @@
import random
import time
from typing import Union, Optional, Deque, Dict, Any
from ...config.config import global_config
from src.common.logger_manager import get_logger
from ...individuality.individuality import Individuality
@ -6,14 +8,13 @@ from src.plugins.utils.prompt_builder import Prompt, global_prompt_manager
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
from src.plugins.person_info.relationship_manager import relationship_manager
from src.plugins.chat.utils import get_embedding
import time
from typing import Union, Optional, Deque, Dict, Any
from ...common.database import db
from ..chat.utils import get_recent_group_speaker
from ..moods.moods import MoodManager
from ..memory_system.Hippocampus import HippocampusManager
from ..schedule.schedule_generator import bot_schedule
from ..knowledge.knowledge_lib import qa_manager
from src.plugins.group_nickname.nickname_manager import nickname_manager
import traceback
from .heartFC_Cycleinfo import CycleInfo
@ -24,6 +25,7 @@ def init_prompt():
Prompt(
"""
{info_from_tools}
{nickname_info}
{chat_target}
{chat_talking_prompt}
现在你想要在群里发言或者回复\n
@ -53,6 +55,7 @@ def init_prompt():
Prompt(
"""你的名字是{bot_name},{prompt_personality}{chat_context_description}。需要基于以下信息决定如何参与对话:
{structured_info_block}
{nickname_info}
{chat_content_block}
{current_mind_block}
{cycle_info_block}
@ -118,6 +121,7 @@ JSON 结构如下,包含三个字段 "action", "reasoning", "emoji_query":
{relation_prompt}
{prompt_info}
{schedule_prompt}
{nickname_info}
{chat_target}
{chat_talking_prompt}
现在"{sender_name}"说的:{message_txt}引起了你的注意你想要在群里发言或者回复这条消息\n
@ -250,9 +254,15 @@ async def _build_prompt_focus(reason, current_mind_info, structured_info, chat_s
chat_target_1 = await global_prompt_manager.get_prompt_async("chat_target_group1")
chat_target_2 = await global_prompt_manager.get_prompt_async("chat_target_group2")
# 调用新的工具函数获取绰号信息
nickname_injection_str = await nickname_manager.get_nickname_prompt_injection(
chat_stream, message_list_before_now
)
prompt = await global_prompt_manager.format_prompt(
template_name,
info_from_tools=structured_info_prompt,
nickname_info=nickname_injection_str,
chat_target=chat_target_1, # Used in group template
chat_talking_prompt=chat_talking_prompt,
bot_name=global_config.BOT_NICKNAME,
@ -442,6 +452,11 @@ class PromptBuilder:
chat_target_1 = await global_prompt_manager.get_prompt_async("chat_target_group1")
chat_target_2 = await global_prompt_manager.get_prompt_async("chat_target_group2")
# 调用新的工具函数获取绰号信息
nickname_injection_str = await nickname_manager.get_nickname_prompt_injection(
chat_stream, message_list_before_now
)
prompt = await global_prompt_manager.format_prompt(
template_name,
relation_prompt=relation_prompt,
@ -449,6 +464,7 @@ class PromptBuilder:
memory_prompt=memory_prompt,
prompt_info=prompt_info,
schedule_prompt=schedule_prompt,
nickname_info=nickname_injection_str, # <--- 注入绰号信息
chat_target=chat_target_1,
chat_target_2=chat_target_2,
chat_talking_prompt=chat_talking_prompt,
@ -486,7 +502,7 @@ class PromptBuilder:
prompt_ger=prompt_ger,
moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"),
)
# --- End choosing template ---
# --- End choosing template ---
return prompt
@ -755,6 +771,7 @@ class PromptBuilder:
current_mind: Optional[str],
structured_info: Dict[str, Any],
current_available_actions: Dict[str, str],
nickname_info: str,
# replan_prompt: str, # Replan logic still simplified
) -> str:
"""构建 Planner LLM 的提示词 (获取模板并填充数据)"""
@ -836,6 +853,7 @@ class PromptBuilder:
prompt = planner_prompt_template.format(
bot_name=global_config.BOT_NICKNAME,
nickname_info=nickname_info,
prompt_personality=prompt_personality,
chat_context_description=chat_context_description,
structured_info_block=structured_info_block,

View File

@ -20,6 +20,7 @@ from src.plugins.person_info.relationship_manager import relationship_manager
from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from src.plugins.utils.timer_calculator import Timer
from src.heart_flow.utils_chat import get_chat_type_and_target_info
from src.plugins.group_nickname.nickname_manager import nickname_manager
logger = get_logger("chat")
@ -316,6 +317,7 @@ class NormalChat:
# 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况)
if first_bot_msg:
info_catcher.catch_after_response(timing_results["消息发送"], response_set, first_bot_msg)
await nickname_manager.trigger_nickname_analysis(message, response_set, self.chat_stream)
else:
logger.warning(f"[{self.stream_name}] 思考消息 {thinking_id} 在发送前丢失,无法记录 info_catcher")

View File

@ -53,6 +53,7 @@ person_info_default = {
"msg_interval_list": [],
"user_cardname": None, # 添加群名片
"user_avatar": None, # 添加头像信息例如URL或标识符
"group_nicknames": [],
} # 个人信息的各项与默认值在此定义,以下处理会自动创建/补全每一项

View File

@ -5,6 +5,8 @@ from bson.decimal128 import Decimal128
from .person_info import person_info_manager
import time
import random
from typing import List, Dict
from ...common.database import db
from maim_message import UserInfo
# import re
# import traceback
@ -81,6 +83,131 @@ class RelationshipManager:
is_known = person_info_manager.is_person_known(platform, user_id)
return is_known
# --- [修改] 使用全局 db 对象进行查询 ---
@staticmethod
async def get_person_names_batch(platform: str, user_ids: List[str]) -> Dict[str, str]:
"""
批量获取多个用户的 person_name
"""
if not user_ids:
return {}
person_ids = [person_info_manager.get_person_id(platform, str(uid)) for uid in user_ids]
names_map = {}
try:
cursor = db.person_info.find(
{"person_id": {"$in": person_ids}},
{"_id": 0, "person_id": 1, "user_id": 1, "person_name": 1}, # 只查询需要的字段
)
for doc in cursor:
user_id_val = doc.get("user_id") # 获取原始值
original_user_id = None # 初始化
if isinstance(user_id_val, (int, float)): # 检查是否是数字类型
original_user_id = str(user_id_val) # 直接转换为字符串
elif isinstance(user_id_val, str): # 检查是否是字符串
if "_" in user_id_val: # 如果包含下划线,则分割
original_user_id = user_id_val.split("_", 1)[-1]
else: # 如果不包含下划线,则直接使用该字符串
original_user_id = user_id_val
# else: # 其他类型或 Noneoriginal_user_id 保持为 None
person_name = doc.get("person_name")
# 确保 original_user_id 和 person_name 都有效
if original_user_id and person_name:
names_map[original_user_id] = person_name
logger.debug(f"批量获取 {len(user_ids)} 个用户的 person_name找到 {len(names_map)} 个。")
except AttributeError as e:
# 如果 db 对象没有 person_info 属性,或者 find 方法不存在
logger.error(f"访问数据库时出错: {e}。请检查 common/database.py 和集合名称。")
except Exception as e:
logger.error(f"批量获取 person_name 时出错: {e}", exc_info=True)
return names_map
@staticmethod
async def get_users_group_nicknames(
platform: str, user_ids: List[str], group_id: str
) -> Dict[str, List[Dict[str, int]]]:
"""
批量获取多个用户在指定群组的绰号信息
Args:
platform (str): 平台名称
user_ids (List[str]): 用户 ID 列表
group_id (str): 群组 ID
Returns:
Dict[str, List[Dict[str, int]]]: 映射 {person_name: [{"绰号A": 次数}, ...]}
"""
if not user_ids or not group_id:
return {}
person_ids = [person_info_manager.get_person_id(platform, str(uid)) for uid in user_ids]
nicknames_data = {}
group_id_str = str(group_id) # 确保 group_id 是字符串
try:
# 查询包含目标 person_id 的文档
cursor = db.person_info.find(
{"person_id": {"$in": person_ids}},
{"_id": 0, "person_id": 1, "person_name": 1, "group_nicknames": 1}, # 查询所需字段
)
# 假设同步迭代可行
for doc in cursor:
person_name = doc.get("person_name")
if not person_name:
continue # 跳过没有 person_name 的用户
group_nicknames_list = doc.get("group_nicknames", []) # 获取 group_nicknames 数组
target_group_nicknames = [] # 存储目标群组的绰号列表
# 遍历 group_nicknames 数组,查找匹配的 group_id
for group_entry in group_nicknames_list:
# 确保 group_entry 是字典且包含 group_id 键
if isinstance(group_entry, dict) and group_entry.get("group_id") == group_id_str:
# 提取 nicknames 列表
nicknames_raw = group_entry.get("nicknames", [])
if isinstance(nicknames_raw, list):
target_group_nicknames = nicknames_raw
break # 找到匹配的 group_id 后即可退出内层循环
# 如果找到了目标群组的绰号列表
if target_group_nicknames:
valid_nicknames_formatted = [] # 存储格式化后的绰号
for item in target_group_nicknames:
# 校验每个绰号条目的格式 { "name": str, "count": int }
if (
isinstance(item, dict)
and isinstance(item.get("name"), str)
and isinstance(item.get("count"), int)
and item["count"] > 0
): # 确保 count 是正整数
# --- 格式转换:从 { "name": "xxx", "count": y } 转为 { "xxx": y } ---
valid_nicknames_formatted.append({item["name"]: item["count"]})
# --- 结束格式转换 ---
else:
logger.warning(
f"数据库中用户 {person_name} 群组 {group_id_str} 的绰号格式无效或 count <= 0: {item}"
)
if valid_nicknames_formatted: # 如果存在有效的、格式化后的绰号
nicknames_data[person_name] = valid_nicknames_formatted # 使用 person_name 作为 key
logger.debug(
f"批量获取群组 {group_id_str}{len(user_ids)} 个用户的绰号,找到 {len(nicknames_data)} 个用户的数据。"
)
except AttributeError as e:
logger.error(f"访问数据库时出错: {e}。请检查 common/database.py 和集合名称 'person_info'")
except Exception as e:
logger.error(f"批量获取群组绰号时出错: {e}", exc_info=True)
return nicknames_data
@staticmethod
async def is_qved_name(platform, user_id):
"""判断是否认识某人"""

View File

@ -1,5 +1,5 @@
[inner]
version = "1.6.1"
version = "1.6.2"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#如果你想要修改配置文件请在修改后将version的值进行变更
@ -123,6 +123,13 @@ steal_emoji = true # 是否偷取表情包,让麦麦可以发送她保存的
enable_check = false # 是否启用表情包过滤,只有符合该要求的表情包才会被保存
check_prompt = "符合公序良俗" # 表情包过滤要求,只有符合该要求的表情包才会被保存
[group_nickname]
enable_nickname_mapping = false # 绰号映射功能总开关(默认关闭,建议关闭)
max_nicknames_in_prompt = 10 # Prompt 中最多注入的绰号数量防止token数量爆炸
nickname_probability_smoothing = 1 # 绰号加权随机选择的平滑因子
nickname_queue_max_size = 100 # 绰号处理队列最大容量
nickname_process_sleep_interval = 60 # 绰号处理进程休眠间隔(秒)
[memory]
build_memory_interval = 2000 # 记忆构建间隔 单位秒 间隔越低,麦麦学习越多,但是冗余信息也会增多
build_memory_distribution = [6.0,3.0,0.6,32.0,12.0,0.4] # 记忆构建分布参数分布1均值标准差权重分布2均值标准差权重
@ -291,6 +298,13 @@ provider = "SILICONFLOW"
pri_in = 2
pri_out = 8
#绰号映射生成模型
[model.llm_nickname_mapping]
name = "deepseek-ai/DeepSeek-V3"
provider = "SILICONFLOW"
temp = 0.3
pri_in = 2
pri_out = 8
#以下模型暂时没有使用!!
#以下模型暂时没有使用!!