feat: 添加闲置聊天功能

pull/937/head
lokong 2025-05-05 22:02:20 +08:00
parent db39318ffb
commit 440f474599
4 changed files with 437 additions and 1 deletions

View File

@ -10,6 +10,7 @@ from typing import Dict, Any, Optional
from ..chat.message import Message
from .pfc_types import ConversationState
from .pfc import ChatObserver, GoalAnalyzer
from .idle_conversation_starter import IdleConversationStarter
from .message_sender import DirectMessageSender
from src.common.logger_manager import get_logger
from .action_planner import ActionPlanner
@ -55,6 +56,7 @@ class Conversation:
self.knowledge_fetcher = KnowledgeFetcher(self.private_name)
self.waiter = Waiter(self.stream_id, self.private_name)
self.direct_sender = DirectMessageSender(self.private_name)
self.idle_conversation_starter = IdleConversationStarter(self.stream_id, self.private_name)
# 获取聊天流信息
self.chat_stream = chat_manager.get_stream(self.stream_id)
@ -113,6 +115,9 @@ class Conversation:
# 让 ChatObserver 从加载的最后一条消息之后开始同步
self.chat_observer.last_message_time = self.observation_info.last_message_time
self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录
# 初始化空闲对话检测器的最后消息时间
await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time)
else:
logger.info(f"[私聊][{self.private_name}]没有找到初始聊天记录。")
@ -121,6 +126,11 @@ class Conversation:
# 出错也要继续,只是没有历史记录而已
# 组件准备完成,启动该论对话
self.should_continue = True
# 启动空闲对话检测器
self.idle_conversation_starter.start()
logger.info(f"[私聊][{self.private_name}]空闲对话检测器已启动")
asyncio.create_task(self.start())
async def start(self):
@ -137,6 +147,10 @@ class Conversation:
while self.should_continue:
# 忽略逻辑
if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp:
# 暂停空闲对话检测器,避免在忽略期间触发
if hasattr(self, 'idle_conversation_starter') and self.idle_conversation_starter._running:
self.idle_conversation_starter.stop()
logger.debug(f"[私聊][{self.private_name}]对话被暂时忽略,暂停空闲对话检测")
await asyncio.sleep(30)
continue
elif self.ignore_until_timestamp and time.time() >= self.ignore_until_timestamp:
@ -144,6 +158,12 @@ class Conversation:
self.ignore_until_timestamp = None
self.should_continue = False
continue
else:
# 确保空闲对话检测器在正常对话时是启动的
if hasattr(self, 'idle_conversation_starter') and not self.idle_conversation_starter._running:
self.idle_conversation_starter.start()
logger.debug(f"[私聊][{self.private_name}]恢复空闲对话检测")
try:
# --- 在规划前记录当前新消息数量 ---
initial_new_message_count = 0
@ -368,6 +388,9 @@ class Conversation:
self.conversation_info.last_successful_reply_action = "send_new_message"
action_successful = True # 标记动作成功
# 更新最后消息时间,重置空闲检测计时器
await self.idle_conversation_starter.update_last_message_time()
elif need_replan:
# 打回动作决策
logger.warning(
@ -472,6 +495,9 @@ class Conversation:
self.conversation_info.last_successful_reply_action = "direct_reply"
action_successful = True # 标记动作成功
# 更新最后消息时间,重置空闲检测计时器
await self.idle_conversation_starter.update_last_message_time()
elif need_replan:
# 打回动作决策
logger.warning(
@ -696,3 +722,15 @@ class Conversation:
)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}")
async def stop(self):
"""停止对话处理"""
logger.info(f"[私聊][{self.private_name}]停止对话 {self.stream_id}")
self.should_continue = False
# 停止空闲对话检测器
if hasattr(self, 'idle_conversation_starter'):
self.idle_conversation_starter.stop()
if hasattr(self, 'chat_observer'):
self.chat_observer.stop()

View File

