false状态下不报错了,先存个档

pull/914/head
Bakadax 2025-04-30 16:57:31 +08:00
parent 14b63958e4
commit 5c2dd25ba4
8 changed files with 221 additions and 295 deletions

View File

@ -1,7 +1,7 @@
import os
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple, Any
from dateutil import tz
import tomli
@ -272,6 +272,13 @@ class BotConfig:
# enable_think_flow: bool = False # 是否启用思考流程
enable_pfc_chatting: bool = False # 是否启用PFC聊天
# Group Nickname
ENABLE_NICKNAME_MAPPING: bool = False # 绰号映射功能总开关
MAX_NICKNAMES_IN_PROMPT: int = 10 # Prompt 中最多注入的绰号数量
NICKNAME_PROBABILITY_SMOOTHING: int = 1 # 绰号加权随机选择的平滑因子
NICKNAME_QUEUE_MAX_SIZE: int = 100 # 绰号处理队列最大容量
NICKNAME_PROCESS_SLEEP_INTERVAL: float = 0.5 # 绰号处理进程休眠间隔(秒)
# 模型配置
llm_reasoning: Dict[str, str] = field(default_factory=lambda: {})
# llm_reasoning_minor: Dict[str, str] = field(default_factory=lambda: {})
@ -288,6 +295,7 @@ class BotConfig:
llm_heartflow: Dict[str, str] = field(default_factory=lambda: {})
llm_tool_use: Dict[str, str] = field(default_factory=lambda: {})
llm_plan: Dict[str, str] = field(default_factory=lambda: {})
llm_nickname_mapping: Dict[str, Any] = field(default_factory=dict)
api_urls: Dict[str, str] = field(default_factory=lambda: {})
@ -401,6 +409,14 @@ class BotConfig:
config.save_emoji = emoji_config.get("save_emoji", config.save_emoji)
config.steal_emoji = emoji_config.get("steal_emoji", config.steal_emoji)
def group_nickname(parent: dict):
gn_config = parent.get("group_nickname", {})
config.ENABLE_NICKNAME_MAPPING = gn_config.get("enable_nickname_mapping", config.ENABLE_NICKNAME_MAPPING)
config.MAX_NICKNAMES_IN_PROMPT = gn_config.get("max_nicknames_in_prompt", config.MAX_NICKNAMES_IN_PROMPT)
config.NICKNAME_PROBABILITY_SMOOTHING = gn_config.get("nickname_probability_smoothing", config.NICKNAME_PROBABILITY_SMOOTHING)
config.NICKNAME_QUEUE_MAX_SIZE = gn_config.get("nickname_queue_max_size", config.NICKNAME_QUEUE_MAX_SIZE)
config.NICKNAME_PROCESS_SLEEP_INTERVAL = gn_config.get("nickname_process_sleep_interval", config.NICKNAME_PROCESS_SLEEP_INTERVAL)
def bot(parent: dict):
# 机器人基础配置
bot_config = parent["bot"]
@ -488,6 +504,7 @@ class BotConfig:
"llm_PFC_action_planner",
"llm_PFC_chat",
"llm_PFC_reply_checker",
"llm_nickname_mapping",
]
for item in config_list:
@ -709,6 +726,7 @@ class BotConfig:
"response_splitter": {"func": response_splitter, "support": ">=0.0.11", "necessary": False},
"experimental": {"func": experimental, "support": ">=0.0.11", "necessary": False},
"heartflow": {"func": heartflow, "support": ">=1.0.2", "necessary": False},
"group_nickname": {"func": group_nickname, "support": ">=0.6.3", "necessary": False},
}
# 原地修改,将 字符串版本表达式 转换成 版本对象

View File

@ -1,39 +0,0 @@
import threading
# 功能总开关
ENABLE_NICKNAME_MAPPING = False # 设置为 False 可完全禁用此功能
# --- LLM 相关配置 (示例,你需要根据实际情况修改) ---
# 用于绰号映射分析的 LLM 模型配置
LLM_MODEL_NICKNAME_MAPPING = {
"model_name": "your_llm_model_for_mapping", # 替换成你用于分析的模型名称
"api_key": "YOUR_API_KEY", # 如果需要
"base_url": "YOUR_API_BASE", # 如果需要
"temperature": 0.5,
"max_tokens": 200,
}
# --- 数据库相关配置 (如果需要独立配置) ---
# 例如,如果数据库连接信息不同或需要特定集合名称
DB_COLLECTION_PERSON_INFO = "person_info" # 你的用户信息集合名称
# --- Prompt 注入配置 ---
MAX_NICKNAMES_IN_PROMPT = 10 # Prompt 中最多注入的绰号数量
NICKNAME_PROBABILITY_SMOOTHING = 1 # 用于加权随机选择的平滑因子 (防止概率为0)
# --- 进程控制 ---
NICKNAME_QUEUE_MAX_SIZE = 100 # 进程间通信队列的最大容量
NICKNAME_PROCESS_SLEEP_INTERVAL = 0.5 # 映射进程在队列为空时的休眠时间(秒)
# --- 运行时状态 (用于安全停止进程) ---
_stop_event = threading.Event()
def get_stop_event():
"""获取全局停止事件"""
return _stop_event
def set_stop_event():
"""设置全局停止事件,通知子进程退出"""
_stop_event.set()

