暂时提交一下,先去品鉴pfc知识库

pull/914/head
Bakadax 2025-04-30 14:48:02 +08:00
parent 4802c1584b
commit 3f5f41a58b
6 changed files with 976 additions and 191 deletions

View File

@ -0,0 +1,40 @@
# GroupNickname/config.py
import threading
# 功能总开关
ENABLE_NICKNAME_MAPPING = True # 设置为 False 可完全禁用此功能
# --- LLM 相关配置 (示例,你需要根据实际情况修改) ---
# 用于绰号映射分析的 LLM 模型配置
LLM_MODEL_NICKNAME_MAPPING = {
"model_name": "your_llm_model_for_mapping", # 替换成你用于分析的模型名称
"api_key": "YOUR_API_KEY", # 如果需要
"base_url": "YOUR_API_BASE", # 如果需要
"temperature": 0.5,
"max_tokens": 200,
}
# --- 数据库相关配置 (如果需要独立配置) ---
# 例如,如果数据库连接信息不同或需要特定集合名称
DB_COLLECTION_PERSON_INFO = "person_info" # 你的用户信息集合名称
# --- Prompt 注入配置 ---
MAX_NICKNAMES_IN_PROMPT = 10 # Prompt 中最多注入的绰号数量
NICKNAME_PROBABILITY_SMOOTHING = 1 # 用于加权随机选择的平滑因子 (防止概率为0)
# --- 进程控制 ---
NICKNAME_QUEUE_MAX_SIZE = 100 # 进程间通信队列的最大容量
NICKNAME_PROCESS_SLEEP_INTERVAL = 0.5 # 映射进程在队列为空时的休眠时间(秒)
# --- 运行时状态 (用于安全停止进程) ---
_stop_event = threading.Event()
def get_stop_event():
"""获取全局停止事件"""
return _stop_event
def set_stop_event():
"""设置全局停止事件,通知子进程退出"""
_stop_event.set()

View File

@ -0,0 +1,163 @@
# GroupNickname/nickname_mapper.py
import json
from typing import Dict, Any, Tuple, List, Optional
from src.common.logger_manager import get_logger # 假设你的日志管理器路径
from src.plugins.models.utils_model import LLMRequest # 假设你的 LLM 请求工具路径
from .config import LLM_MODEL_NICKNAME_MAPPING, ENABLE_NICKNAME_MAPPING
logger = get_logger("nickname_mapper")
# 初始化用于绰号映射的 LLM 实例
# 注意:这里的初始化方式可能需要根据你的 LLMRequest 实现进行调整
try:
# 尝试使用字典解包来传递参数
llm_mapper = LLMRequest(
model=LLM_MODEL_NICKNAME_MAPPING.get("model_name", "default_model"),
temperature=LLM_MODEL_NICKNAME_MAPPING.get("temperature", 0.5),
max_tokens=LLM_MODEL_NICKNAME_MAPPING.get("max_tokens", 200),
api_key=LLM_MODEL_NICKNAME_MAPPING.get("api_key"),
base_url=LLM_MODEL_NICKNAME_MAPPING.get("base_url"),
request_type="nickname_mapping" # 定义一个请求类型用于区分
)
logger.info("Nickname mapping LLM initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize nickname mapping LLM: {e}", exc_info=True)
llm_mapper = None # 初始化失败则置为 None
def _build_mapping_prompt(chat_history_str: str, bot_reply: str, user_name_map: Dict[str, str]) -> str:
"""
构建用于 LLM 分析绰号映射的 Prompt
Args:
chat_history_str: 格式化后的聊天记录字符串
bot_reply: Bot 的回复内容
user_name_map: 用户 ID 到已知名称 person_name nickname的映射
Returns:
str: 构建好的 Prompt
"""
user_list_str = "\n".join([f"- {uid}: {name}" for uid, name in user_name_map.items()])
prompt = f"""
任务分析以下聊天记录和 Bot 的最新回复判断其中是否包含用户绰号并确定绰号与用户 ID 之间是否存在明确的一一对应关系
已知用户信息
{user_list_str}
聊天记录
---
{chat_history_str}
---
Bot 最新回复
{bot_reply}
分析要求
1. 识别聊天记录和 Bot 回复中出现的可能是用户绰号的词语
2. 判断这些绰号是否能明确地指向某个特定的用户 ID一个绰号必须在上下文中清晰地与某个发言人或被提及的人关联起来
3. 如果能建立可靠的一一映射关系请输出一个 JSON 对象格式如下
{{
"is_exist": true,
"data": {{
"用户ID_A": "绰号_A",
"用户ID_B": "绰号_B"
}}
}}
其中 "data" 字段的键是用户的 ID值是对应的绰号只包含你能确认映射关系的绰号
4. 如果无法建立任何可靠的一一映射关系例如绰号指代不明没有出现绰号或无法确认绰号与用户的关联请输出 JSON 对象
{{
"is_exist": false
}}
5. 请严格按照 JSON 格式输出不要包含任何额外的解释或文本
输出
"""
return prompt
async def analyze_chat_for_nicknames(
chat_history_str: str,
bot_reply: str,
user_name_map: Dict[str, str]
) -> Dict[str, Any]:
"""
调用 LLM 分析聊天记录和 Bot 回复提取可靠的 用户ID-绰号 映射
Args:
chat_history_str: 格式化后的聊天记录字符串
bot_reply: Bot 的回复内容
user_name_map: 用户 ID 到已知名称 person_name nickname的映射
Returns:
Dict[str, Any]: 分析结果格式为 { "is_exist": bool, "data": Optional[Dict[str, str]] }
如果出错返回 {"is_exist": False}
"""
if not ENABLE_NICKNAME_MAPPING:
logger.debug("Nickname mapping feature is disabled.")
return {"is_exist": False}
if llm_mapper is None:
logger.error("Nickname mapping LLM is not initialized. Cannot perform analysis.")
return {"is_exist": False}
prompt = _build_mapping_prompt(chat_history_str, bot_reply, user_name_map)
logger.debug(f"Nickname mapping prompt built:\n{prompt}") # 调试日志
try:
# --- 调用 LLM ---
# 注意:这里的调用方式需要根据你的 LLMRequest 实现进行调整
# 可能需要使用 generate_response_sync 或其他同步方法,因为这将在独立进程中运行
# 或者如果 LLMRequest 支持异步,确保在异步环境中调用
# response_content, _, _ = await llm_mapper.generate_response(prompt)
# 假设 llm_mapper 有一个同步的 generate 方法或在异步环境中调用
# 这里暂时使用 await如果你的 LLMRequest 不支持,需要修改
response_content, _, _ = await llm_mapper.generate_response(prompt)
logger.debug(f"LLM raw response for nickname mapping: {response_content}")
# --- 解析 LLM 响应 ---
if not response_content:
logger.warning("LLM returned empty content for nickname mapping.")
return {"is_exist": False}
# 尝试去除可能的代码块标记
response_content = response_content.strip()
if response_content.startswith("```json"):
response_content = response_content[7:]
if response_content.endswith("```"):
response_content = response_content[:-3]
response_content = response_content.strip()
try:
result = json.loads(response_content)
# 基本验证
if isinstance(result, dict) and "is_exist" in result:
if result["is_exist"] is True:
if "data" in result and isinstance(result["data"], dict):
# 过滤掉 data 为空的情况
if not result["data"]:
logger.debug("LLM indicated is_exist=True but data is empty. Treating as False.")
return {"is_exist": False}
logger.info(f"Nickname mapping found: {result['data']}")
return {"is_exist": True, "data": result["data"]}
else:
logger.warning("LLM response format error: is_exist is True but 'data' is missing or not a dict.")
return {"is_exist": False}
elif result["is_exist"] is False:
logger.info("No reliable nickname mapping found by LLM.")
return {"is_exist": False}
else:
logger.warning("LLM response format error: 'is_exist' is not a boolean.")
return {"is_exist": False}
else:
logger.warning("LLM response format error: Missing 'is_exist' key or not a dict.")
return {"is_exist": False}
except json.JSONDecodeError as json_err:
logger.error(f"Failed to parse LLM response as JSON: {json_err}\nRaw response: {response_content}")
return {"is_exist": False}
except Exception as e:
logger.error(f"Error during nickname mapping LLM call or processing: {e}", exc_info=True)
return {"is_exist": False}

