MaiBot/src/person_info/relationship_manager.py

379 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from src.common.logger_manager import get_logger
from src.chat.message_receive.chat_stream import ChatStream
import math
from src.person_info.person_info import person_info_manager
import time
import random
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config
from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat_users
from src.chat.utils.chat_message_builder import build_readable_messages
from src.manager.mood_manager import mood_manager
logger = get_logger("relation")
class RelationshipManager:
def __init__(self):
self.positive_feedback_value = 0 # 正反馈系统
self.gain_coefficient = [1.0, 1.0, 1.1, 1.2, 1.4, 1.7, 1.9, 2.0]
self._mood_manager = None
self.relationship_llm = LLMRequest(
model=global_config.model.normal_chat_1,
max_tokens=1000,
request_type="relationship", # 用于动作规划
)
@property
def mood_manager(self):
if self._mood_manager is None:
self._mood_manager = mood_manager
return self._mood_manager
def positive_feedback_sys(self, label: str, stance: str):
"""正反馈系统,通过正反馈系数增益情绪变化,根据情绪再影响关系变更"""
positive_list = [
"开心",
"惊讶",
"害羞",
]
negative_list = [
"愤怒",
"悲伤",
"恐惧",
"厌恶",
]
if label in positive_list:
if 7 > self.positive_feedback_value >= 0:
self.positive_feedback_value += 1
elif self.positive_feedback_value < 0:
self.positive_feedback_value = 0
elif label in negative_list:
if -7 < self.positive_feedback_value <= 0:
self.positive_feedback_value -= 1
elif self.positive_feedback_value > 0:
self.positive_feedback_value = 0
if abs(self.positive_feedback_value) > 1:
logger.debug(f"触发mood变更增益当前增益系数{self.gain_coefficient[abs(self.positive_feedback_value)]}")
def mood_feedback(self, value):
"""情绪反馈"""
mood_manager = self.mood_manager
mood_gain = mood_manager.current_mood.valence**2 * math.copysign(1, value * mood_manager.current_mood.valence)
value += value * mood_gain
logger.debug(f"当前relationship增益系数{mood_gain:.3f}")
return value
def feedback_to_mood(self, mood_value):
"""对情绪的反馈"""
coefficient = self.gain_coefficient[abs(self.positive_feedback_value)]
if mood_value > 0 and self.positive_feedback_value > 0 or mood_value < 0 and self.positive_feedback_value < 0:
return mood_value * coefficient
else:
return mood_value / coefficient
@staticmethod
async def is_known_some_one(platform, user_id):
"""判断是否认识某人"""
is_known = await person_info_manager.is_person_known(platform, user_id)
return is_known
@staticmethod
async def is_qved_name(platform, user_id):
"""判断是否认识某人"""
person_id = person_info_manager.get_person_id(platform, user_id)
is_qved = await person_info_manager.has_one_field(person_id, "person_name")
old_name = await person_info_manager.get_value(person_id, "person_name")
# print(f"old_name: {old_name}")
# print(f"is_qved: {is_qved}")
if is_qved and old_name is not None:
return True
else:
return False
@staticmethod
async def first_knowing_some_one(
platform: str, user_id: str, user_nickname: str, user_cardname: str, user_avatar: str
):
"""判断是否认识某人"""
person_id = person_info_manager.get_person_id(platform, user_id)
data = {
"platform": platform,
"user_id": user_id,
"nickname": user_nickname,
"konw_time": int(time.time()),
}
await person_info_manager.update_one_field(
person_id=person_id, field_name="nickname", value=user_nickname, data=data
)
await person_info_manager.qv_person_name(
person_id=person_id, user_nickname=user_nickname, user_cardname=user_cardname, user_avatar=user_avatar
)
async def calculate_update_relationship_value_with_reason(
self, chat_stream: ChatStream, label: str, stance: str, reason: str
) -> tuple:
"""计算并变更关系值
新的关系值变更计算方式:
将关系值限定在-1000到1000
对于关系值的变更,期望:
1.向两端逼近时会逐渐减缓
2.关系越差,改善越难,关系越好,恶化越容易
3.人维护关系的精力往往有限,所以当高关系值用户越多,对于中高关系值用户增长越慢
4.连续正面或负面情感会正反馈
返回:
用户昵称,变更值,变更后关系等级
"""
stancedict = {
"支持": 0,
"中立": 1,
"反对": 2,
}
valuedict = {
"开心": 1.5,
"愤怒": -2.0,
"悲伤": -0.5,
"惊讶": 0.6,
"害羞": 2.0,
"平静": 0.3,
"恐惧": -1.5,
"厌恶": -1.0,
"困惑": 0.5,
}
person_id = person_info_manager.get_person_id(chat_stream.user_info.platform, chat_stream.user_info.user_id)
data = {
"platform": chat_stream.user_info.platform,
"user_id": chat_stream.user_info.user_id,
"nickname": chat_stream.user_info.user_nickname,
"konw_time": int(time.time()),
}
old_value = await person_info_manager.get_value(person_id, "relationship_value")
old_value = self.ensure_float(old_value, person_id)
if old_value > 1000:
old_value = 1000
elif old_value < -1000:
old_value = -1000
value = valuedict[label]
if old_value >= 0:
if valuedict[label] >= 0 and stancedict[stance] != 2:
value = value * math.cos(math.pi * old_value / 2000)
if old_value > 500:
rdict = await person_info_manager.get_specific_value_list("relationship_value", lambda x: x > 700)
high_value_count = len(rdict)
if old_value > 700:
value *= 3 / (high_value_count + 2) # 排除自己
else:
value *= 3 / (high_value_count + 3)
elif valuedict[label] < 0 and stancedict[stance] != 0:
value = value * math.exp(old_value / 2000)
else:
value = 0
elif old_value < 0:
if valuedict[label] >= 0 and stancedict[stance] != 2:
value = value * math.exp(old_value / 2000)
elif valuedict[label] < 0 and stancedict[stance] != 0:
value = value * math.cos(math.pi * old_value / 2000)
else:
value = 0
self.positive_feedback_sys(label, stance)
value = self.mood_feedback(value)
level_num = self.calculate_level_num(old_value + value)
relationship_level = ["厌恶", "冷漠", "一般", "友好", "喜欢", "暧昧"]
logger.info(
f"用户: {chat_stream.user_info.user_nickname}"
f"当前关系: {relationship_level[level_num]}, "
f"关系值: {old_value:.2f}, "
f"当前立场情感: {stance}-{label}, "
f"变更: {value:+.5f}"
)
await person_info_manager.update_one_field(person_id, "relationship_value", old_value + value, data)
return chat_stream.user_info.user_nickname, value, relationship_level[level_num]
async def build_relationship_info(self, person, is_id: bool = False) -> str:
if is_id:
person_id = person
else:
# print(f"person: {person}")
person_id = person_info_manager.get_person_id(person[0], person[1])
person_name = await person_info_manager.get_value(person_id, "person_name")
# print(f"person_name: {person_name}")
relationship_value = await person_info_manager.get_value(person_id, "relationship_value")
level_num = self.calculate_level_num(relationship_value)
relation_value_prompt = ""
if level_num == 0 or level_num == 5:
relationship_level = ["厌恶", "冷漠以对", "认识", "友好对待", "喜欢", "暧昧"]
relation_prompt2_list = [
"忽视的回应",
"冷淡回复",
"保持理性",
"愿意回复",
"积极回复",
"友善和包容的回复",
]
relation_value_prompt = (
f"{relationship_level[level_num]}{person_name},打算{relation_prompt2_list[level_num]}"
)
elif level_num == 2:
relation_value_prompt = ""
else:
if random.random() < 0.6:
relationship_level = ["厌恶", "冷漠以对", "认识", "友好对待", "喜欢", "暧昧"]
relation_prompt2_list = [
"忽视的回应",
"冷淡回复",
"保持理性",
"愿意回复",
"积极回复",
"友善和包容的回复",
]
relation_value_prompt = (
f"{relationship_level[level_num]}{person_name},打算{relation_prompt2_list[level_num]}"
)
else:
relation_value_prompt = ""
if relation_value_prompt:
nickname_str = await person_info_manager.get_value(person_id, "nickname")
platform = await person_info_manager.get_value(person_id, "platform")
relation_prompt = f"{relation_value_prompt}ta在{platform}上的昵称是{nickname_str}\n"
else:
relation_prompt = ""
person_name_reason = await person_info_manager.get_value(person_id, "person_name_reason")
if person_name_reason:
relation_prompt += f"ta的昵称{person_name}的由来是:{person_name_reason}\n"
person_impression = await person_info_manager.get_value(person_id, "person_impression")
if person_impression:
relation_prompt += f"ta的印象是{person_impression}\n"
return relation_prompt
@staticmethod
def calculate_level_num(relationship_value) -> int:
"""关系等级计算"""
if -1000 <= relationship_value < -227:
level_num = 0
elif -227 <= relationship_value < -73:
level_num = 1
elif -73 <= relationship_value < 227:
level_num = 2
elif 227 <= relationship_value < 587:
level_num = 3
elif 587 <= relationship_value < 900:
level_num = 4
elif 900 <= relationship_value <= 1000:
level_num = 5
else:
level_num = 5 if relationship_value > 1000 else 0
return level_num
async def update_person_impression(self, person_id, chat_id, reason, timestamp):
"""更新用户印象
Args:
person_id: 用户ID
chat_id: 聊天ID
reason: 更新原因
timestamp: 时间戳
"""
# 获取现有印象和用户信息
person_name = await person_info_manager.get_value(person_id, "person_name")
nickname = await person_info_manager.get_value(person_id, "nickname")
old_impression = await person_info_manager.get_value(person_id, "person_impression")
user_id = await person_info_manager.get_value(person_id, "user_id")
logger.debug(f"更新印象的person_id: {person_id}, chat_id: {chat_id}, reason: {reason}, timestamp: {timestamp}, user_id: {user_id}")
# 获取时间戳前后的消息
messages_before = get_raw_msg_by_timestamp_with_chat_users(
chat_id=chat_id,
timestamp_start=timestamp - 3600, # 前1小时
timestamp_end=timestamp,
person_ids=[user_id],
limit=30,
limit_mode="latest"
)
messages_after = get_raw_msg_by_timestamp_with_chat_users(
chat_id=chat_id,
timestamp_start=timestamp,
timestamp_end=timestamp + 3600, # 后1小时
person_ids=[user_id],
limit=30,
limit_mode="earliest"
)
# 合并消息并按时间排序
user_messages = messages_before + messages_after
user_messages.sort(key=lambda x: x["time"])
# print(f"user_messages: {user_messages}")
# 构建可读消息
readable_messages = await build_readable_messages(
messages=user_messages,
replace_bot_name=True,
timestamp_mode="relative",
truncate=False
)
# 使用LLM总结印象
prompt = f"""基于以下信息,总结对{person_name}(昵称:{nickname})的印象:
原因:{reason}
历史印象:{old_impression if old_impression else ''}
最近发言:
{readable_messages}
请用简洁的语言总结对这个人的印象不超过200字。"""
new_impression, _ = await self.relationship_llm.generate_response_async(prompt=prompt)
logger.debug(f"新印象prompt{prompt}")
logger.info(f"新印象:{new_impression}")
# 合并新旧印象
if old_impression:
merge_prompt = f"""请将以下两段印象合并,形成一段连贯的描述:
旧印象:{old_impression}
新印象:{new_impression}
请用简洁的语言合并这两段印象不超过200字。"""
final_impression, _ = await self.relationship_llm.generate_response_async(prompt=merge_prompt)
logger.debug(f"合并印象prompt{merge_prompt}")
logger.info(f"合并印象:{final_impression}")
else:
final_impression = new_impression
# 更新到数据库
await person_info_manager.update_one_field(person_id, "person_impression", final_impression)
return final_impression
relationship_manager = RelationshipManager()