View File

@ -1,41 +1,42 @@
# GroupNickname/nickname_mapper.py
import json
from typing import Dict, Any, Tuple, List, Optional
from src.common.logger_manager import get_logger # 假设你的日志管理器路径
from src.plugins.models.utils_model import LLMRequest # 假设你的 LLM 请求工具路径
from .config import LLM_MODEL_NICKNAME_MAPPING, ENABLE_NICKNAME_MAPPING
from typing import Dict, Any, Optional
from src.common.logger_manager import get_logger
from src.plugins.models.utils_model import LLMRequest
# 从全局配置导入
from src.config.config import global_config
logger = get_logger("nickname_mapper")
# 初始化用于绰号映射的 LLM 实例
# 注意:这里的初始化方式可能需要根据你的 LLMRequest 实现进行调整
try:
# 尝试使用字典解包来传递参数
llm_mapper = LLMRequest(
model=LLM_MODEL_NICKNAME_MAPPING.get("model_name", "default_model"),
temperature=LLM_MODEL_NICKNAME_MAPPING.get("temperature", 0.5),
max_tokens=LLM_MODEL_NICKNAME_MAPPING.get("max_tokens", 200),
api_key=LLM_MODEL_NICKNAME_MAPPING.get("api_key"),
base_url=LLM_MODEL_NICKNAME_MAPPING.get("base_url"),
request_type="nickname_mapping" # 定义一个请求类型用于区分
)
logger.info("Nickname mapping LLM initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize nickname mapping LLM: {e}", exc_info=True)
llm_mapper = None # 初始化失败则置为 None
llm_mapper: Optional[LLMRequest] = None
if global_config.ENABLE_NICKNAME_MAPPING: # 使用全局开关
try:
# 从全局配置获取模型设置
model_config = global_config.llm_nickname_mapping
if not model_config or not model_config.get("name"):
logger.error("在全局配置中未找到有效的 'llm_nickname_mapping' 配置或缺少 'name' 字段。")
else:
llm_args = {
"model": model_config.get("name"), # 必须有 name
"temperature": model_config.get("temp", 0.5), # 使用 temp 字段
"max_tokens": model_config.get("max_tokens", 200), # max_tokens 是可选的,取决于 LLMRequest 实现
"api_key": model_config.get("key"), # 使用 key 字段
"base_url": model_config.get("base_url"), # 使用 base_url 字段
"request_type": "nickname_mapping"
}
# 清理 None 值参数
llm_args = {k: v for k, v in llm_args.items() if v is not None}
llm_mapper = LLMRequest(**llm_args)
logger.info("绰号映射 LLM 初始化成功 (使用全局配置)。")
except Exception as e:
logger.error(f"使用全局配置初始化绰号映射 LLM 失败: {e}", exc_info=True)
llm_mapper = None
# --- 结束修改 ---
def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str:
"""
构建用于 LLM 分析绰号映射的 Prompt
Args:
chat_history_str: 格式化后的聊天记录字符串
bot_reply: Bot 的回复内容
user_name_map: 用户 ID 到已知名称 person_name nickname的映射
Returns:
str: 构建好的 Prompt
"""
# ... (函数内容不变) ...
user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items()])
prompt = f"""
@ -74,6 +75,7 @@ Bot 最新回复:
"""
return prompt
async def analyze_chat_for_nicknames(
chat_history_str: str,
bot_reply: str,
@ -81,47 +83,30 @@ async def analyze_chat_for_nicknames(
) -> Dict[str, Any]:
"""
调用 LLM 分析聊天记录和 Bot 回复提取可靠的 用户ID-绰号 映射
Args:
chat_history_str: 格式化后的聊天记录字符串
bot_reply: Bot 的回复内容
user_name_map: 用户 ID 到已知名称 person_name nickname的映射
Returns:
Dict[str, Any]: 分析结果格式为 { "is_exist": bool, "data": Optional[Dict[str, str]] }
如果出错返回 {"is_exist": False}
"""
if not ENABLE_NICKNAME_MAPPING:
logger.debug("Nickname mapping feature is disabled.")
# --- [修改] 使用全局配置开关 ---
if not global_config.ENABLE_NICKNAME_MAPPING:
# --- 结束修改 ---
logger.debug("绰号映射功能已禁用。")
return {"is_exist": False}
if llm_mapper is None:
logger.error("Nickname mapping LLM is not initialized. Cannot perform analysis.")
logger.error("绰号映射 LLM 未初始化。无法执行分析。")
return {"is_exist": False}
prompt = _build_mapping_prompt(chat_history_str, bot_reply, user_name_map)
logger.debug(f"Nickname mapping prompt built:\n{prompt}") # 调试日志
logger.debug(f"构建的绰号映射 Prompt:\n{prompt}")
try:
# --- 调用 LLM ---
# 注意:这里的调用方式需要根据你的 LLMRequest 实现进行调整
# 可能需要使用 generate_response_sync 或其他同步方法,因为这将在独立进程中运行
# 或者如果 LLMRequest 支持异步,确保在异步环境中调用
# response_content, _, _ = await llm_mapper.generate_response(prompt)
# 假设 llm_mapper 有一个同步的 generate 方法或在异步环境中调用
# 这里暂时使用 await如果你的 LLMRequest 不支持,需要修改
# 调用 LLM
response_content, _, _ = await llm_mapper.generate_response(prompt)
logger.debug(f"LLM 原始响应 (绰号映射): {response_content}")
logger.debug(f"LLM raw response for nickname mapping: {response_content}")
# --- 解析 LLM 响应 ---
# ... (解析 LLM 响应的逻辑不变) ...
if not response_content:
logger.warning("LLM returned empty content for nickname mapping.")
logger.warning("LLM 返回了空的绰号映射内容。")
return {"is_exist": False}
# 尝试去除可能的代码块标记
response_content = response_content.strip()
if response_content.startswith("```json"):
response_content = response_content[7:]
@ -131,33 +116,31 @@ async def analyze_chat_for_nicknames(
try:
result = json.loads(response_content)
# 基本验证
if isinstance(result, dict) and "is_exist" in result:
if result["is_exist"] is True:
if "data" in result and isinstance(result["data"], dict):
# 过滤掉 data 为空的情况
if not result["data"]:
logger.debug("LLM indicated is_exist=True but data is empty. Treating as False.")
logger.debug("LLM 指示 is_exist=True 但 data 为空。视为 False 处理。")
return {"is_exist": False}
logger.info(f"Nickname mapping found: {result['data']}")
logger.info(f"找到绰号映射: {result['data']}")
return {"is_exist": True, "data": result["data"]}
else:
logger.warning("LLM response format error: is_exist is True but 'data' is missing or not a dict.")
logger.warning("LLM 响应格式错误: is_exist 为 True 但 'data' 缺失或不是字典。")
return {"is_exist": False}
elif result["is_exist"] is False:
logger.info("No reliable nickname mapping found by LLM.")
logger.info("LLM 未找到可靠的绰号映射。")
return {"is_exist": False}
else:
logger.warning("LLM response format error: 'is_exist' is not a boolean.")
logger.warning("LLM 响应格式错误: 'is_exist' 不是布尔值。")
return {"is_exist": False}
else:
logger.warning("LLM response format error: Missing 'is_exist' key or not a dict.")
logger.warning("LLM 响应格式错误: 缺少 'is_exist' 键或不是字典。")
return {"is_exist": False}
except json.JSONDecodeError as json_err:
logger.error(f"Failed to parse LLM response as JSON: {json_err}\nRaw response: {response_content}")
logger.error(f"解析 LLM 响应 JSON 失败: {json_err}\n原始响应: {response_content}")
return {"is_exist": False}
except Exception as e:
logger.error(f"Error during nickname mapping LLM call or processing: {e}", exc_info=True)
logger.error(f"绰号映射 LLM 调用或处理过程中出错: {e}", exc_info=True)
return {"is_exist": False}

View File

@ -1,134 +1,70 @@
# GroupNickname/nickname_processor.py
import asyncio
import time
import threading
import traceback
# 明确导入 Event 和 Queue
from multiprocessing import Process, Queue as mpQueue
# 尝试从 synchronize 导入 Event
from multiprocessing.synchronize import Event as mpEvent
from typing import Dict, Any, Tuple, Optional, List
from multiprocessing.synchronize import Event as mpEvent # 从 synchronize 导入 Event
from typing import Dict, Optional
from pymongo import MongoClient, UpdateOne
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure
# 假设你的项目结构允许这样导入
try:
from src.common.logger_manager import get_logger
from src.config.config import global_config
from .config import (
ENABLE_NICKNAME_MAPPING, DB_COLLECTION_PERSON_INFO,
NICKNAME_QUEUE_MAX_SIZE, NICKNAME_PROCESS_SLEEP_INTERVAL,
get_stop_event, set_stop_event
)
from .nickname_mapper import analyze_chat_for_nicknames
except ImportError:
# 提供备选导入路径或记录错误,以便调试
print("Error: Failed to import necessary modules. Please check your project structure and PYTHONPATH.")
# 在无法导入时,定义临时的 get_logger 以避免 NameError但这只是权宜之计
import logging
def get_logger(name):
return logging.getLogger(name)
# 定义临时的全局配置,这同样是权宜之计
class MockGlobalConfig:
mongodb_uri = "mongodb://localhost:27017/" # 示例 URI
mongodb_database = "your_db_name" # 示例数据库名
global_config = MockGlobalConfig()
# 定义临时的配置变量
ENABLE_NICKNAME_MAPPING = True
DB_COLLECTION_PERSON_INFO = "person_info"
NICKNAME_QUEUE_MAX_SIZE = 100
NICKNAME_PROCESS_SLEEP_INTERVAL = 0.5
# 使用导入的 mpEvent
_stop_event_internal = mpEvent()
def get_stop_event(): return _stop_event_internal
def set_stop_event(): _stop_event_internal.set()
# 定义临时的 analyze_chat_for_nicknames
async def analyze_chat_for_nicknames(*args, **kwargs): return {"is_exist": False}
from src.common.logger_manager import get_logger # 导入日志管理器
from src.config.config import global_config # 导入全局配置
from .nickname_mapper import analyze_chat_for_nicknames # 导入绰号分析函数
from src.common.database import db # 导入数据库初始化和关闭函数
logger = get_logger("nickname_processor")
logger = get_logger("nickname_processor") # 获取日志记录器实例
# --- 运行时状态 (用于安全停止进程) ---
_stop_event = threading.Event()
# --- 数据库连接 ---
mongo_client: Optional[MongoClient] = None
person_info_collection = None
def _initialize_db():
"""初始化数据库连接(在子进程中调用)"""
global mongo_client, person_info_collection
if mongo_client is None:
try:
mongo_uri = global_config.mongodb_uri
if not mongo_uri:
raise ValueError("MongoDB URI not found in global config.")
mongo_client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)
mongo_client.admin.command('ping')
db = mongo_client[global_config.mongodb_database]
person_info_collection = db[DB_COLLECTION_PERSON_INFO]
logger.info("Nickname processor: Database connection initialized successfully.")
except (ConnectionFailure, ValueError, OperationFailure) as e:
logger.error(f"Nickname processor: Failed to initialize database connection: {e}", exc_info=True)
mongo_client = None
person_info_collection = None
except Exception as e:
logger.error(f"Nickname processor: An unexpected error occurred during DB initialization: {e}", exc_info=True)
mongo_client = None
person_info_collection = None
def _close_db():
"""关闭数据库连接"""
global mongo_client
if mongo_client:
try:
mongo_client.close()
logger.info("Nickname processor: Database connection closed.")
except Exception as e:
logger.error(f"Nickname processor: Error closing database connection: {e}", exc_info=True)
finally:
mongo_client = None
mongo_client: Optional[MongoClient] = None # MongoDB 客户端实例
person_info_collection = None # 用户信息集合对象
# --- 数据库更新逻辑 ---
async def update_nickname_counts(group_id: str, nickname_map: Dict[str, str]):
"""
更新数据库中用户的群组绰号计数
Args:
group_id (str): 群组 ID
nickname_map (Dict[str, str]): 需要更新的 {用户ID: 绰号} 映射
"""
if not person_info_collection:
logger.error("Database collection is not initialized. Cannot update nickname counts.")
# --- [修改] 使用导入的 db 对象访问集合 ---
# !!! 重要:请确保 'person_info' 是你实际存储用户信息的集合名称 !!!
person_info_collection = db.person_info
# --- 结束修改 ---
if not person_info_collection: # 理论上 db 对象总是可用,但保留检查
logger.error("无法访问数据库集合 'person_info'。无法更新绰号计数。")
return
if not nickname_map:
logger.debug("Empty nickname map provided for update.")
logger.debug("提供的用于更新的绰号映射为空。")
return
logger.info(f"Attempting to update nickname counts for group '{group_id}' with map: {nickname_map}")
logger.info(f"尝试更新群组 '{group_id}' 的绰号计数,映射为: {nickname_map}")
for user_id, nickname in nickname_map.items():
if not user_id or not nickname:
logger.warning(f"Skipping invalid entry in nickname map: user_id='{user_id}', nickname='{nickname}'")
logger.warning(f"跳过绰号映射中的无效条目: user_id='{user_id}', nickname='{nickname}'")
continue
group_id_str = str(group_id) # 确保是字符串
group_id_str = str(group_id)
try:
# a. 确保用户文档存在 group_nickname 字段且为 list
person_info_collection.update_one(
{"person_id": user_id},
{"$setOnInsert": {"group_nickname": []}}, # 如果字段不存在则创建为空列表
{"$setOnInsert": {"group_nickname": []}},
upsert=True
)
# b. 确保特定 group_id 的条目存在
update_result = person_info_collection.update_one(
{"person_id": user_id, f"group_nickname.{group_id_str}": {"$exists": False}},
{"$push": {"group_nickname": {group_id_str: []}}} # 如果不存在则添加
{"$push": {"group_nickname": {group_id_str: []}}}
)
if update_result.modified_count > 0:
logger.debug(f"Added group entry for group '{group_id_str}' for user '{user_id}'.")
logger.debug(f"为用户 '{user_id}' 添加了群组 '{group_id_str}' 的条目。")
# c. 确保特定 nickname 存在于 group_id 的数组中,并增加计数
update_result = person_info_collection.update_one(
@ -136,7 +72,7 @@ async def update_nickname_counts(group_id: str, nickname_map: Dict[str, str]):
"person_id": user_id,
"group_nickname": {
"$elemMatch": {
group_id_str: {"$elemMatch": {nickname: {"$exists": True}}}
group_id_str: {"$elemMatch": {nickname: {"$exists": True}}}
}
}
},
@ -155,98 +91,93 @@ async def update_nickname_counts(group_id: str, nickname_map: Dict[str, str]):
array_filters=[{f"group.{group_id_str}": {"$exists": True}}]
)
if add_nick_result.modified_count > 0:
logger.debug(f"Added nickname '{nickname}' with count 1 for user '{user_id}' in group '{group_id_str}'.")
logger.debug(f"为用户 '{user_id}' 在群组 '{group_id_str}' 中添加了绰号 '{nickname}',计数为 1。")
else:
logger.warning(f"Failed to add nickname '{nickname}' for user '{user_id}' in group '{group_id_str}'. Update result: {add_nick_result.raw_result}")
logger.warning(f"未能为用户 '{user_id}' 在群组 '{group_id_str}' 中添加绰号 '{nickname}'。更新结果: {add_nick_result.raw_result}")
elif update_result.modified_count > 0:
logger.debug(f"Incremented count for nickname '{nickname}' for user '{user_id}' in group '{group_id_str}'.")
logger.debug(f"用户 '{user_id}' 在群组 '{group_id_str}' 中的绰号 '{nickname}' 计数已增加。")
else:
logger.warning(f"Nickname increment operation matched but did not modify for user '{user_id}', nickname '{nickname}'. Update result: {update_result.raw_result}")
logger.warning(f"绰号增加操作匹配但未修改用户 '{user_id}' 的绰号 '{nickname}'。更新结果: {update_result.raw_result}")
except OperationFailure as op_err:
logger.error(f"Database operation failed for user {user_id}, group {group_id_str}, nickname {nickname}: {op_err}", exc_info=True)
logger.error(f"数据库操作失败: 用户 {user_id}, 群组 {group_id_str}, 绰号 {nickname}: {op_err}", exc_info=True)
except Exception as e:
logger.error(f"Unexpected error updating nickname for user {user_id}, group {group_id_str}, nickname {nickname}: {e}", exc_info=True)
logger.error(f"更新用户 {user_id} 的绰号 {nickname} 时发生意外错误: {e}", exc_info=True)
# --- 队列和进程 ---
# 使用明确导入的类型
nickname_queue: mpQueue[Tuple[str, str, str, Dict[str, str]]] = mpQueue(maxsize=NICKNAME_QUEUE_MAX_SIZE)
# --- [修改] 使用全局配置 ---
nickname_queue: mpQueue = mpQueue(maxsize=global_config.NICKNAME_QUEUE_MAX_SIZE)
# --- 结束修改 ---
_nickname_process: Optional[Process] = None
async def add_to_nickname_queue(
chat_history_str: str,
bot_reply: str,
group_id: Optional[str], # 群聊时需要
user_name_map: Dict[str, str] # 用户ID到名字的映射
group_id: Optional[str],
user_name_map: Dict[str, str]
):
"""将需要分析的数据放入队列。"""
if not ENABLE_NICKNAME_MAPPING:
# --- [修改] 使用全局配置 ---
if not global_config.ENABLE_NICKNAME_MAPPING:
# --- 结束修改 ---
return
if group_id is None:
logger.debug("Skipping nickname mapping for private chat.")
return # 私聊暂时不处理绰号映射
logger.debug("私聊跳过绰号映射。")
return
try:
item = (chat_history_str, bot_reply, str(group_id), user_name_map) # 确保 group_id 是字符串
# 使用 put_nowait如果队列满则会抛出 Full 异常
item = (chat_history_str, bot_reply, str(group_id), user_name_map)
nickname_queue.put_nowait(item)
logger.debug(f"Added item to nickname queue for group {group_id}.")
# 捕获 queue.Full 异常
logger.debug(f"已将项目添加到群组 {group_id} 的绰号队列。")
except Exception as e:
# 检查异常类型是否为队列满(需要导入 queue 模块或处理 Full 异常)
# from queue import Full # 如果 nickname_queue 是 asyncio.Queue
# if isinstance(e, Full):
# logger.warning("Nickname processing queue is full. Discarding new item.")
# else:
# logger.error(f"Error adding item to nickname queue: {e}", exc_info=True)
# 由于 multiprocessing.Queue 的 Full 异常在不同环境下可能不同,这里暂时捕获通用异常
logger.warning(f"Failed to add item to nickname queue (possibly full): {e}", exc_info=True)
logger.warning(f"无法将项目添加到绰号队列(可能已满): {e}", exc_info=True)
# 使用从 synchronize 导入的 mpEvent
async def _nickname_processing_loop(queue: mpQueue, stop_event: mpEvent): # 使用 mpEvent
async def _nickname_processing_loop(queue: mpQueue, stop_event: mpEvent):
"""独立进程中的主循环,处理队列任务。"""
_initialize_db() # 初始化数据库连接
logger.info("Nickname processing loop started.")
# --- [移除] 不再需要本地数据库初始化 ---
# _initialize_db()
# --- 结束移除 ---
logger.info("绰号处理循环已启动。")
while not stop_event.is_set():
try:
if not queue.empty():
# 从队列获取任务
chat_history_str, bot_reply, group_id, user_name_map = queue.get()
logger.debug(f"Processing nickname mapping task for group {group_id}...")
item = queue.get()
if isinstance(item, tuple) and len(item) == 4:
chat_history_str, bot_reply, group_id, user_name_map = item
logger.debug(f"正在处理群组 {group_id} 的绰号映射任务...")
# 调用 LLM 分析
analysis_result = await analyze_chat_for_nicknames(chat_history_str, bot_reply, user_name_map)
analysis_result = await analyze_chat_for_nicknames(chat_history_str, bot_reply, user_name_map)
# 如果找到映射,更新数据库
if analysis_result.get("is_exist") and analysis_result.get("data"):
await update_nickname_counts(group_id, analysis_result["data"])
# 短暂 sleep 避免 CPU 占用过高
await asyncio.sleep(0.05) # 稍微减少 sleep 时间
if analysis_result.get("is_exist") and analysis_result.get("data"):
await update_nickname_counts(group_id, analysis_result["data"])
else:
logger.warning(f"从队列接收到意外的项目类型: {type(item)}")
await asyncio.sleep(0.05)
else:
# 队列为空时休眠
await asyncio.sleep(NICKNAME_PROCESS_SLEEP_INTERVAL)
# --- [修改] 使用全局配置 ---
await asyncio.sleep(global_config.NICKNAME_PROCESS_SLEEP_INTERVAL)
# --- 结束修改 ---
except asyncio.CancelledError:
logger.info("Nickname processing loop cancelled.")
break # 响应取消请求
logger.info("绰号处理循环已取消。")
break
except Exception as e:
logger.error(f"Error in nickname processing loop: {e}\n{traceback.format_exc()}")
# 发生错误时也休眠一下,防止快速连续出错
logger.error(f"绰号处理循环出错: {e}\n{traceback.format_exc()}")
await asyncio.sleep(5)
_close_db() # 关闭数据库连接
logger.info("Nickname processing loop finished.")
# --- [移除] 不再需要本地数据库关闭 ---
# _close_db()
# --- 结束移除 ---
logger.info("绰号处理循环已结束。")
# 使用从 synchronize 导入的 mpEvent
def _run_processor_process(queue: mpQueue, stop_event: mpEvent): # 使用 mpEvent
def _run_processor_process(queue: mpQueue, stop_event: mpEvent):
"""进程启动函数,运行异步循环。"""
try:
loop = asyncio.new_event_loop()
@ -254,48 +185,72 @@ def _run_processor_process(queue: mpQueue, stop_event: mpEvent): # 使用 mpEven
loop.run_until_complete(_nickname_processing_loop(queue, stop_event))
loop.close()
except Exception as e:
logger.error(f"Error running nickname processor process: {e}", exc_info=True)
logger.error(f"运行绰号处理器进程时出错: {e}", exc_info=True)
def start_nickname_processor():
"""启动绰号映射处理进程。"""
global _nickname_process
if not ENABLE_NICKNAME_MAPPING:
logger.info("Nickname mapping feature is disabled. Processor not started.")
# --- [修改] 使用全局配置 ---
if not global_config.ENABLE_NICKNAME_MAPPING:
# --- 结束修改 ---
logger.info("绰号映射功能已禁用。处理器未启动。")
return
if _nickname_process is None or not _nickname_process.is_alive():
logger.info("Starting nickname processor process...")
logger.info("正在启动绰号处理器进程...")
# --- [修改] 从全局配置导入停止事件控制函数 ---
try:
from src.config.config import get_stop_event, set_stop_event # 再次确认导入路径
except ImportError:
logger.error("无法从 src.config.config 导入 get_stop_event/set_stop_event")
# 提供备选方案或退出
return # 或者 raise ImportError
stop_event = get_stop_event()
# --- 结束修改 ---
stop_event.clear()
# 传递明确导入的类型
_nickname_process = Process(target=_run_processor_process, args=(nickname_queue, stop_event), daemon=True)
_nickname_process.start()
logger.info(f"Nickname processor process started with PID: {_nickname_process.pid}")
logger.info(f"绰号处理器进程已启动,PID: {_nickname_process.pid}")
else:
logger.warning("Nickname processor process is already running.")
logger.warning("绰号处理器进程已在运行中。")
def stop_nickname_processor():
"""停止绰号映射处理进程。"""
global _nickname_process
if _nickname_process and _nickname_process.is_alive():
logger.info("Stopping nickname processor process...")
set_stop_event()
logger.info("正在停止绰号处理器进程...")
# --- [修改] 从全局配置导入停止事件控制函数 ---
try:
from src.config.config import set_stop_event # 再次确认导入路径
except ImportError:
logger.error("无法从 src.config.config 导入 set_stop_event")
return # 或者 raise ImportError
set_stop_event() # 发送停止信号
# --- 结束修改 ---
try:
_nickname_process.join(timeout=10)
if _nickname_process.is_alive():
logger.warning("Nickname processor process did not stop gracefully after 10 seconds. Terminating...")
logger.warning("绰号处理器进程在 10 秒后未优雅停止。正在终止...")
_nickname_process.terminate()
_nickname_process.join(timeout=5)
except Exception as e:
logger.error(f"Error stopping nickname processor process: {e}", exc_info=True)
logger.error(f"停止绰号处理器进程时出错: {e}", exc_info=True)
finally:
if _nickname_process and not _nickname_process.is_alive():
logger.info("Nickname processor process stopped successfully.")
logger.info("绰号处理器进程已成功停止。")
else:
logger.error("Failed to stop nickname processor process.")
logger.error("未能停止绰号处理器进程。")
_nickname_process = None
else:
logger.info("Nickname processor process is not running.")
logger.info("绰号处理器进程未在运行。")
# 可以在应用启动时调用 start_nickname_processor()
# 在应用关闭时调用 stop_nickname_processor()
# 可以在应用关闭时调用 stop_nickname_processor()
def get_stop_event():
"""获取全局停止事件"""
return _stop_event
def set_stop_event():
"""设置全局停止事件,通知子进程退出"""
_stop_event.set()

