feat:加入聊天频率控制相关api

pull/1187/head
SengokuCola 2025-08-15 13:46:06 +08:00
parent 44fff4ed8a
commit 52ec28677e
6 changed files with 357 additions and 272 deletions

View File

@ -28,6 +28,8 @@ from src.mais4u.s4u_config import s4u_config
from src.chat.chat_loop.hfc_utils import send_typing, stop_typing
# 导入记忆系统
from src.chat.memory_system.Hippocampus import hippocampus_manager
from src.chat.frequency_control.talk_frequency_control import TalkFrequencyControlManager
from src.chat.frequency_control.focus_value_control import FocusValueControlManager
ERROR_LOOP_INFO = {
"loop_plan_info": {
@ -92,6 +94,8 @@ class HeartFChatting:
self.relationship_builder = relationship_builder_manager.get_or_create_builder(self.stream_id)
self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id)
self.talk_frequency_control = TalkFrequencyControlManager().get_talk_frequency_control(self.stream_id)
self.focus_value_control = FocusValueControlManager().get_focus_value_control(self.stream_id)
self.action_manager = ActionManager()
self.action_planner = ActionPlanner(chat_id=self.stream_id, action_manager=self.action_manager)
@ -203,7 +207,7 @@ class HeartFChatting:
total_recent_interest = sum(self.recent_interest_records)
# 计算调整后的阈值
adjusted_threshold = 1 / global_config.chat.get_current_talk_frequency(self.stream_id)
adjusted_threshold = 1 / self.talk_frequency_control.get_current_talk_frequency()
logger.info(f"{self.log_prefix} 最近三次兴趣度总和: {total_recent_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}")
@ -227,7 +231,7 @@ class HeartFChatting:
bool: 是否应该处理消息
"""
new_message_count = len(new_message)
talk_frequency = global_config.chat.get_current_talk_frequency(self.stream_id)
talk_frequency = self.talk_frequency_control.get_current_talk_frequency()
modified_exit_count_threshold = self.focus_energy * 0.5 / talk_frequency
modified_exit_interest_threshold = 1.5 / talk_frequency
@ -365,7 +369,7 @@ class HeartFChatting:
x0 = 1.0 # 控制曲线中心点
return 1.0 / (1.0 + math.exp(-k * (interest_val - x0)))
normal_mode_probability = calculate_normal_mode_probability(interest_value) * 0.5 / global_config.chat.get_current_talk_frequency(self.stream_id)
normal_mode_probability = calculate_normal_mode_probability(interest_value) * 0.5 / self.talk_frequency_control.get_current_talk_frequency()
# 根据概率决定使用哪种模式
if random.random() < normal_mode_probability:
@ -393,7 +397,7 @@ class HeartFChatting:
logger.error(f"{self.log_prefix} 记忆构建失败: {e}")
if random.random() > global_config.chat.focus_value and mode == ChatMode.FOCUS:
if random.random() > self.focus_value_control.get_current_focus_value() and mode == ChatMode.FOCUS:
#如果激活度没有激活并且聊天活跃度低有可能不进行plan相当于不在电脑前不进行认真思考
actions = [
{

View File

@ -0,0 +1,141 @@
from typing import Optional
from src.config.config import global_config
from src.chat.frequency_control.utils import parse_stream_config_to_chat_id
class FocusValueControl:
def __init__(self,chat_id:str):
self.chat_id = chat_id
self.focus_value_adjust = 1
def get_current_focus_value(self) -> float:
return get_current_focus_value(self.chat_id) * self.focus_value_adjust
class FocusValueControlManager:
def __init__(self):
self.focus_value_controls = {}
def get_focus_value_control(self,chat_id:str) -> FocusValueControl:
if chat_id not in self.focus_value_controls:
self.focus_value_controls[chat_id] = FocusValueControl(chat_id)
return self.focus_value_controls[chat_id]
def get_current_focus_value(chat_id: Optional[str] = None) -> float:
"""
根据当前时间和聊天流获取对应的 focus_value
"""
if not global_config.chat.focus_value_adjust:
return global_config.chat.focus_value
if chat_id:
stream_focus_value = get_stream_specific_focus_value(chat_id)
if stream_focus_value is not None:
return stream_focus_value
global_focus_value = get_global_focus_value()
if global_focus_value is not None:
return global_focus_value
return global_config.chat.focus_value
def get_stream_specific_focus_value(chat_id: str) -> Optional[float]:
"""
获取特定聊天流在当前时间的专注度
Args:
chat_stream_id: 聊天流ID哈希值
Returns:
float: 专注度值如果没有配置则返回 None
"""
# 查找匹配的聊天流配置
for config_item in global_config.chat.focus_value_adjust:
if not config_item or len(config_item) < 2:
continue
stream_config_str = config_item[0] # 例如 "qq:1026294844:group"
# 解析配置字符串并生成对应的 chat_id
config_chat_id = parse_stream_config_to_chat_id(stream_config_str)
if config_chat_id is None:
continue
# 比较生成的 chat_id
if config_chat_id != chat_id:
continue
# 使用通用的时间专注度解析方法
return get_time_based_focus_value(config_item[1:])
return None
def get_time_based_focus_value(time_focus_list: list[str]) -> Optional[float]:
"""
根据时间配置列表获取当前时段的专注度
Args:
time_focus_list: 时间专注度配置列表格式为 ["HH:MM,focus_value", ...]
Returns:
float: 专注度值如果没有配置则返回 None
"""
from datetime import datetime
current_time = datetime.now().strftime("%H:%M")
current_hour, current_minute = map(int, current_time.split(":"))
current_minutes = current_hour * 60 + current_minute
# 解析时间专注度配置
time_focus_pairs = []
for time_focus_str in time_focus_list:
try:
time_str, focus_str = time_focus_str.split(",")
hour, minute = map(int, time_str.split(":"))
focus_value = float(focus_str)
minutes = hour * 60 + minute
time_focus_pairs.append((minutes, focus_value))
except (ValueError, IndexError):
continue
if not time_focus_pairs:
return None
# 按时间排序
time_focus_pairs.sort(key=lambda x: x[0])
# 查找当前时间对应的专注度
current_focus_value = None
for minutes, focus_value in time_focus_pairs:
if current_minutes >= minutes:
current_focus_value = focus_value
else:
break
# 如果当前时间在所有配置时间之前,使用最后一个时间段的专注度(跨天逻辑)
if current_focus_value is None and time_focus_pairs:
current_focus_value = time_focus_pairs[-1][1]
return current_focus_value
def get_global_focus_value() -> Optional[float]:
"""
获取全局默认专注度配置
Returns:
float: 专注度值如果没有配置则返回 None
"""
for config_item in global_config.chat.focus_value_adjust:
if not config_item or len(config_item) < 2:
continue
# 检查是否为全局默认配置(第一个元素为空字符串)
if config_item[0] == "":
return get_time_based_focus_value(config_item[1:])
return None

View File

@ -0,0 +1,142 @@
from typing import Optional
from src.config.config import global_config
from src.chat.frequency_control.utils import parse_stream_config_to_chat_id
class TalkFrequencyControl:
def __init__(self,chat_id:str):
self.chat_id = chat_id
self.talk_frequency_adjust = 1
def get_current_talk_frequency(self) -> float:
return get_current_talk_frequency(self.chat_id) * self.talk_frequency_adjust
class TalkFrequencyControlManager:
def __init__(self):
self.talk_frequency_controls = {}
def get_talk_frequency_control(self,chat_id:str) -> TalkFrequencyControl:
if chat_id not in self.talk_frequency_controls:
self.talk_frequency_controls[chat_id] = TalkFrequencyControl(chat_id)
return self.talk_frequency_controls[chat_id]
def get_current_talk_frequency(chat_id: Optional[str] = None) -> float:
"""
根据当前时间和聊天流获取对应的 talk_frequency
Args:
chat_stream_id: 聊天流ID格式为 "platform:chat_id:type"
Returns:
float: 对应的频率值
"""
if not global_config.chat.talk_frequency_adjust:
return global_config.chat.talk_frequency
# 优先检查聊天流特定的配置
if chat_id:
stream_frequency = get_stream_specific_frequency(chat_id)
if stream_frequency is not None:
return stream_frequency
# 检查全局时段配置(第一个元素为空字符串的配置)
global_frequency = get_global_frequency()
return global_config.chat.talk_frequency if global_frequency is None else global_frequency
def get_time_based_frequency(time_freq_list: list[str]) -> Optional[float]:
"""
根据时间配置列表获取当前时段的频率
Args:
time_freq_list: 时间频率配置列表格式为 ["HH:MM,frequency", ...]
Returns:
float: 频率值如果没有配置则返回 None
"""
from datetime import datetime
current_time = datetime.now().strftime("%H:%M")
current_hour, current_minute = map(int, current_time.split(":"))
current_minutes = current_hour * 60 + current_minute
# 解析时间频率配置
time_freq_pairs = []
for time_freq_str in time_freq_list:
try:
time_str, freq_str = time_freq_str.split(",")
hour, minute = map(int, time_str.split(":"))
frequency = float(freq_str)
minutes = hour * 60 + minute
time_freq_pairs.append((minutes, frequency))
except (ValueError, IndexError):
continue
if not time_freq_pairs:
return None
# 按时间排序
time_freq_pairs.sort(key=lambda x: x[0])
# 查找当前时间对应的频率
current_frequency = None
for minutes, frequency in time_freq_pairs:
if current_minutes >= minutes:
current_frequency = frequency
else:
break
# 如果当前时间在所有配置时间之前,使用最后一个时间段的频率(跨天逻辑)
if current_frequency is None and time_freq_pairs:
current_frequency = time_freq_pairs[-1][1]
return current_frequency
def get_stream_specific_frequency(chat_stream_id: str):
"""
获取特定聊天流在当前时间的频率
Args:
chat_stream_id: 聊天流ID哈希值
Returns:
float: 频率值如果没有配置则返回 None
"""
# 查找匹配的聊天流配置
for config_item in global_config.chat.talk_frequency_adjust:
if not config_item or len(config_item) < 2:
continue
stream_config_str = config_item[0] # 例如 "qq:1026294844:group"
# 解析配置字符串并生成对应的 chat_id
config_chat_id = parse_stream_config_to_chat_id(stream_config_str)
if config_chat_id is None:
continue
# 比较生成的 chat_id
if config_chat_id != chat_stream_id:
continue
# 使用通用的时间频率解析方法
return get_time_based_frequency(config_item[1:])
return None
def get_global_frequency() -> Optional[float]:
"""
获取全局默认频率配置
Returns:
float: 频率值如果没有配置则返回 None
"""
for config_item in global_config.chat.talk_frequency_adjust:
if not config_item or len(config_item) < 2:
continue
# 检查是否为全局默认配置(第一个元素为空字符串)
if config_item[0] == "":
return get_time_based_frequency(config_item[1:])
return None

View File

@ -0,0 +1,37 @@
from typing import Optional
import hashlib
def parse_stream_config_to_chat_id(stream_config_str: str) -> Optional[str]:
"""
解析流配置字符串并生成对应的 chat_id
Args:
stream_config_str: 格式为 "platform:id:type" 的字符串
Returns:
str: 生成的 chat_id如果解析失败则返回 None
"""
try:
parts = stream_config_str.split(":")
if len(parts) != 3:
return None
platform = parts[0]
id_str = parts[1]
stream_type = parts[2]
# 判断是否为群聊
is_group = stream_type == "group"
# 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id
if is_group:
components = [platform, str(id_str)]
else:
components = [platform, str(id_str), "private"]
key = "_".join(components)
return hashlib.md5(key.encode()).hexdigest()
except (ValueError, IndexError):
return None

View File

@ -115,274 +115,6 @@ class ChatConfig(ConfigBase):
- focus_value_adjust 控制专注思考能力数值越低越容易专注消耗token也越多
"""
def get_current_focus_value(self, chat_stream_id: Optional[str] = None) -> float:
"""
根据当前时间和聊天流获取对应的 focus_value
"""
if not self.focus_value_adjust:
return self.focus_value
if chat_stream_id:
stream_focus_value = self._get_stream_specific_focus_value(chat_stream_id)
if stream_focus_value is not None:
return stream_focus_value
global_focus_value = self._get_global_focus_value()
if global_focus_value is not None:
return global_focus_value
return self.focus_value
def get_current_talk_frequency(self, chat_stream_id: Optional[str] = None) -> float:
"""
根据当前时间和聊天流获取对应的 talk_frequency
Args:
chat_stream_id: 聊天流ID格式为 "platform:chat_id:type"
Returns:
float: 对应的频率值
"""
if not self.talk_frequency_adjust:
return self.talk_frequency
# 优先检查聊天流特定的配置
if chat_stream_id:
stream_frequency = self._get_stream_specific_frequency(chat_stream_id)
if stream_frequency is not None:
return stream_frequency
# 检查全局时段配置(第一个元素为空字符串的配置)
global_frequency = self._get_global_frequency()
return self.talk_frequency if global_frequency is None else global_frequency
def _get_global_focus_value(self) -> Optional[float]:
"""
获取全局默认专注度配置
Returns:
float: 专注度值如果没有配置则返回 None
"""
for config_item in self.focus_value_adjust:
if not config_item or len(config_item) < 2:
continue
# 检查是否为全局默认配置(第一个元素为空字符串)
if config_item[0] == "":
return self._get_time_based_focus_value(config_item[1:])
return None
def _get_time_based_focus_value(self, time_focus_list: list[str]) -> Optional[float]:
"""
根据时间配置列表获取当前时段的专注度
Args:
time_focus_list: 时间专注度配置列表格式为 ["HH:MM,focus_value", ...]
Returns:
float: 专注度值如果没有配置则返回 None
"""
from datetime import datetime
current_time = datetime.now().strftime("%H:%M")
current_hour, current_minute = map(int, current_time.split(":"))
current_minutes = current_hour * 60 + current_minute
# 解析时间专注度配置
time_focus_pairs = []
for time_focus_str in time_focus_list:
try:
time_str, focus_str = time_focus_str.split(",")
hour, minute = map(int, time_str.split(":"))
focus_value = float(focus_str)
minutes = hour * 60 + minute
time_focus_pairs.append((minutes, focus_value))
except (ValueError, IndexError):
continue
if not time_focus_pairs:
return None
# 按时间排序
time_focus_pairs.sort(key=lambda x: x[0])
# 查找当前时间对应的专注度
current_focus_value = None
for minutes, focus_value in time_focus_pairs:
if current_minutes >= minutes:
current_focus_value = focus_value
else:
break
# 如果当前时间在所有配置时间之前,使用最后一个时间段的专注度(跨天逻辑)
if current_focus_value is None and time_focus_pairs:
current_focus_value = time_focus_pairs[-1][1]
return current_focus_value
def _get_time_based_frequency(self, time_freq_list: list[str]) -> Optional[float]:
"""
根据时间配置列表获取当前时段的频率
Args:
time_freq_list: 时间频率配置列表格式为 ["HH:MM,frequency", ...]
Returns:
float: 频率值如果没有配置则返回 None
"""
from datetime import datetime
current_time = datetime.now().strftime("%H:%M")
current_hour, current_minute = map(int, current_time.split(":"))
current_minutes = current_hour * 60 + current_minute
# 解析时间频率配置
time_freq_pairs = []
for time_freq_str in time_freq_list:
try:
time_str, freq_str = time_freq_str.split(",")
hour, minute = map(int, time_str.split(":"))
frequency = float(freq_str)
minutes = hour * 60 + minute
time_freq_pairs.append((minutes, frequency))
except (ValueError, IndexError):
continue
if not time_freq_pairs:
return None
# 按时间排序
time_freq_pairs.sort(key=lambda x: x[0])
# 查找当前时间对应的频率
current_frequency = None
for minutes, frequency in time_freq_pairs:
if current_minutes >= minutes:
current_frequency = frequency
else:
break
# 如果当前时间在所有配置时间之前,使用最后一个时间段的频率(跨天逻辑)
if current_frequency is None and time_freq_pairs:
current_frequency = time_freq_pairs[-1][1]
return current_frequency
def _get_stream_specific_focus_value(self, chat_stream_id: str) -> Optional[float]:
"""
获取特定聊天流在当前时间的专注度
Args:
chat_stream_id: 聊天流ID哈希值
Returns:
float: 专注度值如果没有配置则返回 None
"""
# 查找匹配的聊天流配置
for config_item in self.focus_value_adjust:
if not config_item or len(config_item) < 2:
continue
stream_config_str = config_item[0] # 例如 "qq:1026294844:group"
# 解析配置字符串并生成对应的 chat_id
config_chat_id = self._parse_stream_config_to_chat_id(stream_config_str)
if config_chat_id is None:
continue
# 比较生成的 chat_id
if config_chat_id != chat_stream_id:
continue
# 使用通用的时间专注度解析方法
return self._get_time_based_focus_value(config_item[1:])
return None
def _get_stream_specific_frequency(self, chat_stream_id: str):
"""
获取特定聊天流在当前时间的频率
Args:
chat_stream_id: 聊天流ID哈希值
Returns:
float: 频率值如果没有配置则返回 None
"""
# 查找匹配的聊天流配置
for config_item in self.talk_frequency_adjust:
if not config_item or len(config_item) < 2:
continue
stream_config_str = config_item[0] # 例如 "qq:1026294844:group"
# 解析配置字符串并生成对应的 chat_id
config_chat_id = self._parse_stream_config_to_chat_id(stream_config_str)
if config_chat_id is None:
continue
# 比较生成的 chat_id
if config_chat_id != chat_stream_id:
continue
# 使用通用的时间频率解析方法
return self._get_time_based_frequency(config_item[1:])
return None
def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]:
"""
解析流配置字符串并生成对应的 chat_id
Args:
stream_config_str: 格式为 "platform:id:type" 的字符串
Returns:
str: 生成的 chat_id如果解析失败则返回 None
"""
try:
parts = stream_config_str.split(":")
if len(parts) != 3:
return None
platform = parts[0]
id_str = parts[1]
stream_type = parts[2]
# 判断是否为群聊
is_group = stream_type == "group"
# 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id
import hashlib
if is_group:
components = [platform, str(id_str)]
else:
components = [platform, str(id_str), "private"]
key = "_".join(components)
return hashlib.md5(key.encode()).hexdigest()
except (ValueError, IndexError):
return None
def _get_global_frequency(self) -> Optional[float]:
"""
获取全局默认频率配置
Returns:
float: 频率值如果没有配置则返回 None
"""
for config_item in self.talk_frequency_adjust:
if not config_item or len(config_item) < 2:
continue
# 检查是否为全局默认配置(第一个元素为空字符串)
if config_item[0] == "":
return self._get_time_based_frequency(config_item[1:])
return None
@dataclass