@ -0,0 +1,351 @@
from typing import List, Tuple, TYPE_CHECKING, Optional, Union
import asyncio
import time
import random
from src.common.logger import get_module_logger
from ..models.utils_model import LLMRequest
from ...config.config import global_config
from .chat_observer import ChatObserver
from .message_sender import DirectMessageSender
from ..chat.chat_stream import ChatStream
from maim_message import UserInfo
from src.individuality.individuality import Individuality
from src.plugins.utils.chat_message_builder import build_readable_messages
if TYPE_CHECKING:
from ..chat.message import Message
from .conversation import Conversation
logger = get_module_logger("pfc")
class IdleConversationStarter:
"""长时间无对话主动发起对话的组件
该组件会在一段时间没有对话后自动生成一条消息发送给用户以保持对话的活跃度
时间阈值会在配置的最小和最大值之间随机选择每次发送消息后都会重置
"""
def __init__(self, stream_id: str, private_name: str):
"""初始化空闲对话启动器
Args:
stream_id: 聊天流ID
private_name: 私聊用户名称
"""
self.stream_id: str = stream_id
self.private_name: str = private_name
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
self.message_sender = DirectMessageSender(private_name)
# 添加异步锁,保护对共享变量的访问
self._lock: asyncio.Lock = asyncio.Lock()
# LLM请求对象用于生成主动对话内容
self.llm = LLMRequest(
model=global_config.llm_normal, temperature=0.8, max_tokens=500, request_type="idle_conversation_starter"
)
# 个性化信息
self.personality_info: str = Individuality.get_instance().get_prompt(x_person=2, level=3)
self.name: str = global_config.BOT_NICKNAME
self.nick_name: List[str] = global_config.BOT_ALIAS_NAMES
# 从配置文件读取配置参数,或使用默认值
self.enabled: bool = getattr(global_config, 'idle_conversation', {}).get('enable_idle_conversation', True)
self.idle_check_interval: int = getattr(global_config, 'idle_conversation', {}).get('idle_check_interval', 10)
self.min_idle_time: int = getattr(global_config, 'idle_conversation', {}).get('min_idle_time', 60)
self.max_idle_time: int = getattr(global_config, 'idle_conversation', {}).get('max_idle_time', 120)
# 计算实际触发阈值在min和max之间随机
self.actual_idle_threshold: int = random.randint(self.min_idle_time, self.max_idle_time)
# 工作状态
self.last_message_time: float = time.time()
self._running: bool = False
self._task: Optional[asyncio.Task] = None
def start(self) -> None:
"""启动空闲对话检测
如果功能被禁用或已经在运行则不会启动
"""
# 如果功能被禁用,则不启动
if not self.enabled:
logger.info(f"[私聊][{self.private_name}]主动发起对话功能已禁用")
return
if self._running:
logger.debug(f"[私聊][{self.private_name}]主动发起对话功能已在运行中")
return
self._running = True
self._task = asyncio.create_task(self._check_idle_loop())
logger.info(f"[私聊][{self.private_name}]启动空闲对话检测,阈值设置为{self.actual_idle_threshold}")
def stop(self) -> None:
"""停止空闲对话检测
取消当前运行的任务并重置状态
"""
if not self._running:
return
self._running = False
if self._task:
self._task.cancel()
self._task = None
logger.info(f"[私聊][{self.private_name}]停止空闲对话检测")
async def update_last_message_time(self, message_time: Optional[float] = None) -> None:
"""更新最后一条消息的时间
Args:
message_time: 消息时间戳如果为None则使用当前时间
"""
async with self._lock:
self.last_message_time = message_time or time.time()
# 重新随机化下一次触发的时间阈值
self.actual_idle_threshold = random.randint(self.min_idle_time, self.max_idle_time)
logger.debug(f"[私聊][{self.private_name}]更新最后消息时间: {self.last_message_time},新阈值: {self.actual_idle_threshold}")
def reload_config(self) -> None:
"""重新加载配置
从配置文件重新读取所有参数以便动态调整空闲对话检测的行为
"""
try:
# 从配置文件重新读取参数
self.enabled = getattr(global_config, 'idle_conversation', {}).get('enable_idle_conversation', True)
self.idle_check_interval = getattr(global_config, 'idle_conversation', {}).get('idle_check_interval', 10)
self.min_idle_time = getattr(global_config, 'idle_conversation', {}).get('min_idle_time', 7200)
self.max_idle_time = getattr(global_config, 'idle_conversation', {}).get('max_idle_time', 18000)
logger.debug(f"[私聊][{self.private_name}]重新加载主动对话配置: 启用={self.enabled}, 检查间隔={self.idle_check_interval}秒, 最短间隔={self.min_idle_time}秒, 最长间隔={self.max_idle_time}")
# 重新计算实际阈值
async def update_threshold():
async with self._lock:
self.actual_idle_threshold = random.randint(self.min_idle_time, self.max_idle_time)
logger.debug(f"[私聊][{self.private_name}]更新空闲检测阈值为: {self.actual_idle_threshold}")
# 创建一个任务来异步更新阈值
asyncio.create_task(update_threshold())
except Exception as e:
logger.error(f"[私聊][{self.private_name}]重新加载配置时出错: {str(e)}")
async def _check_idle_loop(self) -> None:
"""检查空闲状态的循环
定期检查是否长时间无对话如果达到阈值则尝试主动发起对话
"""
try:
config_reload_counter = 0
config_reload_interval = 100 # 每100次检查重新加载一次配置
while self._running:
# 定期重新加载配置
config_reload_counter += 1
if config_reload_counter >= config_reload_interval:
self.reload_config()
config_reload_counter = 0
# 检查是否启用了主动对话功能
if not self.enabled:
# 如果禁用了功能,就等待一段时间后再次检查配置
await asyncio.sleep(self.idle_check_interval)
continue
# 使用锁保护对共享变量的读取
current_time = time.time()
async with self._lock:
idle_time = current_time - self.last_message_time
threshold = self.actual_idle_threshold
if idle_time >= threshold:
logger.info(f"[私聊][{self.private_name}]检测到长时间({idle_time:.0f}秒)无对话,尝试主动发起聊天")
await self._initiate_conversation()
# 更新时间,避免连续触发
await self.update_last_message_time()
# 等待下一次检查
await asyncio.sleep(self.idle_check_interval)
except asyncio.CancelledError:
logger.debug(f"[私聊][{self.private_name}]空闲对话检测任务被取消")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]空闲对话检测出错: {str(e)}")
# 尝试重新启动检测循环
if self._running:
logger.info(f"[私聊][{self.private_name}]尝试重新启动空闲对话检测")
self._task = asyncio.create_task(self._check_idle_loop())
async def _initiate_conversation(self) -> None:
"""生成并发送主动对话内容
获取聊天历史记录使用LLM生成合适的开场白然后发送消息
"""
try:
# 获取聊天历史记录,用于生成更合适的开场白
messages = self.chat_observer.get_cached_messages(limit=12) # 获取最近12条消息
chat_history_text = await build_readable_messages(
messages,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
# 构建提示词
prompt = f"""{self.personality_info}。你的名字是{self.name}
你正在与用户{self.private_name}进行QQ私聊,
但已经有一段时间没有对话了
你想要主动发起一个友好的对话可以说说自己在做的事情或者询问对方在做什么
请基于以下之前的对话历史生成一条自然友好符合你个性的主动对话消息
这条消息应该能够引起用户的兴趣重新开始对话
最近的对话历史可能已经过去了很久
{chat_history_text}
请直接输出一条消息不要有任何额外的解释或引导文字消息要简短自然就像是在日常聊天中的开场白
消息内容尽量简短,不要超过20个字,不要添加任何表情符号
"""
# 尝试生成回复,添加超时处理
try:
content, _ = await asyncio.wait_for(
self.llm.generate_response_async(prompt),
timeout=30 # 30秒超时
)
except asyncio.TimeoutError:
logger.error(f"[私聊][{self.private_name}]生成主动对话内容超时")
return
except Exception as llm_err:
logger.error(f"[私聊][{self.private_name}]生成主动对话内容失败: {str(llm_err)}")
return
# 清理结果
content = content.strip()
content = content.strip('"\'')
if not content:
logger.error(f"[私聊][{self.private_name}]生成的主动对话内容为空")
return
# 统一错误处理从这里开始所有操作都在同一个try-except块中
logger.debug(f"[私聊][{self.private_name}]成功生成主动对话内容: {content},准备发送")
from .pfc_manager import PFCManager
from src.plugins.chat.chat_stream import chat_manager
# 获取当前实例
pfc_manager = PFCManager.get_instance()
# 结束当前对话实例(如果存在)
current_conversation = await pfc_manager.get_conversation(self.stream_id)
if current_conversation:
logger.info(f"[私聊][{self.private_name}]结束当前对话实例,准备创建新实例")
try:
await current_conversation.stop()
await pfc_manager.remove_conversation(self.stream_id)
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]结束当前对话实例时出错: {str(e)},继续创建新实例")
# 创建新的对话实例
logger.info(f"[私聊][{self.private_name}]创建新的对话实例以发送主动消息")
new_conversation = None
try:
new_conversation = await pfc_manager.get_or_create_conversation(self.stream_id, self.private_name)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]创建新对话实例失败: {str(e)}")
return
# 确保新对话实例已初始化完成
chat_stream = await self._get_chat_stream(new_conversation)
if not chat_stream:
logger.error(f"[私聊][{self.private_name}]无法获取有效的聊天流,取消发送主动消息")
return
# 发送消息
try:
await self.message_sender.send_message(
chat_stream=chat_stream,
content=content,
reply_to_message=None
)
# 更新空闲会话启动器的最后消息时间
await self.update_last_message_time()
# 如果新对话实例有一个聊天观察者,请触发更新
if new_conversation and hasattr(new_conversation, 'chat_observer'):
logger.info(f"[私聊][{self.private_name}]触发聊天观察者更新")
try:
new_conversation.chat_observer.trigger_update()
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]触发聊天观察者更新失败: {str(e)}")
logger.success(f"[私聊][{self.private_name}]成功主动发起对话: {content}")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]发送主动对话消息失败: {str(e)}")
except Exception as e:
# 顶级异常处理,确保任何未捕获的异常都不会导致整个进程崩溃
logger.error(f"[私聊][{self.private_name}]主动发起对话过程中发生未预期的错误: {str(e)}")
async def _get_chat_stream(self, conversation: Optional['Conversation'] = None) -> Optional[ChatStream]:
"""获取可用的聊天流
尝试多种方式获取聊天流
1. 从传入的对话实例中获取
2. 从全局聊天管理器中获取
3. 创建一个新的聊天流
Args:
conversation: 对话实例可以为None
Returns:
Optional[ChatStream]: 如果成功获取则返回聊天流否则返回None
"""
chat_stream = None
# 1. 尝试从对话实例获取
if conversation and hasattr(conversation, 'should_continue'):
# 等待一小段时间,确保初始化完成
retry_count = 0
max_retries = 10
while not conversation.should_continue and retry_count < max_retries:
await asyncio.sleep(0.5)
retry_count += 1
logger.debug(f"[私聊][{self.private_name}]等待新对话实例初始化完成: 尝试 {retry_count}/{max_retries}")
if not conversation.should_continue:
logger.warning(f"[私聊][{self.private_name}]新对话实例初始化可能未完成,但仍将尝试获取聊天流")
# 尝试使用对话实例的聊天流
if hasattr(conversation, 'chat_stream') and conversation.chat_stream:
logger.info(f"[私聊][{self.private_name}]使用新对话实例的聊天流")
return conversation.chat_stream
# 2. 尝试从聊天管理器获取
from src.plugins.chat.chat_stream import chat_manager
try:
logger.info(f"[私聊][{self.private_name}]尝试从chat_manager获取聊天流")
chat_stream = chat_manager.get_stream(self.stream_id)
if chat_stream:
return chat_stream
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]从chat_manager获取聊天流失败: {str(e)}")
# 3. 创建新的聊天流
try:
logger.warning(f"[私聊][{self.private_name}]无法获取现有聊天流,创建新的聊天流")
# 创建用户信息对象
user_info = UserInfo(
user_id=global_config.BOT_QQ,
user_nickname=global_config.BOT_NICKNAME,
platform="qq"
)
# 创建聊天流
return ChatStream(self.stream_id, "qq", user_info)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]创建新聊天流失败: {str(e)}")
return None