View File

@ -1,8 +1,8 @@
# GroupNickname/nickname_utils.py
import random
from typing import List, Dict, Tuple, Optional
from typing import List, Dict, Tuple
from src.common.logger_manager import get_logger
from .config import MAX_NICKNAMES_IN_PROMPT, NICKNAME_PROBABILITY_SMOOTHING
from src.config.config import global_config
logger = get_logger("nickname_utils")
@ -14,7 +14,7 @@ def select_nicknames_for_prompt(
Args:
all_nicknames_info: 包含用户及其绰号信息的字典格式为
{ "用户名1": [{"绰号A": 次数}, {"绰号B": 次数}], ... }
{ "用户名1": [{"绰号A": 次数}, {"绰号B": 次数}], ... }
Returns:
List[Tuple[str, str, int]]: 选中的绰号列表每个元素为 (用户名, 绰号, 次数)
@ -32,11 +32,11 @@ def select_nicknames_for_prompt(
nickname, count = list(nickname_entry.items())[0]
# 确保次数是正整数
if isinstance(count, int) and count > 0:
# 添加平滑因子避免概率为0并让低频词也有机会
weight = count + NICKNAME_PROBABILITY_SMOOTHING
# 添加平滑因子避免概率为0并让低频词也有机会
weight = count + global_config.NICKNAME_PROBABILITY_SMOOTHING
candidates.append((user_name, nickname, count, weight))
else:
logger.warning(f"Invalid count for nickname '{nickname}' of user '{user_name}': {count}. Skipping.")
logger.warning(f"Invalid count for nickname '{nickname}' of user '{user_name}': {count}. Skipping.")
else:
logger.warning(f"Invalid nickname entry format for user '{user_name}': {nickname_entry}. Skipping.")
@ -50,13 +50,13 @@ def select_nicknames_for_prompt(
if total_weight <= 0:
# 如果所有权重都无效或为0则随机选择或按次数选择
candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
selected = candidates[:MAX_NICKNAMES_IN_PROMPT]
selected = candidates[:global_config.MAX_NICKNAMES_IN_PROMPT]
else:
# 计算归一化概率
probabilities = [c[3] / total_weight for c in candidates]
# 使用概率分布进行加权随机选择(不重复)
num_to_select = min(MAX_NICKNAMES_IN_PROMPT, len(candidates))
num_to_select = min(global_config.MAX_NICKNAMES_IN_PROMPT, len(candidates))
try:
# random.choices 允许重复,我们需要不重复的选择
# 可以使用 numpy.random.choice 或手动实现不重复加权抽样
@ -67,25 +67,25 @@ def select_nicknames_for_prompt(
max_attempts = num_to_select * 5 # 防止无限循环
while len(selected) < num_to_select and attempts < max_attempts:
# 每次只选一个,避免一次选多个时概率分布变化导致的问题
chosen_index = random.choices(range(len(candidates)), weights=probabilities, k=1)[0]
if chosen_index not in selected_indices:
selected_indices.add(chosen_index)
selected.append(candidates[chosen_index])
attempts += 1
# 每次只选一个,避免一次选多个时概率分布变化导致的问题
chosen_index = random.choices(range(len(candidates)), weights=probabilities, k=1)[0]
if chosen_index not in selected_indices:
selected_indices.add(chosen_index)
selected.append(candidates[chosen_index])
attempts += 1
# 如果尝试多次后仍未选够,补充出现次数最多的
if len(selected) < num_to_select:
remaining_candidates = [c for i, c in enumerate(candidates) if i not in selected_indices]
remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
needed = num_to_select - len(selected)
selected.extend(remaining_candidates[:needed])
remaining_candidates = [c for i, c in enumerate(candidates) if i not in selected_indices]
remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
needed = num_to_select - len(selected)
selected.extend(remaining_candidates[:needed])
except Exception as e:
logger.error(f"Error during weighted random choice for nicknames: {e}. Falling back to top N.", exc_info=True)
# 出错时回退到选择次数最多的 N 个
candidates.sort(key=lambda x: x[2], reverse=True)
selected = candidates[:MAX_NICKNAMES_IN_PROMPT]
logger.error(f"Error during weighted random choice for nicknames: {e}. Falling back to top N.", exc_info=True)
# 出错时回退到选择次数最多的 N 个
candidates.sort(key=lambda x: x[2], reverse=True)
selected = candidates[:global_config.MAX_NICKNAMES_IN_PROMPT]
# 格式化输出并按次数排序

View File

@ -29,11 +29,7 @@ from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from src.plugins.moods.moods import MoodManager
from src.individuality.individuality import Individuality
from src.plugins.person_info.relationship_manager import relationship_manager
# --- 导入 GroupNickname 相关 ---
from src.plugins.group_nickname.config import ENABLE_NICKNAME_MAPPING # <--- 导入开关
from src.plugins.group_nickname.nickname_processor import add_to_nickname_queue # <--- 导入队列添加函数
# --- 结束导入 GroupNickname ---
INITIAL_DURATION = 60.0
@ -708,7 +704,7 @@ class HeartFChatting:
anchor_message: 锚点消息对象
reply: Bot 生成的回复内容列表
"""
if not ENABLE_NICKNAME_MAPPING:
if not global_config.ENABLE_NICKNAME_MAPPING:
return # 如果功能未开启,则直接返回
if not anchor_message or not anchor_message.chat_stream or not anchor_message.chat_stream.group_info:

View File

@ -14,7 +14,6 @@ from ..moods.moods import MoodManager
from ..memory_system.Hippocampus import HippocampusManager
from ..schedule.schedule_generator import bot_schedule
from ..knowledge.knowledge_lib import qa_manager
from src.plugins.group_nickname.config import ENABLE_NICKNAME_MAPPING
from src.plugins.group_nickname.nickname_utils import select_nicknames_for_prompt, format_nickname_prompt_injection
from src.plugins.person_info.relationship_manager import relationship_manager
@ -227,7 +226,7 @@ class PromptBuilder:
# --- [修改] 注入绰号信息 ---
nickname_injection_str = ""
if ENABLE_NICKNAME_MAPPING and chat_stream.group_info:
if global_config.ENABLE_NICKNAME_MAPPING and chat_stream.group_info:
try:
group_id = str(chat_stream.group_info.group_id)
# 提取上下文中的用户 ID (需要 message_list_before_now 变量在此可用)

View File

@ -125,6 +125,13 @@ steal_emoji = true # 是否偷取表情包,让麦麦可以发送她保存的
enable_check = false # 是否启用表情包过滤,只有符合该要求的表情包才会被保存
check_prompt = "符合公序良俗" # 表情包过滤要求,只有符合该要求的表情包才会被保存
[group_nickname]
enable_nickname_mapping = false # 绰号映射功能总开关(默认关闭,建议关闭)
max_nicknames_in_prompt = 10 # Prompt 中最多注入的绰号数量防止token数量爆炸
nickname_probability_smoothing = 1 # 绰号加权随机选择的平滑因子
nickname_queue_max_size = 100 # 绰号处理队列最大容量
nickname_process_sleep_interval = 0.5 # 绰号处理进程休眠间隔(秒)
[memory]
build_memory_interval = 2000 # 记忆构建间隔 单位秒 间隔越低,麦麦学习越多,但是冗余信息也会增多
build_memory_distribution = [6.0,3.0,0.6,32.0,12.0,0.4] # 记忆构建分布参数分布1均值标准差权重分布2均值标准差权重
@ -302,6 +309,13 @@ provider = "SILICONFLOW"
pri_in = 2
pri_out = 8
#绰号映射生成模型
[model.llm_nickname_mapping]
name = "deepseek-ai/DeepSeek-V3"
provider = "SILICONFLOW"
temp = 0.3
pri_in = 2
pri_out = 8
#此模型暂时没有使用!!
#此模型暂时没有使用!!