diff --git a/src/plugins/PFC/PFC_idle/__init__.py b/src/plugins/PFC/PFC_idle/__init__.py index 77d90210..07680619 100644 --- a/src/plugins/PFC/PFC_idle/__init__.py +++ b/src/plugins/PFC/PFC_idle/__init__.py @@ -3,18 +3,10 @@ PFC_idle 包 - 用于空闲时主动聊天的功能模块 该包包含以下主要组件: - IdleChat: 根据关系和活跃度进行智能主动聊天 -- IdleChatManager: 管理多个聊天实例的空闲状态 -- IdleConversation: 处理与空闲聊天相关的功能,与主Conversation类解耦 """ from .idle_chat import IdleChat -from .idle_chat_manager import IdleChatManager -from .idle_conversation import IdleConversation, get_idle_conversation_instance, initialize_idle_conversation __all__ = [ - "IdleChat", - "IdleChatManager", - "IdleConversation", - "get_idle_conversation_instance", - "initialize_idle_conversation", + 'IdleChat' ] diff --git a/src/plugins/PFC/PFC_idle/idle_chat.py b/src/plugins/PFC/PFC_idle/idle_chat.py index 2962d52c..2c74660d 100644 --- a/src/plugins/PFC/PFC_idle/idle_chat.py +++ b/src/plugins/PFC/PFC_idle/idle_chat.py @@ -144,15 +144,12 @@ class IdleChat: self._task: Optional[asyncio.Task] = None # 配置参数 - 从global_config加载 - self.min_cooldown = getattr( - global_config, "MIN_IDLE_TIME", 7200 - ) # 最短冷却时间(默认2小时)建议修改长一点,你也不希望你的bot一直骚扰你吧 - self.max_cooldown = getattr(global_config, "MAX_IDLE_TIME", 14400) # 最长冷却时间(默认4小时) - self.min_idle_time = getattr(global_config, "MIN_IDLE_TIME", 3600) - self.check_interval = getattr(global_config, "IDLE_CHECK_INTERVAL", 600) # 检查间隔(默认10分钟) - self.active_hours_start = 6 # 活动开始时间 - self.active_hours_end = 24 # 活动结束时间 - + self.min_cooldown = getattr(global_config, "min_idle_time", 7200) # 最短冷却时间(默认2小时) + self.max_cooldown = getattr(global_config, "max_idle_time", 18000) # 最长冷却时间(默认5小时) + self.check_interval = getattr(global_config, "idle_check_interval", 10) * 60 # 检查间隔(默认10分钟,转换为秒) + self.active_hours_start = 7 # 活动开始时间 + self.active_hours_end = 23 # 活动结束时间 + # 关系值相关 self.base_trigger_probability = 0.3 # 基础触发概率 self.relationship_factor = 0.0003 # 关系值影响因子 diff --git a/src/plugins/PFC/PFC_idle/idle_chat_manager.py b/src/plugins/PFC/PFC_idle/idle_chat_manager.py deleted file mode 100644 index a8708331..00000000 --- a/src/plugins/PFC/PFC_idle/idle_chat_manager.py +++ /dev/null @@ -1,183 +0,0 @@ -from typing import Dict, Optional -import asyncio -from src.common.logger_manager import get_logger -from .idle_chat import IdleChat -import traceback - -logger = get_logger("pfc_idle_chat_manager") - - -class IdleChatManager: - """空闲聊天管理器 - - 用于管理所有私聊用户的空闲聊天实例。 - 采用单例模式,确保全局只有一个管理器实例。 - """ - - _instance: Optional["IdleChatManager"] = None - _lock: asyncio.Lock = asyncio.Lock() - - def __init__(self): - """初始化空闲聊天管理器""" - self._idle_chats: Dict[str, IdleChat] = {} # stream_id -> IdleChat - self._active_conversations_count: Dict[str, int] = {} # stream_id -> count - - @classmethod - def get_instance(cls) -> "IdleChatManager": - """获取管理器单例 (同步版本) - - Returns: - IdleChatManager: 管理器实例 - """ - if not cls._instance: - # 在同步环境中创建实例 - cls._instance = cls() - return cls._instance - - @classmethod - async def get_instance_async(cls) -> "IdleChatManager": - """获取管理器单例 (异步版本) - - Returns: - IdleChatManager: 管理器实例 - """ - if not cls._instance: - async with cls._lock: - if not cls._instance: - cls._instance = cls() - return cls._instance - - async def get_or_create_idle_chat(self, stream_id: str, private_name: str) -> IdleChat: - """获取或创建空闲聊天实例 - - Args: - stream_id: 聊天流ID - private_name: 私聊用户名称 - - Returns: - IdleChat: 空闲聊天实例 - """ - if stream_id not in self._idle_chats: - idle_chat = IdleChat(stream_id, private_name) - self._idle_chats[stream_id] = idle_chat - # 初始化活跃对话计数 - if stream_id not in self._active_conversations_count: - self._active_conversations_count[stream_id] = 0 - idle_chat.start() # 启动空闲检测 - logger.info(f"[私聊][{private_name}]创建并启动新的空闲聊天实例") - return self._idle_chats[stream_id] - - async def remove_idle_chat(self, stream_id: str) -> None: - """移除空闲聊天实例 - - Args: - stream_id: 聊天流ID - """ - if stream_id in self._idle_chats: - idle_chat = self._idle_chats[stream_id] - idle_chat.stop() # 停止空闲检测 - del self._idle_chats[stream_id] - if stream_id in self._active_conversations_count: - del self._active_conversations_count[stream_id] - logger.info(f"[私聊][{idle_chat.private_name}]移除空闲聊天实例") - - async def notify_conversation_start(self, stream_id: str) -> None: - """通知对话开始 - - Args: - stream_id: 聊天流ID - """ - try: - if stream_id not in self._idle_chats: - logger.warning(f"对话开始通知: {stream_id} 没有对应的IdleChat实例,将创建一个") - # 从stream_id尝试提取private_name - private_name = stream_id - if stream_id.startswith("private_"): - parts = stream_id.split("_") - if len(parts) >= 2: - private_name = parts[1] # 取第二部分作为名称 - await self.get_or_create_idle_chat(stream_id, private_name) - - if stream_id not in self._active_conversations_count: - self._active_conversations_count[stream_id] = 0 - - # 增加计数前记录当前值,用于日志 - old_count = self._active_conversations_count[stream_id] - self._active_conversations_count[stream_id] += 1 - new_count = self._active_conversations_count[stream_id] - - # 确保IdleChat实例存在 - idle_chat = self._idle_chats.get(stream_id) - if idle_chat: - await idle_chat.increment_active_instances() - logger.debug(f"对话开始通知: {stream_id}, 计数从{old_count}增加到{new_count}") - else: - logger.error(f"对话开始通知: {stream_id}, 计数增加但IdleChat不存在! 计数:{old_count}->{new_count}") - except Exception as e: - logger.error(f"对话开始通知处理失败: {stream_id}, 错误: {e}") - logger.error(traceback.format_exc()) - - async def notify_conversation_end(self, stream_id: str) -> None: - """通知对话结束 - - Args: - stream_id: 聊天流ID - """ - try: - # 记录当前计数用于日志 - old_count = self._active_conversations_count.get(stream_id, 0) - - # 安全减少计数,避免负数 - if stream_id in self._active_conversations_count and self._active_conversations_count[stream_id] > 0: - self._active_conversations_count[stream_id] -= 1 - else: - # 如果计数已经为0或不存在,设置为0 - self._active_conversations_count[stream_id] = 0 - - new_count = self._active_conversations_count.get(stream_id, 0) - - # 确保IdleChat实例存在 - idle_chat = self._idle_chats.get(stream_id) - if idle_chat: - await idle_chat.decrement_active_instances() - logger.debug(f"对话结束通知: {stream_id}, 计数从{old_count}减少到{new_count}") - else: - logger.warning(f"对话结束通知: {stream_id}, 计数减少但IdleChat不存在! 计数:{old_count}->{new_count}") - - # 检查是否所有对话都结束了,帮助调试 - all_counts = sum(self._active_conversations_count.values()) - if all_counts == 0: - logger.info("所有对话实例都已结束,当前总活跃计数为0") - except Exception as e: - logger.error(f"对话结束通知处理失败: {stream_id}, 错误: {e}") - logger.error(traceback.format_exc()) - - def get_idle_chat(self, stream_id: str) -> Optional[IdleChat]: - """获取空闲聊天实例 - - Args: - stream_id: 聊天流ID - - Returns: - Optional[IdleChat]: 空闲聊天实例,如果不存在则返回None - """ - return self._idle_chats.get(stream_id) - - def get_active_conversations_count(self, stream_id: str) -> int: - """获取指定流的活跃对话计数 - - Args: - stream_id: 聊天流ID - - Returns: - int: 活跃对话计数 - """ - return self._active_conversations_count.get(stream_id, 0) - - def get_all_active_conversations_count(self) -> int: - """获取所有活跃对话总计数 - - Returns: - int: 活跃对话总计数 - """ - return sum(self._active_conversations_count.values()) diff --git a/src/plugins/PFC/PFC_idle/idle_conversation.py b/src/plugins/PFC/PFC_idle/idle_conversation.py deleted file mode 100644 index 90036d13..00000000 --- a/src/plugins/PFC/PFC_idle/idle_conversation.py +++ /dev/null @@ -1,513 +0,0 @@ -import traceback -import asyncio -from typing import Optional, Dict -from src.common.logger_manager import get_logger -import time - -logger = get_logger("pfc_idle_conversation") - - -class IdleConversation: - """ - 处理Idle聊天相关的功能,将这些功能从主Conversation类中分离出来, - 以减少代码量并方便维护。 - """ - - def __init__(self): - """初始化IdleConversation实例""" - self._idle_chat_manager = None - self._running = False - self._active_streams: Dict[str, bool] = {} # 跟踪活跃的流 - self._monitor_task = None # 用于后台监控的任务 - self._lock = asyncio.Lock() # 用于线程安全操作 - self._initialization_in_progress = False # 防止并发初始化 - - async def initialize(self): - """初始化Idle聊天管理器""" - # 防止并发初始化 - if self._initialization_in_progress: - logger.debug("IdleConversation正在初始化中,等待完成") - return False - - if self._idle_chat_manager is not None: - logger.debug("IdleConversation已初始化,无需重复操作") - return True - - # 标记开始初始化 - self._initialization_in_progress = True - - try: - # 从PFCManager获取IdleChatManager实例 - from ..pfc_manager import PFCManager - - pfc_manager = PFCManager.get_instance() - self._idle_chat_manager = pfc_manager.get_idle_chat_manager() - logger.debug("IdleConversation初始化完成,已获取IdleChatManager实例") - return True - except Exception as e: - logger.error(f"初始化IdleConversation时出错: {e}") - logger.error(traceback.format_exc()) - return False - finally: - # 无论成功或失败,都清除初始化标志 - self._initialization_in_progress = False - - async def start(self): - """启动IdleConversation,创建后台监控任务""" - if self._running: - logger.debug("IdleConversation已经在运行") - return False - - if not self._idle_chat_manager: - success = await self.initialize() - if not success: - logger.error("无法启动IdleConversation:初始化失败") - return False - - try: - self._running = True - # 创建后台监控任务,使用try-except块来捕获可能的异常 - try: - loop = asyncio.get_running_loop() - if loop.is_running(): - self._monitor_task = asyncio.create_task(self._monitor_loop()) - logger.info("IdleConversation启动成功,后台监控任务已创建") - else: - logger.warning("事件循环不活跃,跳过监控任务创建") - except RuntimeError: - # 如果没有活跃的事件循环,记录警告但继续执行 - logger.warning("没有活跃的事件循环,IdleConversation将不会启动监控任务") - # 尽管没有监控任务,但仍然将running设为True表示IdleConversation已启动 - - return True - except Exception as e: - self._running = False - logger.error(f"启动IdleConversation失败: {e}") - logger.error(traceback.format_exc()) - return False - - async def stop(self): - """停止IdleConversation的后台任务""" - if not self._running: - return - - self._running = False - if self._monitor_task and not self._monitor_task.done(): - try: - self._monitor_task.cancel() - try: - await asyncio.wait_for(self._monitor_task, timeout=2.0) - except asyncio.TimeoutError: - logger.warning("停止IdleConversation监控任务超时") - except asyncio.CancelledError: - pass # 正常取消 - except Exception as e: - logger.error(f"停止IdleConversation监控任务时出错: {e}") - logger.error(traceback.format_exc()) - - self._monitor_task = None - logger.info("IdleConversation已停止") - - async def _monitor_loop(self): - """后台监控循环,定期检查活跃的会话并执行必要的操作""" - try: - while self._running: - try: - # 同步活跃流计数到IdleChatManager - if self._idle_chat_manager: - await self._sync_active_streams_to_manager() - - # 这里可以添加定期检查逻辑,如查询空闲状态等 - active_count = len(self._active_streams) - logger.debug(f"IdleConversation监控中,当前活跃流数量: {active_count}") - - except Exception as e: - logger.error(f"IdleConversation监控循环出错: {e}") - logger.error(traceback.format_exc()) - - # 每30秒执行一次监控 - await asyncio.sleep(30) - except asyncio.CancelledError: - logger.info("IdleConversation监控任务已取消") - except Exception as e: - logger.error(f"IdleConversation监控任务异常退出: {e}") - logger.error(traceback.format_exc()) - self._running = False - - async def _sync_active_streams_to_manager(self): - """同步活跃流计数到IdleChatManager和IdleChat""" - try: - if not self._idle_chat_manager: - return - - # 获取当前的活跃流列表 - async with self._lock: - active_streams = list(self._active_streams.keys()) - - # 对每个活跃流,确保IdleChatManager和IdleChat中的计数是正确的 - for stream_id in active_streams: - # 获取当前IdleChatManager中的计数 - manager_count = self._idle_chat_manager.get_active_conversations_count(stream_id) - - # 由于我们的活跃流字典只记录是否活跃(值为True),所以计数应该是1 - if manager_count != 1: - # 修正IdleChatManager中的计数 - old_count = manager_count - self._idle_chat_manager._active_conversations_count[stream_id] = 1 - logger.warning(f"同步调整IdleChatManager中的计数: stream_id={stream_id}, {old_count}->1") - - # 同时修正IdleChat中的计数 - idle_chat = self._idle_chat_manager.get_idle_chat(stream_id) - if idle_chat: - if getattr(idle_chat, "active_instances_count", 0) != 1: - old_count = getattr(idle_chat, "active_instances_count", 0) - idle_chat.active_instances_count = 1 - logger.warning(f"同步调整IdleChat中的计数: stream_id={stream_id}, {old_count}->1") - - # 检查IdleChatManager中有没有多余的计数(conversation中已不存在但manager中还有) - for stream_id, count in list(self._idle_chat_manager._active_conversations_count.items()): - if count > 0 and stream_id not in active_streams: - # 重置为0 - self._idle_chat_manager._active_conversations_count[stream_id] = 0 - logger.warning(f"重置IdleChatManager中的多余计数: stream_id={stream_id}, {count}->0") - - # 同时修正IdleChat中的计数 - idle_chat = self._idle_chat_manager.get_idle_chat(stream_id) - if idle_chat and getattr(idle_chat, "active_instances_count", 0) > 0: - old_count = getattr(idle_chat, "active_instances_count", 0) - idle_chat.active_instances_count = 0 - logger.warning(f"同步重置IdleChat中的计数: stream_id={stream_id}, {old_count}->0") - - # 日志记录同步结果 - total_active = len(active_streams) - total_manager = sum(self._idle_chat_manager._active_conversations_count.values()) - logger.debug(f"同步后的计数: IdleConversation活跃流={total_active}, IdleChatManager总计数={total_manager}") - - except Exception as e: - logger.error(f"同步活跃流计数失败: {e}") - logger.error(traceback.format_exc()) - - async def get_or_create_idle_chat(self, stream_id: str, private_name: str): - """ - 获取或创建IdleChat实例 - - Args: - stream_id: 聊天流ID - private_name: 私聊对象名称,用于日志 - - Returns: - bool: 操作是否成功 - """ - # 确保IdleConversation已启动 - if not self._running: - await self.start() - - if not self._idle_chat_manager: - # 如果尚未初始化,尝试初始化 - success = await self.initialize() - if not success: - logger.warning(f"[私聊][{private_name}] 获取或创建IdleChat失败:IdleChatManager未初始化") - return False - - try: - # 创建IdleChat实例 - _idle_chat = await self._idle_chat_manager.get_or_create_idle_chat(stream_id, private_name) - logger.debug(f"[私聊][{private_name}] 已创建或获取IdleChat实例") - return True - except Exception as e: - logger.warning(f"[私聊][{private_name}] 创建或获取IdleChat实例失败: {e}") - logger.warning(traceback.format_exc()) - return False - - async def notify_conversation_start(self, stream_id: str, private_name: str) -> bool: - """ - 通知空闲聊天管理器对话开始 - - Args: - stream_id: 聊天流ID - private_name: 私聊对象名称,用于日志 - - Returns: - bool: 通知是否成功 - """ - try: - # 确保IdleConversation已启动 - if not self._running: - success = await self.start() - if not success: - logger.warning(f"[私聊][{private_name}] 启动IdleConversation失败,无法通知对话开始") - return False - - if not self._idle_chat_manager: - # 如果尚未初始化,尝试初始化 - success = await self.initialize() - if not success: - logger.warning(f"[私聊][{private_name}] 通知对话开始失败:IdleChatManager未初始化") - return False - - try: - # 确保IdleChat实例已创建 - 这是关键步骤,要先创建IdleChat - await self.get_or_create_idle_chat(stream_id, private_name) - - # 先记录活跃状态 - 这是权威源 - async with self._lock: - self._active_streams[stream_id] = True - - # 然后同步到IdleChatManager - if self._idle_chat_manager: - await self._idle_chat_manager.notify_conversation_start(stream_id) - logger.info(f"[私聊][{private_name}] 已通知空闲聊天管理器对话开始") - else: - logger.warning(f"[私聊][{private_name}] IdleChatManager不存在,但已记录活跃状态") - - # 立即进行一次同步,确保数据一致性 - await self._sync_active_streams_to_manager() - - return True - except Exception as e: - logger.warning(f"[私聊][{private_name}] 通知空闲聊天管理器对话开始失败: {e}") - logger.warning(traceback.format_exc()) - # 即使通知失败,也应记录活跃状态 - async with self._lock: - self._active_streams[stream_id] = True - return False - except Exception as outer_e: - logger.error(f"[私聊][{private_name}] 处理对话开始通知时发生严重错误: {outer_e}") - logger.error(traceback.format_exc()) - return False - - async def notify_conversation_end(self, stream_id: str, private_name: str) -> bool: - """ - 通知空闲聊天管理器对话结束 - - Args: - stream_id: 聊天流ID - private_name: 私聊对象名称,用于日志 - - Returns: - bool: 通知是否成功 - """ - try: - # 先从自身的活跃流中移除 - 这是权威源 - was_active = False - async with self._lock: - if stream_id in self._active_streams: - del self._active_streams[stream_id] - was_active = True - logger.debug(f"[私聊][{private_name}] 已从活跃流中移除 {stream_id}") - - if not self._idle_chat_manager: - # 如果尚未初始化,尝试初始化 - success = await self.initialize() - if not success: - logger.warning(f"[私聊][{private_name}] 通知对话结束失败:IdleChatManager未初始化") - return False - - try: - # 然后同步到IdleChatManager - if self._idle_chat_manager: - # 无论如何都尝试通知 - await self._idle_chat_manager.notify_conversation_end(stream_id) - - # 立即进行一次同步,确保数据一致性 - await self._sync_active_streams_to_manager() - - logger.info(f"[私聊][{private_name}] 已通知空闲聊天管理器对话结束") - - # 检查当前活跃流数量 - active_count = len(self._active_streams) - if active_count == 0: - logger.info(f"[私聊][{private_name}] 当前无活跃流,可能会触发主动聊天") - - # 额外调用:如果实例存在且只有在确实移除了活跃流的情况下才触发检查 - if was_active: - idle_chat = self._idle_chat_manager.get_idle_chat(stream_id) - if idle_chat: - # 直接触发IdleChat检查,而不是等待下一个循环 - logger.info(f"[私聊][{private_name}] 对话结束,手动触发一次主动聊天检查") - asyncio.create_task(self._trigger_idle_chat_check(idle_chat, stream_id, private_name)) - - return True - else: - logger.warning(f"[私聊][{private_name}] IdleChatManager不存在,但已更新活跃状态") - return False - except Exception as e: - logger.warning(f"[私聊][{private_name}] 通知空闲聊天管理器对话结束失败: {e}") - logger.warning(traceback.format_exc()) - return False - except Exception as outer_e: - logger.error(f"[私聊][{private_name}] 处理对话结束通知时发生严重错误: {outer_e}") - logger.error(traceback.format_exc()) - return False - - async def _trigger_idle_chat_check(self, idle_chat, stream_id: str, private_name: str): - """在对话结束后,手动触发一次IdleChat的检查""" - try: - # 确保活跃计数与IdleConversation一致 - async with self._lock: - is_active_in_conversation = stream_id in self._active_streams - - # 强制使IdleChat的计数与IdleConversation一致 - if is_active_in_conversation: - # 如果在IdleConversation中是活跃的,IdleChat的计数应该是1 - if idle_chat.active_instances_count != 1: - old_count = idle_chat.active_instances_count - idle_chat.active_instances_count = 1 - logger.warning(f"[私聊][{private_name}] 修正IdleChat计数: {old_count}->1") - else: - # 如果在IdleConversation中不是活跃的,IdleChat的计数应该是0 - if idle_chat.active_instances_count != 0: - old_count = idle_chat.active_instances_count - idle_chat.active_instances_count = 0 - logger.warning(f"[私聊][{private_name}] 修正IdleChat计数: {old_count}->0") - - # 等待1秒,让任何正在进行的处理完成 - await asyncio.sleep(1) - - # 只有当stream不再活跃时才触发检查 - if not is_active_in_conversation: - # 尝试触发一次检查 - if hasattr(idle_chat, "_should_trigger"): - should_trigger = await idle_chat._should_trigger() - logger.info(f"[私聊][{private_name}] 手动触发主动聊天检查结果: {should_trigger}") - - # 如果应该触发,直接调用_initiate_chat - if should_trigger and hasattr(idle_chat, "_initiate_chat"): - logger.info(f"[私聊][{private_name}] 手动触发主动聊天") - await idle_chat._initiate_chat() - # 更新最后触发时间 - idle_chat.last_trigger_time = time.time() - else: - logger.warning(f"[私聊][{private_name}] IdleChat没有_should_trigger方法,无法触发检查") - except Exception as e: - logger.error(f"[私聊][{private_name}] 手动触发主动聊天检查时出错: {e}") - logger.error(traceback.format_exc()) - - def is_stream_active(self, stream_id: str) -> bool: - """检查指定的stream是否活跃""" - return stream_id in self._active_streams - - def get_active_streams_count(self) -> int: - """获取当前活跃的stream数量""" - return len(self._active_streams) - - @property - def is_running(self) -> bool: - """检查IdleConversation是否正在运行""" - return self._running - - @property - def idle_chat_manager(self): - """获取IdleChatManager实例""" - return self._idle_chat_manager - - -# 创建单例实例 -_instance: Optional[IdleConversation] = None -_instance_lock = asyncio.Lock() -_initialization_in_progress = False # 防止并发初始化 - - -async def initialize_idle_conversation() -> IdleConversation: - """初始化并启动IdleConversation单例实例""" - global _initialization_in_progress - - # 防止并发初始化 - if _initialization_in_progress: - logger.debug("IdleConversation全局初始化正在进行中,等待完成") - return get_idle_conversation_instance() - - # 标记正在初始化 - _initialization_in_progress = True - - try: - instance = get_idle_conversation_instance() - - # 如果实例已经在运行,避免重复初始化 - if getattr(instance, "_running", False): - logger.debug("IdleConversation已在运行状态,无需重新初始化") - _initialization_in_progress = False - return instance - - # 初始化实例 - success = await instance.initialize() - if not success: - logger.error("IdleConversation初始化失败") - _initialization_in_progress = False - return instance - - # 启动实例 - success = await instance.start() - if not success: - logger.error("IdleConversation启动失败") - else: - # 启动成功,进行初始检查 - logger.info("IdleConversation启动成功,执行初始化后检查") - # 这里可以添加一些启动后的检查,如果需要 - - # 创建一个异步任务,定期检查系统状态 - asyncio.create_task(periodic_system_check(instance)) - - return instance - except Exception as e: - logger.error(f"初始化并启动IdleConversation时出错: {e}") - logger.error(traceback.format_exc()) - # 重置标志,允许下次再试 - _initialization_in_progress = False - return get_idle_conversation_instance() # 返回实例,即使初始化失败 - finally: - # 清除初始化标志 - _initialization_in_progress = False - - -async def periodic_system_check(instance: IdleConversation): - """定期检查系统状态,确保主动聊天功能正常工作""" - try: - # 等待10秒,让系统完全启动 - await asyncio.sleep(10) - - while getattr(instance, "_running", False): - try: - # 检查活跃流数量 - active_streams_count = len(getattr(instance, "_active_streams", {})) - - # 如果IdleChatManager存在,检查其中的活跃对话计数 - idle_chat_manager = getattr(instance, "_idle_chat_manager", None) - if idle_chat_manager and hasattr(idle_chat_manager, "get_all_active_conversations_count"): - manager_count = idle_chat_manager.get_all_active_conversations_count() - - # 如果两者不一致,记录警告 - if active_streams_count != manager_count: - logger.warning( - f"检测到计数不一致: IdleConversation记录的活跃流数量({active_streams_count}) 与 IdleChatManager记录的活跃对话数({manager_count})不匹配" - ) - - # 如果IdleChatManager记录的计数为0但自己的记录不为0,进行修正 - if manager_count == 0 and active_streams_count > 0: - logger.warning("检测到可能的计数错误,尝试修正:清空IdleConversation的活跃流记录") - async with instance._lock: - instance._active_streams.clear() - - # 检查计数如果为0,帮助日志输出 - if active_streams_count == 0: - logger.debug("当前没有活跃的对话流,应该可以触发主动聊天") - - except Exception as check_err: - logger.error(f"执行系统检查时出错: {check_err}") - logger.error(traceback.format_exc()) - - # 每60秒检查一次 - await asyncio.sleep(60) - except asyncio.CancelledError: - logger.debug("系统检查任务被取消") - except Exception as e: - logger.error(f"系统检查任务异常退出: {e}") - logger.error(traceback.format_exc()) - - -def get_idle_conversation_instance() -> IdleConversation: - """获取IdleConversation的单例实例""" - global _instance - if _instance is None: - _instance = IdleConversation() - return _instance diff --git a/src/plugins/PFC/PFC_idle/idle_conversation_starter.py b/src/plugins/PFC/PFC_idle/idle_conversation_starter.py deleted file mode 100644 index 83a58219..00000000 --- a/src/plugins/PFC/PFC_idle/idle_conversation_starter.py +++ /dev/null @@ -1,354 +0,0 @@ -import time -import asyncio -import random -import traceback -from typing import TYPE_CHECKING, Optional - -from src.common.logger_manager import get_logger -from src.plugins.models.utils_model import LLMRequest -from src.config.config import global_config -from src.plugins.chat.chat_stream import chat_manager, ChatStream -from src.individuality.individuality import Individuality -from src.plugins.utils.chat_message_builder import build_readable_messages -from maim_message import UserInfo - -from ..chat_observer import ChatObserver -from ..message_sender import DirectMessageSender - -# 导入富文本回溯,用于更好的错误展示 -from rich.traceback import install - -# 使用TYPE_CHECKING避免循环导入 -if TYPE_CHECKING: - from ..conversation import Conversation - -install(extra_lines=3) - -# 获取当前模块的日志记录器 -logger = get_logger("idle_conversation_starter") - - -class IdleConversationStarter: - """长时间无对话主动发起对话的组件 - - 该组件会在一段时间没有对话后,自动生成一条消息发送给用户,以保持对话的活跃度。 - 时间阈值会在配置的最小和最大值之间随机选择,每次发送消息后都会重置。 - """ - - def __init__(self, stream_id: str, private_name: str): - """初始化空闲对话启动器 - - Args: - stream_id: 聊天流ID - private_name: 私聊用户名称 - """ - self.stream_id: str = stream_id - self.private_name: str = private_name - self.chat_observer = ChatObserver.get_instance(stream_id, private_name) - self.message_sender = DirectMessageSender(private_name) - - # 添加异步锁,保护对共享变量的访问 - self._lock: asyncio.Lock = asyncio.Lock() - - # LLM请求对象,用于生成主动对话内容 - self.llm = LLMRequest( - model=global_config.llm_normal, temperature=0.8, max_tokens=500, request_type="idle_conversation_starter" - ) - - # 个性化信息 - self.personality_info: str = Individuality.get_instance().get_prompt(x_person=2, level=3) - - # 计算实际触发阈值(在min和max之间随机) - self.actual_idle_threshold: int = random.randint(global_config.min_idle_time, global_config.max_idle_time) - - # 工作状态 - self.last_message_time: float = time.time() - self._running: bool = False - self._task: Optional[asyncio.Task] = None - - def start(self) -> None: - """启动空闲对话检测 - - 如果功能被禁用或已经在运行,则不会启动。 - """ - # 如果功能被禁用,则不启动 - if not global_config.enable_idle_conversation: - logger.info(f"[私聊][{self.private_name}]主动发起对话功能已禁用") - return - - if self._running: - logger.debug(f"[私聊][{self.private_name}]主动发起对话功能已在运行中") - return - - self._running = True - self._task = asyncio.create_task(self._check_idle_loop()) - logger.info(f"[私聊][{self.private_name}]启动空闲对话检测,阈值设置为{self.actual_idle_threshold}秒") - - def stop(self) -> None: - """停止空闲对话检测 - - 取消当前运行的任务并重置状态。 - """ - if not self._running: - return - - self._running = False - if self._task: - self._task.cancel() - self._task = None - logger.info(f"[私聊][{self.private_name}]停止空闲对话检测") - - async def update_last_message_time(self, message_time: Optional[float] = None) -> None: - """更新最后一条消息的时间 - - Args: - message_time: 消息时间戳,如果为None则使用当前时间 - """ - async with self._lock: - self.last_message_time = message_time or time.time() - # 重新随机化下一次触发的时间阈值 - self.actual_idle_threshold = random.randint(global_config.min_idle_time, global_config.max_idle_time) - logger.debug( - f"[私聊][{self.private_name}]更新最后消息时间: {self.last_message_time},新阈值: {self.actual_idle_threshold}秒" - ) - - def reload_config(self) -> None: - """重新加载配置 - - 记录当前配置参数,用于日志输出 - """ - try: - logger.debug( - f"[私聊][{self.private_name}]重新加载主动对话配置: 启用={global_config.enable_idle_conversation}, 检查间隔={global_config.idle_check_interval}秒, 最短间隔={global_config.min_idle_time}秒, 最长间隔={global_config.max_idle_time}秒" - ) - - # 重新计算实际阈值 - async def update_threshold(): - async with self._lock: - self.actual_idle_threshold = random.randint( - global_config.min_idle_time, global_config.max_idle_time - ) - logger.debug(f"[私聊][{self.private_name}]更新空闲检测阈值为: {self.actual_idle_threshold}秒") - - # 创建一个任务来异步更新阈值 - asyncio.create_task(update_threshold()) - - except Exception as e: - logger.error(f"[私聊][{self.private_name}]重新加载配置时出错: {str(e)}") - logger.error(traceback.format_exc()) - - async def _check_idle_loop(self) -> None: - """检查空闲状态的循环 - - 定期检查是否长时间无对话,如果达到阈值则尝试主动发起对话。 - """ - try: - config_reload_counter = 0 - config_reload_interval = 100 # 每100次检查重新加载一次配置 - - while self._running: - # 定期重新加载配置 - config_reload_counter += 1 - if config_reload_counter >= config_reload_interval: - self.reload_config() - config_reload_counter = 0 - - # 检查是否启用了主动对话功能 - if not global_config.enable_idle_conversation: - # 如果禁用了功能,就等待一段时间后再次检查配置 - await asyncio.sleep(global_config.idle_check_interval) - continue - - # 使用锁保护对共享变量的读取 - current_time = time.time() - async with self._lock: - idle_time = current_time - self.last_message_time - threshold = self.actual_idle_threshold - - if idle_time >= threshold: - logger.info(f"[私聊][{self.private_name}]检测到长时间({idle_time:.0f}秒)无对话,尝试主动发起聊天") - await self._initiate_conversation() - # 更新时间,避免连续触发 - await self.update_last_message_time() - - # 等待下一次检查 - await asyncio.sleep(global_config.idle_check_interval) - - except asyncio.CancelledError: - logger.debug(f"[私聊][{self.private_name}]空闲对话检测任务被取消") - except Exception as e: - logger.error(f"[私聊][{self.private_name}]空闲对话检测出错: {str(e)}") - logger.error(traceback.format_exc()) - # 尝试重新启动检测循环 - if self._running: - logger.info(f"[私聊][{self.private_name}]尝试重新启动空闲对话检测") - self._task = asyncio.create_task(self._check_idle_loop()) - - async def _initiate_conversation(self) -> None: - """生成并发送主动对话内容 - - 获取聊天历史记录,使用LLM生成合适的开场白(大概),然后发送消息。 - """ - try: - # 获取聊天历史记录,用于生成更合适的开场白 - messages = self.chat_observer.get_cached_messages(limit=12) # 获取最近12条消息 - chat_history_text = await build_readable_messages( - messages, - replace_bot_name=True, - merge_messages=False, - timestamp_mode="relative", - read_mark=0.0, - ) - - # 构建提示词 - prompt = f"""{self.personality_info}。你的名字是{global_config.BOT_NICKNAME}。 - 你正在与用户{self.private_name}进行QQ私聊, - 但已经有一段时间没有对话了。 - 你想要主动发起一个友好的对话,可以说说自己在做的事情或者询问对方在做什么。 - 请基于以下之前的对话历史,生成一条自然、友好、符合你个性的主动对话消息。 - 这条消息应该能够引起用户的兴趣,重新开始对话。 - 最近的对话历史(可能已经过去了很久): - {chat_history_text} - 请直接输出一条消息,不要有任何额外的解释或引导文字。消息要简短自然,就像是在日常聊天中的开场白。 - 消息内容尽量简短,不要超过20个字,不要添加任何表情符号。 - """ - - # 尝试生成回复,添加超时处理 - try: - content, _ = await asyncio.wait_for( - self.llm.generate_response_async(prompt), - timeout=30, # 30秒超时 - ) - except asyncio.TimeoutError: - logger.error(f"[私聊][{self.private_name}]生成主动对话内容超时") - return - except Exception as llm_err: - logger.error(f"[私聊][{self.private_name}]生成主动对话内容失败: {str(llm_err)}") - logger.error(traceback.format_exc()) - return - - # 清理结果 - content = content.strip() - content = content.strip("\"'") - - if not content: - logger.error(f"[私聊][{self.private_name}]生成的主动对话内容为空") - return - - # 统一错误处理,从这里开始所有操作都在同一个try-except块中 - logger.debug(f"[私聊][{self.private_name}]成功生成主动对话内容: {content},准备发送") - - # 在函数内部导入PFCManager,避免循环导入 - from ..pfc_manager import PFCManager - - # 获取当前实例 - 注意这是同步方法,不需要await - pfc_manager = PFCManager.get_instance() - - # 结束当前对话实例(如果存在) - current_conversation = await pfc_manager.get_conversation(self.stream_id) - if current_conversation: - logger.info(f"[私聊][{self.private_name}]结束当前对话实例,准备创建新实例") - try: - await current_conversation.stop() - await pfc_manager.remove_conversation(self.stream_id) - except Exception as e: - logger.warning(f"[私聊][{self.private_name}]结束当前对话实例时出错: {str(e)},继续创建新实例") - logger.warning(traceback.format_exc()) - - # 创建新的对话实例 - logger.info(f"[私聊][{self.private_name}]创建新的对话实例以发送主动消息") - new_conversation = None - try: - new_conversation = await pfc_manager.get_or_create_conversation(self.stream_id, self.private_name) - except Exception as e: - logger.error(f"[私聊][{self.private_name}]创建新对话实例失败: {str(e)}") - logger.error(traceback.format_exc()) - return - - # 确保新对话实例已初始化完成 - chat_stream = await self._get_chat_stream(new_conversation) - if not chat_stream: - logger.error(f"[私聊][{self.private_name}]无法获取有效的聊天流,取消发送主动消息") - return - - # 发送消息 - try: - await self.message_sender.send_message(chat_stream=chat_stream, content=content, reply_to_message=None) - - # 更新空闲会话启动器的最后消息时间 - await self.update_last_message_time() - - # 如果新对话实例有一个聊天观察者,请触发更新 - if new_conversation and hasattr(new_conversation, "chat_observer"): - logger.info(f"[私聊][{self.private_name}]触发聊天观察者更新") - try: - new_conversation.chat_observer.trigger_update() - except Exception as e: - logger.warning(f"[私聊][{self.private_name}]触发聊天观察者更新失败: {str(e)}") - logger.warning(traceback.format_exc()) - - logger.info(f"[私聊][{self.private_name}]成功主动发起对话: {content}") - except Exception as e: - logger.error(f"[私聊][{self.private_name}]发送主动对话消息失败: {str(e)}") - logger.error(traceback.format_exc()) - - except Exception as e: - # 顶级异常处理,确保任何未捕获的异常都不会导致整个进程崩溃 - logger.error(f"[私聊][{self.private_name}]主动发起对话过程中发生未预期的错误: {str(e)}") - logger.error(traceback.format_exc()) - - async def _get_chat_stream(self, conversation: Optional["Conversation"] = None) -> Optional[ChatStream]: - """获取可用的聊天流 - - 尝试多种方式获取聊天流: - 1. 从传入的对话实例中获取 - 2. 从全局聊天管理器中获取 - 3. 创建一个新的聊天流 - - Args: - conversation: 对话实例,可以为None - - Returns: - Optional[ChatStream]: 如果成功获取则返回聊天流,否则返回None - """ - chat_stream = None - - # 1. 尝试从对话实例获取 - if conversation and hasattr(conversation, "should_continue"): - # 等待一小段时间,确保初始化完成 - retry_count = 0 - max_retries = 10 - while not conversation.should_continue and retry_count < max_retries: - await asyncio.sleep(0.5) - retry_count += 1 - logger.debug(f"[私聊][{self.private_name}]等待新对话实例初始化完成: 尝试 {retry_count}/{max_retries}") - - if not conversation.should_continue: - logger.warning(f"[私聊][{self.private_name}]新对话实例初始化可能未完成,但仍将尝试获取聊天流") - - # 尝试使用对话实例的聊天流 - if hasattr(conversation, "chat_stream") and conversation.chat_stream: - logger.info(f"[私聊][{self.private_name}]使用新对话实例的聊天流") - return conversation.chat_stream - - # 2. 尝试从聊天管理器获取 - try: - logger.info(f"[私聊][{self.private_name}]尝试从chat_manager获取聊天流") - chat_stream = chat_manager.get_stream(self.stream_id) - if chat_stream: - return chat_stream - except Exception as e: - logger.warning(f"[私聊][{self.private_name}]从chat_manager获取聊天流失败: {str(e)}") - logger.warning(traceback.format_exc()) - - # 3. 创建新的聊天流 - try: - logger.warning(f"[私聊][{self.private_name}]无法获取现有聊天流,创建新的聊天流") - # 创建用户信息对象 - user_info = UserInfo(user_id=global_config.BOT_QQ, user_nickname=global_config.BOT_NICKNAME, platform="qq") - # 创建聊天流 - return ChatStream(self.stream_id, "qq", user_info) - except Exception as e: - logger.error(f"[私聊][{self.private_name}]创建新聊天流失败: {str(e)}") - logger.error(traceback.format_exc()) - return None diff --git a/src/plugins/PFC/actions.py b/src/plugins/PFC/actions.py index 8d8a9e0f..4d9986ae 100644 --- a/src/plugins/PFC/actions.py +++ b/src/plugins/PFC/actions.py @@ -361,7 +361,7 @@ async def handle_action( observation_info.chat_history_str = "[构建聊天记录出错]" # --- 新增结束 --- - # 更新 idle_conversation_starter 的最后消息时间 + # 更新 idle_chat 的最后消息时间 # (避免在发送消息后很快触发主动聊天) if conversation_instance.idle_chat: await conversation_instance.idle_chat.update_last_message_time(send_end_time) @@ -506,7 +506,7 @@ async def handle_action( action_successful = True # 标记成功 # final_status 和 final_reason 会在 finally 中设置 logger.info(f"[私聊][{conversation_instance.private_name}] 成功发送告别语,即将停止对话实例。") - # 更新 idle_conversation_starter 的最后消息时间 + # 更新 idle_chat 的最后消息时间 # (避免在发送消息后很快触发主动聊天) if conversation_instance.idle_chat: await conversation_instance.idle_chat.update_last_message_time(send_end_time)