View File

@ -0,0 +1,301 @@
# GroupNickname/nickname_processor.py
import asyncio
import time
import traceback
# 明确导入 Event 和 Queue
from multiprocessing import Process, Queue as mpQueue
# 尝试从 synchronize 导入 Event
from multiprocessing.synchronize import Event as mpEvent
from typing import Dict, Any, Tuple, Optional, List
from pymongo import MongoClient, UpdateOne
from pymongo.errors import ConnectionFailure, OperationFailure
# 假设你的项目结构允许这样导入
try:
from src.common.logger_manager import get_logger
from src.config.config import global_config
from .config import (
ENABLE_NICKNAME_MAPPING, DB_COLLECTION_PERSON_INFO,
NICKNAME_QUEUE_MAX_SIZE, NICKNAME_PROCESS_SLEEP_INTERVAL,
get_stop_event, set_stop_event
)
from .nickname_mapper import analyze_chat_for_nicknames
except ImportError:
# 提供备选导入路径或记录错误,以便调试
print("Error: Failed to import necessary modules. Please check your project structure and PYTHONPATH.")
# 在无法导入时,定义临时的 get_logger 以避免 NameError但这只是权宜之计
import logging
def get_logger(name):
return logging.getLogger(name)
# 定义临时的全局配置,这同样是权宜之计
class MockGlobalConfig:
mongodb_uri = "mongodb://localhost:27017/" # 示例 URI
mongodb_database = "your_db_name" # 示例数据库名
global_config = MockGlobalConfig()
# 定义临时的配置变量
ENABLE_NICKNAME_MAPPING = True
DB_COLLECTION_PERSON_INFO = "person_info"
NICKNAME_QUEUE_MAX_SIZE = 100
NICKNAME_PROCESS_SLEEP_INTERVAL = 0.5
# 使用导入的 mpEvent
_stop_event_internal = mpEvent()
def get_stop_event(): return _stop_event_internal
def set_stop_event(): _stop_event_internal.set()
# 定义临时的 analyze_chat_for_nicknames
async def analyze_chat_for_nicknames(*args, **kwargs): return {"is_exist": False}
logger = get_logger("nickname_processor")
# --- 数据库连接 ---
mongo_client: Optional[MongoClient] = None
person_info_collection = None
def _initialize_db():
"""初始化数据库连接(在子进程中调用)"""
global mongo_client, person_info_collection
if mongo_client is None:
try:
mongo_uri = global_config.mongodb_uri
if not mongo_uri:
raise ValueError("MongoDB URI not found in global config.")
mongo_client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)
mongo_client.admin.command('ping')
db = mongo_client[global_config.mongodb_database]
person_info_collection = db[DB_COLLECTION_PERSON_INFO]
logger.info("Nickname processor: Database connection initialized successfully.")
except (ConnectionFailure, ValueError, OperationFailure) as e:
logger.error(f"Nickname processor: Failed to initialize database connection: {e}", exc_info=True)
mongo_client = None
person_info_collection = None
except Exception as e:
logger.error(f"Nickname processor: An unexpected error occurred during DB initialization: {e}", exc_info=True)
mongo_client = None
person_info_collection = None
def _close_db():
"""关闭数据库连接"""
global mongo_client
if mongo_client:
try:
mongo_client.close()
logger.info("Nickname processor: Database connection closed.")
except Exception as e:
logger.error(f"Nickname processor: Error closing database connection: {e}", exc_info=True)
finally:
mongo_client = None
# --- 数据库更新逻辑 ---
async def update_nickname_counts(group_id: str, nickname_map: Dict[str, str]):
"""
更新数据库中用户的群组绰号计数
Args:
group_id (str): 群组 ID
nickname_map (Dict[str, str]): 需要更新的 {用户ID: 绰号} 映射
"""
if not person_info_collection:
logger.error("Database collection is not initialized. Cannot update nickname counts.")
return
if not nickname_map:
logger.debug("Empty nickname map provided for update.")
return
logger.info(f"Attempting to update nickname counts for group '{group_id}' with map: {nickname_map}")
for user_id, nickname in nickname_map.items():
if not user_id or not nickname:
logger.warning(f"Skipping invalid entry in nickname map: user_id='{user_id}', nickname='{nickname}'")
continue
group_id_str = str(group_id) # 确保是字符串
try:
# a. 确保用户文档存在 group_nickname 字段且为 list
person_info_collection.update_one(
{"person_id": user_id},
{"$setOnInsert": {"group_nickname": []}}, # 如果字段不存在则创建为空列表
upsert=True
)
# b. 确保特定 group_id 的条目存在
update_result = person_info_collection.update_one(
{"person_id": user_id, f"group_nickname.{group_id_str}": {"$exists": False}},
{"$push": {"group_nickname": {group_id_str: []}}} # 如果不存在则添加
)
if update_result.modified_count > 0:
logger.debug(f"Added group entry for group '{group_id_str}' for user '{user_id}'.")
# c. 确保特定 nickname 存在于 group_id 的数组中,并增加计数
update_result = person_info_collection.update_one(
{
"person_id": user_id,
"group_nickname": {
"$elemMatch": {
group_id_str: {"$elemMatch": {nickname: {"$exists": True}}}
}
}
},
{"$inc": {f"group_nickname.$[group].$[nick].{nickname}": 1}},
array_filters=[
{f"group.{group_id_str}": {"$exists": True}},
{f"nick.{nickname}": {"$exists": True}}
]
)
if update_result.matched_count == 0:
# nickname 不存在,添加 nickname 并设置次数为 1
add_nick_result = person_info_collection.update_one(
{"person_id": user_id, f"group_nickname.{group_id_str}": {"$exists": True}},
{"$push": {f"group_nickname.$[group].{group_id_str}": {nickname: 1}}},
array_filters=[{f"group.{group_id_str}": {"$exists": True}}]
)
if add_nick_result.modified_count > 0:
logger.debug(f"Added nickname '{nickname}' with count 1 for user '{user_id}' in group '{group_id_str}'.")
else:
logger.warning(f"Failed to add nickname '{nickname}' for user '{user_id}' in group '{group_id_str}'. Update result: {add_nick_result.raw_result}")
elif update_result.modified_count > 0:
logger.debug(f"Incremented count for nickname '{nickname}' for user '{user_id}' in group '{group_id_str}'.")
else:
logger.warning(f"Nickname increment operation matched but did not modify for user '{user_id}', nickname '{nickname}'. Update result: {update_result.raw_result}")
except OperationFailure as op_err:
logger.error(f"Database operation failed for user {user_id}, group {group_id_str}, nickname {nickname}: {op_err}", exc_info=True)
except Exception as e:
logger.error(f"Unexpected error updating nickname for user {user_id}, group {group_id_str}, nickname {nickname}: {e}", exc_info=True)
# --- 队列和进程 ---
# 使用明确导入的类型
nickname_queue: mpQueue[Tuple[str, str, str, Dict[str, str]]] = mpQueue(maxsize=NICKNAME_QUEUE_MAX_SIZE)
_nickname_process: Optional[Process] = None
async def add_to_nickname_queue(
chat_history_str: str,
bot_reply: str,
group_id: Optional[str], # 群聊时需要
user_name_map: Dict[str, str] # 用户ID到名字的映射
):
"""将需要分析的数据放入队列。"""
if not ENABLE_NICKNAME_MAPPING:
return
if group_id is None:
logger.debug("Skipping nickname mapping for private chat.")
return # 私聊暂时不处理绰号映射
try:
item = (chat_history_str, bot_reply, str(group_id), user_name_map) # 确保 group_id 是字符串
# 使用 put_nowait如果队列满则会抛出 Full 异常
nickname_queue.put_nowait(item)
logger.debug(f"Added item to nickname queue for group {group_id}.")
# 捕获 queue.Full 异常
except Exception as e:
# 检查异常类型是否为队列满(需要导入 queue 模块或处理 Full 异常)
# from queue import Full # 如果 nickname_queue 是 asyncio.Queue
# if isinstance(e, Full):
# logger.warning("Nickname processing queue is full. Discarding new item.")
# else:
# logger.error(f"Error adding item to nickname queue: {e}", exc_info=True)
# 由于 multiprocessing.Queue 的 Full 异常在不同环境下可能不同,这里暂时捕获通用异常
logger.warning(f"Failed to add item to nickname queue (possibly full): {e}", exc_info=True)
# 使用从 synchronize 导入的 mpEvent
async def _nickname_processing_loop(queue: mpQueue, stop_event: mpEvent): # 使用 mpEvent
"""独立进程中的主循环,处理队列任务。"""
_initialize_db() # 初始化数据库连接
logger.info("Nickname processing loop started.")
while not stop_event.is_set():
try:
if not queue.empty():
# 从队列获取任务
chat_history_str, bot_reply, group_id, user_name_map = queue.get()
logger.debug(f"Processing nickname mapping task for group {group_id}...")
# 调用 LLM 分析
analysis_result = await analyze_chat_for_nicknames(chat_history_str, bot_reply, user_name_map)
# 如果找到映射,更新数据库
if analysis_result.get("is_exist") and analysis_result.get("data"):
await update_nickname_counts(group_id, analysis_result["data"])
# 短暂 sleep 避免 CPU 占用过高
await asyncio.sleep(0.05) # 稍微减少 sleep 时间
else:
# 队列为空时休眠
await asyncio.sleep(NICKNAME_PROCESS_SLEEP_INTERVAL)
except asyncio.CancelledError:
logger.info("Nickname processing loop cancelled.")
break # 响应取消请求
except Exception as e:
logger.error(f"Error in nickname processing loop: {e}\n{traceback.format_exc()}")
# 发生错误时也休眠一下,防止快速连续出错
await asyncio.sleep(5)
_close_db() # 关闭数据库连接
logger.info("Nickname processing loop finished.")
# 使用从 synchronize 导入的 mpEvent
def _run_processor_process(queue: mpQueue, stop_event: mpEvent): # 使用 mpEvent
"""进程启动函数,运行异步循环。"""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(_nickname_processing_loop(queue, stop_event))
loop.close()
except Exception as e:
logger.error(f"Error running nickname processor process: {e}", exc_info=True)
def start_nickname_processor():
"""启动绰号映射处理进程。"""
global _nickname_process
if not ENABLE_NICKNAME_MAPPING:
logger.info("Nickname mapping feature is disabled. Processor not started.")
return
if _nickname_process is None or not _nickname_process.is_alive():
logger.info("Starting nickname processor process...")
stop_event = get_stop_event()
stop_event.clear()
# 传递明确导入的类型
_nickname_process = Process(target=_run_processor_process, args=(nickname_queue, stop_event), daemon=True)
_nickname_process.start()
logger.info(f"Nickname processor process started with PID: {_nickname_process.pid}")
else:
logger.warning("Nickname processor process is already running.")
def stop_nickname_processor():
"""停止绰号映射处理进程。"""
global _nickname_process
if _nickname_process and _nickname_process.is_alive():
logger.info("Stopping nickname processor process...")
set_stop_event()
try:
_nickname_process.join(timeout=10)
if _nickname_process.is_alive():
logger.warning("Nickname processor process did not stop gracefully after 10 seconds. Terminating...")
_nickname_process.terminate()
_nickname_process.join(timeout=5)
except Exception as e:
logger.error(f"Error stopping nickname processor process: {e}", exc_info=True)
finally:
if _nickname_process and not _nickname_process.is_alive():
logger.info("Nickname processor process stopped successfully.")
else:
logger.error("Failed to stop nickname processor process.")
_nickname_process = None
else:
logger.info("Nickname processor process is not running.")
# 可以在应用启动时调用 start_nickname_processor()
# 在应用关闭时调用 stop_nickname_processor()