View File

@ -33,6 +33,7 @@ class PFCManager:
Args:
stream_id: 聊天流ID
private_name: 私聊名称
Returns:
Optional[Conversation]: 对话实例创建失败则返回None
@ -61,7 +62,12 @@ class PFCManager:
if instance.should_continue:
logger.debug(f"[私聊][{private_name}]使用现有会话实例: {stream_id}")
return instance
# else: 实例存在但不应继续
else:
# 清理旧实例资源
await self._cleanup_conversation(instance)
del self._instances[stream_id]
# 创建新实例
try:
# 创建新实例
logger.info(f"[私聊][{private_name}]创建新的对话实例: {stream_id}")
@ -102,6 +108,25 @@ class PFCManager:
logger.error(f"[私聊][{private_name}]{traceback.format_exc()}")
# 清理失败的初始化
async def _cleanup_conversation(self, conversation: Conversation):
"""清理会话实例的资源
Args:
conversation: 要清理的会话实例
"""
try:
# 调用conversation的停止方法确保所有组件都被正确关闭
if hasattr(conversation, 'stop') and callable(conversation.stop):
await conversation.stop()
# 特别确保空闲对话检测器被关闭
if hasattr(conversation, 'idle_conversation_starter'):
conversation.idle_conversation_starter.stop()
logger.info(f"[私聊][{conversation.private_name}]会话实例 {conversation.stream_id} 资源已清理")
except Exception as e:
logger.error(f"[私聊][{conversation.private_name}]清理会话实例资源失败: {e}")
async def get_conversation(self, stream_id: str) -> Optional[Conversation]:
"""获取已存在的会话实例
@ -112,3 +137,19 @@ class PFCManager:
Optional[Conversation]: 会话实例不存在则返回None
"""
return self._instances.get(stream_id)
async def remove_conversation(self, stream_id: str):
"""移除会话实例
Args:
stream_id: 聊天流ID
"""
if stream_id in self._instances:
try:
# 清理资源
await self._cleanup_conversation(self._instances[stream_id])
# 删除实例引用
del self._instances[stream_id]
logger.info(f"会话实例 {stream_id} 已从管理器中移除")
except Exception as e:
logger.error(f"移除会话实例 {stream_id} 失败: {e}")

View File

@ -290,6 +290,12 @@ provider = "SILICONFLOW"
pri_in = 2
pri_out = 8
[idle_conversation]
enable_idle_conversation = true
idle_check_interval = 10 # 检查间隔10分钟检查一次
min_idle_time = 7200 # 最短无活动时间2小时 (7200秒)
max_idle_time = 18000 # 最长无活动时间5小时 (18000秒)
#以下模型暂时没有使用!!
#以下模型暂时没有使用!!