View File

@ -0,0 +1,29 @@
from src.common.logger import get_logger
from src.chat.frequency_control.focus_value_control import FocusValueControlManager
from src.chat.frequency_control.talk_frequency_control import TalkFrequencyControlManager
logger = get_logger("frequency_api")
def get_current_focus_value(chat_id: str) -> float:
return FocusValueControlManager().get_focus_value_control(chat_id).get_current_focus_value()
def get_current_talk_frequency(chat_id: str) -> float:
return TalkFrequencyControlManager().get_talk_frequency_control(chat_id).get_current_talk_frequency()
def set_focus_value_adjust(chat_id: str, focus_value_adjust: float) -> None:
FocusValueControlManager().get_focus_value_control(chat_id).focus_value_adjust = focus_value_adjust
def set_talk_frequency_adjust(chat_id: str, talk_frequency_adjust: float) -> None:
TalkFrequencyControlManager().get_talk_frequency_control(chat_id).talk_frequency_adjust = talk_frequency_adjust
def get_focus_value_adjust(chat_id: str) -> float:
return FocusValueControlManager().get_focus_value_control(chat_id).focus_value_adjust
def get_talk_frequency_adjust(chat_id: str) -> float:
return TalkFrequencyControlManager().get_talk_frequency_control(chat_id).talk_frequency_adjust