View File

@ -0,0 +1,126 @@
# GroupNickname/nickname_utils.py
import random
from typing import List, Dict, Tuple, Optional
from src.common.logger_manager import get_logger
from .config import MAX_NICKNAMES_IN_PROMPT, NICKNAME_PROBABILITY_SMOOTHING
logger = get_logger("nickname_utils")
def select_nicknames_for_prompt(
all_nicknames_info: Dict[str, List[Dict[str, int]]]
) -> List[Tuple[str, str, int]]:
"""
从给定的绰号信息中根据映射次数加权随机选择最多 N 个绰号
Args:
all_nicknames_info: 包含用户及其绰号信息的字典格式为
{ "用户名1": [{"绰号A": 次数}, {"绰号B": 次数}], ... }
Returns:
List[Tuple[str, str, int]]: 选中的绰号列表每个元素为 (用户名, 绰号, 次数)
按次数降序排序
"""
if not all_nicknames_info:
return []
candidates = []
for user_name, nicknames in all_nicknames_info.items():
if nicknames:
for nickname_entry in nicknames:
# nickname_entry 应该是 {"绰号": 次数} 格式
if isinstance(nickname_entry, dict) and len(nickname_entry) == 1:
nickname, count = list(nickname_entry.items())[0]
# 确保次数是正整数
if isinstance(count, int) and count > 0:
# 添加平滑因子避免概率为0并让低频词也有机会
weight = count + NICKNAME_PROBABILITY_SMOOTHING
candidates.append((user_name, nickname, count, weight))
else:
logger.warning(f"Invalid count for nickname '{nickname}' of user '{user_name}': {count}. Skipping.")
else:
logger.warning(f"Invalid nickname entry format for user '{user_name}': {nickname_entry}. Skipping.")
if not candidates:
return []
# 计算总权重
total_weight = sum(c[3] for c in candidates)
if total_weight <= 0:
# 如果所有权重都无效或为0则随机选择或按次数选择
candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
selected = candidates[:MAX_NICKNAMES_IN_PROMPT]
else:
# 计算归一化概率
probabilities = [c[3] / total_weight for c in candidates]
# 使用概率分布进行加权随机选择(不重复)
num_to_select = min(MAX_NICKNAMES_IN_PROMPT, len(candidates))
try:
# random.choices 允许重复,我们需要不重复的选择
# 可以使用 numpy.random.choice 或手动实现不重复加权抽样
# 这里用一个简化的方法:多次 choices 然后去重,直到达到数量或无法再选
selected_indices = set()
selected = []
attempts = 0
max_attempts = num_to_select * 5 # 防止无限循环
while len(selected) < num_to_select and attempts < max_attempts:
# 每次只选一个,避免一次选多个时概率分布变化导致的问题
chosen_index = random.choices(range(len(candidates)), weights=probabilities, k=1)[0]
if chosen_index not in selected_indices:
selected_indices.add(chosen_index)
selected.append(candidates[chosen_index])
attempts += 1
# 如果尝试多次后仍未选够,补充出现次数最多的
if len(selected) < num_to_select:
remaining_candidates = [c for i, c in enumerate(candidates) if i not in selected_indices]
remaining_candidates.sort(key=lambda x: x[2], reverse=True) # 按原始次数排序
needed = num_to_select - len(selected)
selected.extend(remaining_candidates[:needed])
except Exception as e:
logger.error(f"Error during weighted random choice for nicknames: {e}. Falling back to top N.", exc_info=True)
# 出错时回退到选择次数最多的 N 个
candidates.sort(key=lambda x: x[2], reverse=True)
selected = candidates[:MAX_NICKNAMES_IN_PROMPT]
# 格式化输出并按次数排序
result = [(user, nick, count) for user, nick, count, _weight in selected]
result.sort(key=lambda x: x[2], reverse=True) # 按次数降序
logger.debug(f"Selected nicknames for prompt: {result}")
return result
def format_nickname_prompt_injection(selected_nicknames: List[Tuple[str, str, int]]) -> str:
"""
将选中的绰号信息格式化为注入 Prompt 的字符串
Args:
selected_nicknames: 选中的绰号列表 (用户名, 绰号, 次数)
Returns:
str: 格式化后的字符串如果列表为空则返回空字符串
"""
if not selected_nicknames:
return ""
prompt_lines = ["以下是聊天记录中一些成员在本群的绰号信息(按常用度排序):"]
grouped_by_user: Dict[str, List[str]] = {}
for user_name, nickname, _count in selected_nicknames:
if user_name not in grouped_by_user:
grouped_by_user[user_name] = []
# 添加引号以区分绰号
grouped_by_user[user_name].append(f'{nickname}')
for user_name, nicknames in grouped_by_user.items():
nicknames_str = "".join(nicknames)
prompt_lines.append(f"{user_name},在本群有时被称为:{nicknames_str}")
return "\n".join(prompt_lines) + "\n" # 末尾加换行符

View File

@ -21,13 +21,19 @@ from src.heart_flow.sub_mind import SubMind
from src.heart_flow.observation import Observation
from src.plugins.heartFC_chat.heartflow_prompt_builder import global_prompt_manager, prompt_builder
import contextlib
from src.plugins.utils.chat_message_builder import num_new_messages_since
from src.plugins.utils.chat_message_builder import num_new_messages_since, get_raw_msg_before_timestamp_with_chat, build_readable_messages
from src.plugins.heartFC_chat.heartFC_Cycleinfo import CycleInfo
from .heartFC_sender import HeartFCSender
from src.plugins.chat.utils import process_llm_response
from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from src.plugins.moods.moods import MoodManager
from src.individuality.individuality import Individuality
from src.plugins.person_info.relationship_manager import relationship_manager
# --- 导入 GroupNickname 相关 ---
from src.plugins.group_nickname.config import ENABLE_NICKNAME_MAPPING # <--- 导入开关
from src.plugins.group_nickname.nickname_processor import add_to_nickname_queue # <--- 导入队列添加函数
# --- 结束导入 GroupNickname ---
INITIAL_DURATION = 60.0
@ -469,7 +475,6 @@ class HeartFChatting:
return False, ""
# execute:执行
return await self._handle_action(
action, reasoning, planner_result.get("emoji_query", ""), cycle_timers, planner_start_db_time
)
@ -509,11 +514,17 @@ class HeartFChatting:
try:
if action == "text_reply":
return await handler(reasoning, emoji_query, cycle_timers)
# 调用文本回复处理,它会返回 (bool, thinking_id)
success, thinking_id = await handler(reasoning, emoji_query, cycle_timers)
return success, thinking_id # 直接返回结果
elif action == "emoji_reply":
return await handler(reasoning, emoji_query), ""
# 调用表情回复处理,它只返回 bool
success = await handler(reasoning, emoji_query)
return success, "" # thinking_id 为空字符串
else: # no_reply
return await handler(reasoning, planner_start_db_time, cycle_timers), ""
# 调用不回复处理,它只返回 bool
success = await handler(reasoning, planner_start_db_time, cycle_timers)
return success, "" # thinking_id 为空字符串
except HeartFCError as e:
logger.error(f"{self.log_prefix} 处理{action}时出错: {e}")
# 出错时也重置计数器
@ -530,6 +541,7 @@ class HeartFChatting:
2. 创建思考消息
3. 生成回复
4. 发送消息
5. [新增] 触发绰号分析
参数:
reasoning: 回复原因
@ -553,6 +565,7 @@ class HeartFChatting:
if not thinking_id:
raise PlannerError("无法创建思考消息")
reply = None # 初始化 reply
try:
# 生成回复
with Timer("生成回复", cycle_timers):
@ -566,7 +579,6 @@ class HeartFChatting:
raise ReplierError("回复生成失败")
# 发送消息
with Timer("发送消息", cycle_timers):
await self._sender(
thinking_id=thinking_id,
@ -575,6 +587,11 @@ class HeartFChatting:
send_emoji=emoji_query,
)
# --- [新增] 触发绰号分析 ---
# 在发送成功后(或至少尝试发送后)触发
await self._trigger_nickname_analysis(anchor_message, reply)
# --- 结束触发 ---
return True, thinking_id
except (ReplierError, SenderError) as e:
@ -682,6 +699,101 @@ class HeartFChatting:
# 发生意外错误时,可以选择是否重置计数器,这里选择不重置
return False # 表示动作未成功
# --- [修改] 触发绰号分析的函数 ---
async def _trigger_nickname_analysis(self, anchor_message: MessageRecv, reply: List[str]):
"""
触发绰号分析任务将相关数据放入处理队列
Args:
anchor_message: 锚点消息对象
reply: Bot 生成的回复内容列表
"""
if not ENABLE_NICKNAME_MAPPING:
return # 如果功能未开启,则直接返回
if not anchor_message or not anchor_message.chat_stream or not anchor_message.chat_stream.group_info:
logger.debug(f"{self.log_prefix} Skipping nickname analysis: Not a group chat or invalid anchor.")
return # 仅在群聊中进行分析
try:
# 1. 获取原始消息列表
history_limit = 30 # 例如,获取最近 30 条消息
history_messages = get_raw_msg_before_timestamp_with_chat(
chat_id=anchor_message.chat_stream.stream_id,
timestamp=time.time(), # 获取当前时间点的历史
limit=history_limit
)
# --- 使用 build_readable_messages 格式化历史记录 ---
chat_history_str = await build_readable_messages(
messages=history_messages,
replace_bot_name=True, # 在分析时也替换机器人名字,使其与 LLM 交互一致
merge_messages=False, # 不合并,保留原始对话流
timestamp_mode="relative", # 使用相对时间戳
read_mark=0.0, # 不需要已读标记
truncate=False # 获取完整内容进行分析
)
# --- 结束使用 build_readable_messages ---
# 2. 获取 Bot 回复字符串
bot_reply_str = " ".join(reply)
# 3. 获取群号
group_id = str(anchor_message.chat_stream.group_info.group_id) # 确保是字符串
# 4. 获取当前上下文中涉及的用户 ID 及其已知名称
user_ids_in_history = set()
for msg in history_messages:
sender_id = msg.get('sender_id')
if sender_id:
user_ids_in_history.add(str(sender_id)) # 确保是字符串
user_name_map = {}
if user_ids_in_history:
platform = anchor_message.chat_stream.platform
# 尝试批量获取 person_name
# 假设 relationship_manager 有 get_person_names_batch(platform, user_ids)
try:
# 注意:你需要实现 get_person_names_batch 方法
# names_data = await relationship_manager.get_person_names_batch(platform, list(user_ids_in_history))
# 这里暂时用单次获取代替,如果你的 relationship_manager 没有批量方法
names_data = {}
for user_id in user_ids_in_history:
name = await relationship_manager.get_person_name(platform, user_id)
if name:
names_data[user_id] = name
except AttributeError:
logger.warning("relationship_manager does not have get_person_names_batch method. Falling back to single lookups.")
names_data = {}
for user_id in user_ids_in_history:
name = await relationship_manager.get_person_name(platform, user_id)
if name:
names_data[user_id] = name
except Exception as e:
logger.error(f"Error getting person names: {e}", exc_info=True)
names_data = {} # 出错时置空
for user_id in user_ids_in_history:
if user_id in names_data:
user_name_map[user_id] = names_data[user_id]
else:
# 回退查找 nickname
latest_nickname = next((m.get('sender_nickname') for m in reversed(history_messages) if str(m.get('sender_id')) == user_id), None)
if latest_nickname:
user_name_map[user_id] = latest_nickname
else:
user_name_map[user_id] = f"未知({user_id})"
# 5. 添加到队列
await add_to_nickname_queue(chat_history_str, bot_reply_str, group_id, user_name_map)
logger.debug(f"{self.log_prefix} Triggered nickname analysis for group {group_id}.")
except Exception as e:
logger.error(f"{self.log_prefix} Error triggering nickname analysis: {e}", exc_info=True)
# --- 结束触发函数 ---
async def _wait_for_new_message(self, observation, planner_start_db_time: float, log_prefix: str) -> bool:
"""
等待新消息 检测到关闭信号

View File

@ -5,9 +5,7 @@ from bson.decimal128 import Decimal128
from .person_info import person_info_manager
import time
import random
# import re
# import traceback
from typing import List, Dict, Any, Optional, Tuple # 确保导入了 List, Dict, Optional, Tuple
logger = get_logger("relation")
@ -82,26 +80,155 @@ class RelationshipManager:
@staticmethod
async def is_qved_name(platform, user_id):
"""判断是否认识某人"""
"""判断是否已经命名"""
person_id = person_info_manager.get_person_id(platform, user_id)
is_qved = await person_info_manager.has_one_field(person_id, "person_name")
old_name = await person_info_manager.get_value(person_id, "person_name")
# print(f"old_name: {old_name}")
# print(f"is_qved: {is_qved}")
if is_qved and old_name is not None:
return True
else:
return False
# 优化:直接检查 person_name 字段是否存在且不为 None 或空字符串
person_name = await person_info_manager.get_value(person_id, "person_name")
return bool(person_name) # 如果 person_name 非空则返回 True
@staticmethod
async def get_person_name(platform: str, user_id: str) -> Optional[str]:
"""获取单个用户的 person_name"""
person_id = person_info_manager.get_person_id(platform, str(user_id)) # 确保 user_id 是字符串
return await person_info_manager.get_value(person_id, "person_name")
# --- [新增] 批量获取用户名称 ---
@staticmethod
async def get_person_names_batch(platform: str, user_ids: List[str]) -> Dict[str, str]:
"""
批量获取多个用户的 person_name
Args:
platform (str): 平台名称
user_ids (List[str]): 用户 ID 列表
Returns:
Dict[str, str]: 映射 {user_id: person_name}只包含成功获取到名称的用户
"""
if not user_ids:
return {}
person_ids = [person_info_manager.get_person_id(platform, str(uid)) for uid in user_ids] # 确保 uid 是字符串
names_map = {}
try:
# 使用 $in 操作符批量查询
cursor = person_info_manager.collection.find(
{"person_id": {"$in": person_ids}},
{"_id": 0, "person_id": 1, "person_name": 1} # 只查询需要的字段
)
async for doc in cursor:
# 从 person_id 反向推导出原始 user_id
# 注意:这依赖于 get_person_id 的实现方式,假设它是 platform_userid 格式
original_user_id = doc.get("person_id", "").split("_", 1)[-1]
person_name = doc.get("person_name")
if original_user_id and person_name:
names_map[original_user_id] = person_name
logger.debug(f"Batch get person names for {len(user_ids)} users, found {len(names_map)} names.")
except Exception as e:
logger.error(f"Error during batch get person names: {e}", exc_info=True)
return names_map
# --- 结束新增 ---
# --- [新增] 批量获取用户群组绰号 ---
@staticmethod
async def get_users_group_nicknames(platform: str, user_ids: List[str], group_id: str) -> Dict[str, List[Dict[str, int]]]:
"""
批量获取多个用户在指定群组的绰号信息
Args:
platform (str): 平台名称
user_ids (List[str]): 用户 ID 列表
group_id (str): 群组 ID
Returns:
Dict[str, List[Dict[str, int]]]: 映射 {person_name: [{"绰号A": 次数}, ...]}
只包含成功获取到绰号信息的用户
键是用户的 person_name
"""
if not user_ids or not group_id:
return {}
person_ids = [person_info_manager.get_person_id(platform, str(uid)) for uid in user_ids]
nicknames_data = {}
group_id_str = str(group_id) # 确保 group_id 是字符串
try:
# 查询包含目标 person_id 且 group_nickname 字段存在的文档
cursor = person_info_manager.collection.find(
{
"person_id": {"$in": person_ids},
"group_nickname": {"$elemMatch": {group_id_str: {"$exists": True}}} # 确保该群组的条目存在
},
{"_id": 0, "person_id": 1, "person_name": 1, "group_nickname": 1} # 查询所需字段
)
async for doc in cursor:
person_name = doc.get("person_name")
if not person_name: # 如果没有 person_name则跳过此用户
continue
group_nicknames_list = doc.get("group_nickname", [])
user_group_nicknames = []
# 遍历 group_nickname 列表,找到对应 group_id 的条目
for group_entry in group_nicknames_list:
if group_id_str in group_entry and isinstance(group_entry[group_id_str], list):
# 提取该群组的绰号列表 [{"绰号": 次数}, ...]
user_group_nicknames = group_entry[group_id_str]
break # 找到后即可退出内层循环
if user_group_nicknames: # 确保列表非空
# 过滤掉格式不正确的条目
valid_nicknames = []
for item in user_group_nicknames:
if isinstance(item, dict) and len(item) == 1:
key, value = list(item.items())[0]
if isinstance(key, str) and isinstance(value, int):
valid_nicknames.append(item)
else:
logger.warning(f"Invalid nickname format in DB for user {person_name}, group {group_id_str}: {item}")
else:
logger.warning(f"Invalid nickname entry format in DB for user {person_name}, group {group_id_str}: {item}")
if valid_nicknames:
nicknames_data[person_name] = valid_nicknames # 使用 person_name 作为 key
logger.debug(f"Batch get group nicknames for {len(user_ids)} users in group {group_id_str}, found data for {len(nicknames_data)} users.")
except Exception as e:
logger.error(f"Error during batch get group nicknames: {e}", exc_info=True)
return nicknames_data
# --- 结束新增 ---
@staticmethod
async def first_knowing_some_one(platform, user_id, user_nickname, user_cardname, user_avatar):
"""判断是否认识某人"""
person_id = person_info_manager.get_person_id(platform, user_id)
await person_info_manager.update_one_field(person_id, "nickname", user_nickname)
# await person_info_manager.update_one_field(person_id, "user_cardname", user_cardname)
# await person_info_manager.update_one_field(person_id, "user_avatar", user_avatar)
# 首次认识时,除了更新 nickname也应该设置初始关系值等
initial_data = {
"platform": platform,
"user_id": user_id,
"nickname": user_nickname,
"konw_time": int(time.time()),
"relationship_value": 0.0, # 设置初始关系值为 0
"msg_interval": -1, # 初始消息间隔设为 -1 或其他标记
"msg_interval_list": [],
"group_nickname": [] # 初始化为空列表
}
# 使用 update_one 并结合 $setOnInsert 来避免覆盖已有数据
await person_info_manager.collection.update_one(
{"person_id": person_id},
{
"$set": {"nickname": user_nickname}, # 总是更新 nickname
"$setOnInsert": initial_data # 仅在插入新文档时设置这些初始值
},
upsert=True
)
# 尝试获取或生成 person_name
await person_info_manager.qv_person_name(person_id, user_nickname, user_cardname, user_avatar)
async def calculate_update_relationship_value(self, chat_stream: ChatStream, label: str, stance: str) -> tuple:
"""计算并变更关系值
新的关系值变更计算方式
@ -135,216 +262,132 @@ class RelationshipManager:
}
person_id = person_info_manager.get_person_id(chat_stream.user_info.platform, chat_stream.user_info.user_id)
data = {
data = { # 这个 data 似乎是用于 setOnInsert 的,应该在 first_knowing 时处理
"platform": chat_stream.user_info.platform,
"user_id": chat_stream.user_info.user_id,
"nickname": chat_stream.user_info.user_nickname,
"konw_time": int(time.time()),
}
old_value = await person_info_manager.get_value(person_id, "relationship_value")
old_value = self.ensure_float(old_value, person_id)
old_value = self.ensure_float(old_value, person_id) # 确保是 float
if old_value > 1000:
old_value = 1000
elif old_value < -1000:
old_value = -1000
# 限制旧值范围
old_value = max(min(old_value, 1000), -1000)
value = valuedict[label]
if old_value >= 0:
if valuedict[label] >= 0 and stancedict[stance] != 2:
value = value * math.cos(math.pi * old_value / 2000)
if old_value > 500:
rdict = await person_info_manager.get_specific_value_list("relationship_value", lambda x: x > 700)
high_value_count = len(rdict)
if old_value > 700:
value *= 3 / (high_value_count + 2) # 排除自己
else:
value *= 3 / (high_value_count + 3)
elif valuedict[label] < 0 and stancedict[stance] != 0:
value = value * math.exp(old_value / 2000)
else:
value = 0
elif old_value < 0:
if valuedict[label] >= 0 and stancedict[stance] != 2:
value = value * math.exp(old_value / 2000)
elif valuedict[label] < 0 and stancedict[stance] != 0:
value = value * math.cos(math.pi * old_value / 2000)
else:
value = 0
value_change = 0.0 # 初始化变化量
base_value = valuedict.get(label, 0.0) # 获取基础情绪值
# 应用立场影响和关系值衰减/增强逻辑
if base_value > 0 and stancedict.get(stance, 1) != 2: # 正面情绪且非反对
value_change = base_value * math.cos(math.pi * old_value / 2000)
if old_value > 500: # 高关系值增长减缓
rdict = await person_info_manager.get_specific_value_list("relationship_value", lambda x: x > 700)
high_value_count = len(rdict)
# 注意:这里的减缓因子可能需要调整
value_change *= 3 / (high_value_count + (2 if old_value > 700 else 3))
elif base_value < 0 and stancedict.get(stance, 1) != 0: # 负面情绪且非支持
# 关系好时负面影响更大,关系差时负面影响减弱
value_change = base_value * math.exp(old_value / 2000) if old_value >= 0 else base_value * math.cos(math.pi * old_value / 2000)
# else: 立场冲突或情绪平静,基础变化为 0
# 应用正反馈系统和情绪反馈
self.positive_feedback_sys(label, stance)
value = self.mood_feedback(value)
value_change = self.mood_feedback(value_change) # 应用当前情绪对关系变化的影响
value_change = self.feedback_to_mood(value_change) # 应用连续反馈对关系变化的影响
level_num = self.calculate_level_num(old_value + value)
new_value = old_value + value_change
# 再次限制新值范围
new_value = max(min(new_value, 1000), -1000)
actual_change = new_value - old_value # 记录实际变化量
level_num = self.calculate_level_num(new_value)
relationship_level = ["厌恶", "冷漠", "一般", "友好", "喜欢", "暧昧"]
logger.info(
f"用户: {chat_stream.user_info.user_nickname}"
f"用户: {chat_stream.user_info.user_nickname} "
f"当前关系: {relationship_level[level_num]}, "
f"关系值: {old_value:.2f}, "
f"当前立场情感: {stance}-{label}, "
f"变更: {value:+.5f}"
f"立场情感: {stance}-{label}, "
f"变更: {actual_change:+.5f}, "
f"新值: {new_value:.2f}"
)
await person_info_manager.update_one_field(person_id, "relationship_value", old_value + value, data)
# 更新数据库,只更新 relationship_value
await person_info_manager.update_one_field(person_id, "relationship_value", new_value)
return chat_stream.user_info.user_nickname, value, relationship_level[level_num]
return chat_stream.user_info.user_nickname, actual_change, relationship_level[level_num]
async def calculate_update_relationship_value_with_reason(
self, chat_stream: ChatStream, label: str, stance: str, reason: str
) -> tuple:
"""计算并变更关系值
新的关系值变更计算方式
将关系值限定在-1000到1000
对于关系值的变更期望
1.向两端逼近时会逐渐减缓
2.关系越差改善越难关系越好恶化越容易
3.人维护关系的精力往往有限所以当高关系值用户越多对于中高关系值用户增长越慢
4.连续正面或负面情感会正反馈
返回
用户昵称变更值变更后关系等级
"""
stancedict = {
"支持": 0,
"中立": 1,
"反对": 2,
}
valuedict = {
"开心": 1.5,
"愤怒": -2.0,
"悲伤": -0.5,
"惊讶": 0.6,
"害羞": 2.0,
"平静": 0.3,
"恐惧": -1.5,
"厌恶": -1.0,
"困惑": 0.5,
}
person_id = person_info_manager.get_person_id(chat_stream.user_info.platform, chat_stream.user_info.user_id)
data = {
"platform": chat_stream.user_info.platform,
"user_id": chat_stream.user_info.user_id,
"nickname": chat_stream.user_info.user_nickname,
"konw_time": int(time.time()),
}
old_value = await person_info_manager.get_value(person_id, "relationship_value")
old_value = self.ensure_float(old_value, person_id)
if old_value > 1000:
old_value = 1000
elif old_value < -1000:
old_value = -1000
value = valuedict[label]
if old_value >= 0:
if valuedict[label] >= 0 and stancedict[stance] != 2:
value = value * math.cos(math.pi * old_value / 2000)
if old_value > 500:
rdict = await person_info_manager.get_specific_value_list("relationship_value", lambda x: x > 700)
high_value_count = len(rdict)
if old_value > 700:
value *= 3 / (high_value_count + 2) # 排除自己
else:
value *= 3 / (high_value_count + 3)
elif valuedict[label] < 0 and stancedict[stance] != 0:
value = value * math.exp(old_value / 2000)
else:
value = 0
elif old_value < 0:
if valuedict[label] >= 0 and stancedict[stance] != 2:
value = value * math.exp(old_value / 2000)
elif valuedict[label] < 0 and stancedict[stance] != 0:
value = value * math.cos(math.pi * old_value / 2000)
else:
value = 0
self.positive_feedback_sys(label, stance)
value = self.mood_feedback(value)
level_num = self.calculate_level_num(old_value + value)
relationship_level = ["厌恶", "冷漠", "一般", "友好", "喜欢", "暧昧"]
logger.info(
f"用户: {chat_stream.user_info.user_nickname}"
f"当前关系: {relationship_level[level_num]}, "
f"关系值: {old_value:.2f}, "
f"当前立场情感: {stance}-{label}, "
f"变更: {value:+.5f}"
)
await person_info_manager.update_one_field(person_id, "relationship_value", old_value + value, data)
return chat_stream.user_info.user_nickname, value, relationship_level[level_num]
async def build_relationship_info(self, person, is_id: bool = False) -> str:
"""构建用于 Prompt 的关系信息字符串"""
if is_id:
person_id = person
# 如果只有 person_id需要反查 platform 和 user_id 来获取 person_name
# 这依赖于 person_id 的格式,假设是 platform_userid
try:
platform, user_id_str = person_id.split("_", 1)
person_name = await self.get_person_name(platform, user_id_str)
except ValueError:
logger.warning(f"Invalid person_id format for prompt building: {person_id}")
person_name = None
else:
print(f"person: {person}")
person_id = person_info_manager.get_person_id(person[0], person[1])
person_name = await person_info_manager.get_value(person_id, "person_name")
print(f"person_name: {person_name}")
platform, user_id, _ = person # 解包元组
person_id = person_info_manager.get_person_id(platform, user_id)
person_name = await self.get_person_name(platform, user_id)
if not person_name:
person_name = f"用户({person_id})" # 回退显示 ID
relationship_value = await person_info_manager.get_value(person_id, "relationship_value")
relationship_value = self.ensure_float(relationship_value, person_id) # 确保是 float
level_num = self.calculate_level_num(relationship_value)
if level_num == 0 or level_num == 5:
relationship_level = ["厌恶", "冷漠以对", "认识", "友好对待", "喜欢", "暧昧"]
relation_prompt2_list = [
"忽视的回应",
"冷淡回复",
"保持理性",
"愿意回复",
"积极回复",
"友善和包容的回复",
]
return f"{relationship_level[level_num]}{person_name},打算{relation_prompt2_list[level_num]}\n"
elif level_num == 2:
# 定义关系等级和对应的行为描述
relationship_levels = ["厌恶", "冷漠以对", "认识", "友好对待", "喜欢", "暧昧"]
relation_prompt_list = ["忽视的回应", "冷淡回复", "保持理性", "愿意回复", "积极回复", "友善和包容的回复"]
# 根据等级和随机性决定是否输出及输出内容
if level_num == 2: # "一般"关系不特别提示
return ""
elif level_num in [0, 5] or random.random() < 0.6: # 极好/极差 或 60% 概率
# 修正索引,确保在列表范围内
level_idx = max(0, min(level_num, len(relationship_levels) - 1))
prompt_idx = max(0, min(level_num, len(relation_prompt_list) - 1))
return f"{relationship_levels[level_idx]}{person_name},打算{relation_prompt_list[prompt_idx]}\n"
else:
if random.random() < 0.6:
relationship_level = ["厌恶", "冷漠以对", "认识", "友好对待", "喜欢", "暧昧"]
relation_prompt2_list = [
"忽视的回应",
"冷淡回复",
"保持理性",
"愿意回复",
"积极回复",
"友善和包容的回复",
]
return f"{relationship_level[level_num]}{person_name},打算{relation_prompt2_list[level_num]}\n"
else:
return ""
return ""
@staticmethod
def calculate_level_num(relationship_value) -> int:
"""关系等级计算"""
if -1000 <= relationship_value < -227:
level_num = 0
elif -227 <= relationship_value < -73:
level_num = 1
elif -73 <= relationship_value < 227:
level_num = 2
elif 227 <= relationship_value < 587:
level_num = 3
elif 587 <= relationship_value < 900:
level_num = 4
elif 900 <= relationship_value <= 1000:
level_num = 5
else:
level_num = 5 if relationship_value > 1000 else 0
return level_num
# 确保 value 是 float
try:
value = float(relationship_value.to_decimal() if isinstance(relationship_value, Decimal128) else relationship_value)
except (ValueError, TypeError, AttributeError):
value = 0.0 # 转换失败默认为 0
# 阈值判断
if value < -227: return 0
elif value < -73: return 1
elif value < 227: return 2
elif value < 587: return 3
elif value < 900: return 4
else: return 5 # >= 900
@staticmethod
def ensure_float(value, person_id):
"""确保返回浮点数转换失败返回0.0"""
if isinstance(value, float):
return value
if isinstance(value, (float, int)): # 直接处理 float 和 int
return float(value)
try:
# 尝试处理 Decimal128 或其他可转换为 float 的类型
return float(value.to_decimal() if isinstance(value, Decimal128) else value)
except (ValueError, TypeError, AttributeError):
logger.warning(f"[关系管理] {person_id}值转换失败(原始值:{value}已重置为0")
logger.warning(f"[关系管理] {person_id} 值转换失败(原始值:{value}已重置为0")
# 在转换失败时,尝试在数据库中将该字段重置为 0.0
try:
person_info_manager.update_one_field(person_id, "relationship_value", 0.0)
except Exception as db_err:
logger.error(f"Failed to reset relationship_value for {person_id} in DB: {db_err}")
return 0.0