diff --git a/.gitignore b/.gitignore index 9e1b9681..67565521 100644 --- a/.gitignore +++ b/.gitignore @@ -300,3 +300,5 @@ $RECYCLE.BIN/ # Windows shortcuts *.lnk +__pycache__/ +*.pyc diff --git a/src/plugins/PFC/PFC_idle/__init__.py b/src/plugins/PFC/PFC_idle/__init__.py new file mode 100644 index 00000000..a06c1834 --- /dev/null +++ b/src/plugins/PFC/PFC_idle/__init__.py @@ -0,0 +1,20 @@ +""" +PFC_idle 包 - 用于空闲时主动聊天的功能模块 + +该包包含以下主要组件: +- IdleChat: 根据关系和活跃度进行智能主动聊天 +- IdleChatManager: 管理多个聊天实例的空闲状态 +- IdleConversation: 处理与空闲聊天相关的功能,与主Conversation类解耦 +""" + +from .idle_chat import IdleChat +from .idle_chat_manager import IdleChatManager +from .idle_conversation import IdleConversation, get_idle_conversation_instance, initialize_idle_conversation + +__all__ = [ + 'IdleChat', + 'IdleChatManager', + 'IdleConversation', + 'get_idle_conversation_instance', + 'initialize_idle_conversation' +] \ No newline at end of file diff --git a/src/plugins/PFC/PFC_idle/idle_chat.py b/src/plugins/PFC/PFC_idle/idle_chat.py new file mode 100644 index 00000000..38c9e772 --- /dev/null +++ b/src/plugins/PFC/PFC_idle/idle_chat.py @@ -0,0 +1,550 @@ +from typing import Optional, Dict, List, Set +import asyncio +import time +import random +import traceback +from datetime import datetime +from src.common.logger_manager import get_logger +from src.config.config import global_config +from src.plugins.models.utils_model import LLMRequest +from src.plugins.utils.prompt_builder import Prompt, global_prompt_manager +from src.plugins.person_info.person_info import person_info_manager +from src.plugins.utils.chat_message_builder import build_readable_messages +from ...schedule.schedule_generator import bot_schedule +from ....config.config import global_config +from ..chat_observer import ChatObserver +from ..message_sender import DirectMessageSender +from src.plugins.chat.chat_stream import ChatStream +from maim_message import UserInfo +from ..pfc_relationship import PfcRepationshipTranslator +from rich.traceback import install + +install(extra_lines=3) + +logger = get_logger("pfc_idle_chat") + +class IdleChat: + """主动聊天组件(测试中) + + 在以下条件都满足时触发主动聊天: + 1. 当前没有任何活跃的对话实例 + 2. 在指定的活动时间内(7:00-23:00) + 3. 根据关系值动态调整触发概率 + 4. 上次触发后已经过了足够的冷却时间 + """ + + # 单例模式实现 + _instances: Dict[str, 'IdleChat'] = {} + + # 全局共享状态,用于跟踪未回复的用户 + _pending_replies: Dict[str, float] = {} # 用户名 -> 发送时间 + _tried_users: Set[str] = set() # 已尝试过的用户集合 + _global_lock = asyncio.Lock() # 保护共享状态的全局锁 + + @classmethod + def get_instance(cls, stream_id: str, private_name: str) -> 'IdleChat': + """获取IdleChat实例(单例模式) + + Args: + stream_id: 聊天流ID + private_name: 私聊用户名称 + + Returns: + IdleChat: IdleChat实例 + """ + key = f"{private_name}:{stream_id}" + if key not in cls._instances: + cls._instances[key] = cls(stream_id, private_name) + # 创建实例时自动启动检测 + cls._instances[key].start() + logger.info(f"[私聊][{private_name}]创建新的IdleChat实例并启动") + return cls._instances[key] + + @classmethod + async def register_user_response(cls, private_name: str) -> None: + """注册用户已回复 + + 当用户回复消息时调用此方法,将用户从待回复列表中移除 + + Args: + private_name: 私聊用户名称 + """ + async with cls._global_lock: + if private_name in cls._pending_replies: + del cls._pending_replies[private_name] + logger.info(f"[私聊][{private_name}]已回复主动聊天消息,从待回复列表中移除") + + @classmethod + async def get_next_available_user(cls) -> Optional[str]: + """获取下一个可用于主动聊天的用户 + + 优先选择未尝试过的用户,其次是已尝试但超时未回复的用户 + + Returns: + Optional[str]: 下一个可用的用户名,如果没有则返回None + """ + async with cls._global_lock: + current_time = time.time() + timeout_threshold = 7200 # 2小时未回复视为超时 + + # 清理超时未回复的用户 + for user, send_time in list(cls._pending_replies.items()): + if current_time - send_time > timeout_threshold: + logger.info(f"[私聊][{user}]超过{timeout_threshold}秒未回复,标记为超时") + del cls._pending_replies[user] + + # 获取所有实例中的用户 + all_users = set() + for key in cls._instances: + user = key.split(':', 1)[0] + all_users.add(user) + + # 优先选择未尝试过的用户 + untried_users = all_users - cls._tried_users + if untried_users: + next_user = random.choice(list(untried_users)) + cls._tried_users.add(next_user) + return next_user + + # 如果所有用户都已尝试过,重置尝试集合,从头开始 + if len(cls._tried_users) >= len(all_users): + cls._tried_users.clear() + logger.info(f"[私聊]所有用户都已尝试过,重置尝试列表") + # 随机选择一个不在待回复列表中的用户 + available_users = all_users - set(cls._pending_replies.keys()) + if available_users: + next_user = random.choice(list(available_users)) + cls._tried_users.add(next_user) + return next_user + + return None + + def __init__(self, stream_id: str, private_name: str): + """初始化主动聊天组件 + + Args: + stream_id: 聊天流ID + private_name: 私聊用户名称 + """ + self.stream_id = stream_id + self.private_name = private_name + self.chat_observer = ChatObserver.get_instance(stream_id, private_name) + self.message_sender = DirectMessageSender(private_name) + + # 添加异步锁,保护对共享变量的访问 + self._lock: asyncio.Lock = asyncio.Lock() + + # LLM请求对象,用于生成主动对话内容 + self.llm = LLMRequest( + model=global_config.llm_normal, + temperature=0.5, + max_tokens=500, + request_type="idle_chat" + ) + + # 工作状态 + self.active_instances_count: int = 0 + self.last_trigger_time: float = time.time() - 1500 # 初始化时减少等待时间 + self._running: bool = False + self._task: Optional[asyncio.Task] = None + + # 配置参数 - 从global_config加载 + self.min_cooldown = getattr(global_config,"MIN_IDLE_TIME", 7200) # 最短冷却时间(默认2小时)建议修改长一点,你也不希望你的bot一直骚扰你吧 + self.max_cooldown = getattr(global_config, "MAX_IDLE_TIME", 14400) # 最长冷却时间(默认4小时) + self.min_idle_time = getattr(global_config, "MIN_IDLE_TIME", 3600) + self.check_interval = getattr(global_config, "IDLE_CHECK_INTERVAL", 600) # 检查间隔(默认10分钟) + self.active_hours_start = 6 # 活动开始时间 + self.active_hours_end = 24 # 活动结束时间 + + # 关系值相关 + self.base_trigger_probability = 0.3 # 基础触发概率 + self.relationship_factor = 0.0003 # 关系值影响因子 + + def start(self) -> None: + """启动主动聊天检测""" + # 检查是否启用了主动聊天功能 + if not getattr(global_config, "ENABLE_IDLE_CONVERSATION", False): + logger.info(f"[私聊][{self.private_name}]主动聊天功能已禁用(配置ENABLE_IDLE_CONVERSATION=False)") + return + + if self._running: + logger.debug(f"[私聊][{self.private_name}]主动聊天功能已在运行中") + return + + self._running = True + self._task = asyncio.create_task(self._check_idle_loop()) + logger.info(f"[私聊][{self.private_name}]启动主动聊天检测") + + def stop(self) -> None: + """停止主动聊天检测 + """ + if not self._running: + return + + self._running = False + if self._task: + self._task.cancel() + self._task = None + logger.info(f"[私聊][{self.private_name}]停止主动聊天检测") + + async def increment_active_instances(self) -> None: + """增加活跃实例计数 + + 当创建新的对话实例时调用此方法 + """ + async with self._lock: + self.active_instances_count += 1 + logger.debug(f"[私聊][{self.private_name}]活跃实例数+1,当前:{self.active_instances_count}") + + async def decrement_active_instances(self) -> None: + """减少活跃实例计数 + + 当对话实例结束时调用此方法 + """ + async with self._lock: + self.active_instances_count = max(0, self.active_instances_count - 1) + logger.debug(f"[私聊][{self.private_name}]活跃实例数-1,当前:{self.active_instances_count}") + + async def update_last_message_time(self, message_time: Optional[float] = None) -> None: + """更新最后一条消息的时间 + + Args: + message_time: 消息时间戳,如果为None则使用当前时间 + """ + async with self._lock: + self.last_trigger_time = message_time or time.time() + logger.debug(f"[私聊][{self.private_name}]更新最后消息时间: {self.last_trigger_time:.2f}") + + # 当用户发送消息时,也应该注册响应 + await self.__class__.register_user_response(self.private_name) + + def _is_active_hours(self) -> bool: + """检查是否在活动时间内""" + current_hour = datetime.now().hour + return self.active_hours_start <= current_hour < self.active_hours_end + + async def _should_trigger(self) -> bool: + """检查是否应该触发主动聊天""" + async with self._lock: + # 确保计数不会出错,重置为0如果发现是负数 + if self.active_instances_count < 0: + logger.warning(f"[私聊][{self.private_name}]检测到活跃实例数为负数,重置为0") + self.active_instances_count = 0 + + # 检查是否有活跃实例 + if self.active_instances_count > 0: + logger.debug(f"[私聊][{self.private_name}]存在活跃实例({self.active_instances_count}),不触发主动聊天") + return False + + # 检查是否在活动时间内 + if not self._is_active_hours(): + logger.debug(f"[私聊][{self.private_name}]不在活动时间内,不触发主动聊天") + return False + + # 检查冷却时间 + current_time = time.time() + time_since_last_trigger = current_time - self.last_trigger_time + if time_since_last_trigger < self.min_cooldown: + time_left = self.min_cooldown - time_since_last_trigger + logger.debug(f"[私聊][{self.private_name}]冷却时间未到(已过{time_since_last_trigger:.0f}秒/需要{self.min_cooldown}秒),还需等待{time_left:.0f}秒,不触发主动聊天") + return False + + # 强制触发检查 - 如果超过最大冷却时间,增加触发概率 + force_trigger = False + if time_since_last_trigger > self.max_cooldown * 2: # 如果超过最大冷却时间的两倍 + force_probability = min(0.6, self.base_trigger_probability * 2) # 增加概率但不超过0.6 + random_force = random.random() + force_trigger = random_force < force_probability + if force_trigger: + logger.info(f"[私聊][{self.private_name}]超过最大冷却时间({time_since_last_trigger:.0f}秒),强制触发主动聊天") + return True + + # 获取关系值 + relationship_value = 0 + try: + # 导入relationship_manager以使用ensure_float方法 + from src.plugins.person_info.relationship_manager import relationship_manager + + # 尝试获取person_id + person_id = None + try: + # 先尝试通过昵称获取person_id + platform = "qq" # 默认平台 + person_id = person_info_manager.get_person_id(platform, self.private_name) + + # 如果通过昵称获取失败,尝试通过stream_id解析 + if not person_id: + parts = self.stream_id.split('_') + if len(parts) >= 2 and parts[0] == "private": + user_id = parts[1] + platform = parts[2] if len(parts) >= 3 else "qq" + try: + person_id = person_info_manager.get_person_id(platform, int(user_id)) + except ValueError: + # 如果user_id不是整数,尝试作为字符串使用 + person_id = person_info_manager.get_person_id(platform, user_id) + except Exception as e2: + logger.warning(f"[私聊][{self.private_name}]尝试获取person_id失败: {str(e2)}") + + # 获取关系值 + if person_id: + raw_value = await person_info_manager.get_value(person_id, "relationship_value") + relationship_value = relationship_manager.ensure_float(raw_value, person_id) + logger.debug(f"[私聊][{self.private_name}]成功获取关系值: {relationship_value}") + else: + logger.warning(f"[私聊][{self.private_name}]无法获取person_id,使用默认关系值0") + + # 使用PfcRepationshipTranslator获取关系描述 + relationship_translator = PfcRepationshipTranslator(self.private_name) + relationship_level = relationship_translator._calculate_relationship_level_num(relationship_value, self.private_name) + + # 基于关系等级调整触发概率 + # 关系越好,主动聊天概率越高 + level_probability_factors = [0.05, 0.1, 0.2, 0.3, 0.4, 0.5] # 每个等级对应的基础概率因子 + base_probability = level_probability_factors[relationship_level] + + # 基础概率因子 + trigger_probability = base_probability + trigger_probability = max(0.05, min(0.6, trigger_probability)) # 限制在0.05-0.6之间 + + # 最大冷却时间调整 - 随着冷却时间增加,逐渐增加触发概率 + if time_since_last_trigger > self.max_cooldown: + # 计算额外概率 - 每超过最大冷却时间的10%,增加1%的概率,最多增加30% + extra_time_factor = min(0.3, (time_since_last_trigger - self.max_cooldown) / (self.max_cooldown * 10)) + trigger_probability += extra_time_factor + logger.debug(f"[私聊][{self.private_name}]超过标准冷却时间,额外增加概率: +{extra_time_factor:.2f}") + + # 随机判断是否触发 + random_value = random.random() + should_trigger = random_value < trigger_probability + logger.debug(f"[私聊][{self.private_name}]触发概率计算: 基础({base_probability:.2f}) + 关系值({relationship_value})影响 = {trigger_probability:.2f},随机值={random_value:.2f}, 结果={should_trigger}") + + # 如果决定触发,记录详细日志 + if should_trigger: + logger.info(f"[私聊][{self.private_name}]决定触发主动聊天: 触发概率={trigger_probability:.2f}, 距上次已过{time_since_last_trigger:.0f}秒") + + return should_trigger + + except Exception as e: + logger.error(f"[私聊][{self.private_name}]获取关系值失败: {str(e)}") + logger.error(traceback.format_exc()) + + # 即使获取关系值失败,仍有一个基础的几率触发 + # 这确保即使数据库有问题,主动聊天功能仍然可用 + base_fallback_probability = 0.1 # 较低的基础几率 + random_fallback = random.random() + fallback_trigger = random_fallback < base_fallback_probability + if fallback_trigger: + logger.info(f"[私聊][{self.private_name}]获取关系值失败,使用后备触发机制: 概率={base_fallback_probability:.2f}, 决定={fallback_trigger}") + return fallback_trigger + + async def _check_idle_loop(self) -> None: + """检查空闲状态的循环""" + try: + while self._running: + # 检查是否启用了主动聊天功能 + if not getattr(global_config, "ENABLE_IDLE_CONVERSATION", False): + # 如果禁用了功能,等待一段时间后再次检查配置 + await asyncio.sleep(60) # 每分钟检查一次配置变更 + continue + + # 检查当前用户是否应该触发主动聊天 + should_trigger = await self._should_trigger() + + # 如果当前用户不触发,检查是否有其他用户已经超时未回复 + if not should_trigger: + async with self.__class__._global_lock: + current_time = time.time() + pending_timeout = 1800 # 30分钟未回复检查 + + # 检查此用户是否在等待回复列表中 + if self.private_name in self.__class__._pending_replies: + logger.debug(f"[私聊][{self.private_name}]当前用户在等待回复列表中,不进行额外检查") + else: + # 查找所有超过30分钟未回复的用户 + timed_out_users = [] + for user, send_time in self.__class__._pending_replies.items(): + if current_time - send_time > pending_timeout: + timed_out_users.append(user) + + # 如果有超时未回复的用户,尝试找下一个用户 + if timed_out_users: + logger.info(f"[私聊]发现{len(timed_out_users)}个用户超过{pending_timeout}秒未回复") + next_user = await self.__class__.get_next_available_user() + + if next_user and next_user != self.private_name: + logger.info(f"[私聊]选择下一个用户[{next_user}]进行主动聊天") + # 查找该用户的实例并触发聊天 + for key, instance in self.__class__._instances.items(): + if key.startswith(f"{next_user}:"): + logger.info(f"[私聊]为用户[{next_user}]触发主动聊天") + # 触发该实例的主动聊天 + asyncio.create_task(instance._initiate_chat()) + break + + # 如果当前用户应该触发主动聊天 + if should_trigger: + try: + await self._initiate_chat() + # 更新上次触发时间 + async with self._lock: + self.last_trigger_time = time.time() + + # 将此用户添加到等待回复列表中 + async with self.__class__._global_lock: + self.__class__._pending_replies[self.private_name] = time.time() + self.__class__._tried_users.add(self.private_name) + logger.info(f"[私聊][{self.private_name}]已添加到等待回复列表中") + except Exception as e: + logger.error(f"[私聊][{self.private_name}]执行主动聊天过程出错: {str(e)}") + logger.error(traceback.format_exc()) + + # 等待下一次检查 + check_interval = self.check_interval # 使用配置的检查间隔 + logger.debug(f"[私聊][{self.private_name}]等待{check_interval}秒后进行下一次主动聊天检查") + await asyncio.sleep(check_interval) + + except asyncio.CancelledError: + logger.debug(f"[私聊][{self.private_name}]主动聊天检测任务被取消") + except Exception as e: + logger.error(f"[私聊][{self.private_name}]主动聊天检测出错: {str(e)}") + logger.error(traceback.format_exc()) + # 尝试重新启动检测循环 + if self._running: + logger.info(f"[私聊][{self.private_name}]尝试重新启动主动聊天检测") + self._task = asyncio.create_task(self._check_idle_loop()) + + async def _get_chat_stream(self) -> Optional[ChatStream]: + """获取聊天流实例""" + try: + # 尝试从全局聊天管理器获取现有的聊天流 + from src.plugins.chat.chat_stream import chat_manager + existing_chat_stream = chat_manager.get_stream(self.stream_id) + if existing_chat_stream: + logger.debug(f"[私聊][{self.private_name}]从chat_manager找到现有聊天流") + return existing_chat_stream + + # 如果没有现有聊天流,则创建新的 + logger.debug(f"[私聊][{self.private_name}]未找到现有聊天流,创建新聊天流") + # 创建用户信息对象 + user_info = UserInfo( + user_id=self.private_name, # 使用私聊用户的ID + user_nickname=self.private_name, # 使用私聊用户的名称 + platform="qq" + ) + # 创建聊天流 + new_stream = ChatStream(self.stream_id, "qq", user_info) + # 将新创建的聊天流添加到管理器中 + chat_manager.register_stream(new_stream) + logger.debug(f"[私聊][{self.private_name}]成功创建并注册新聊天流") + return new_stream + except Exception as e: + logger.error(f"[私聊][{self.private_name}]创建/获取聊天流失败: {str(e)}") + logger.error(traceback.format_exc()) + return None + + async def _initiate_chat(self) -> None: + """生成并发送主动聊天消息""" + try: + # 获取聊天历史记录 + messages = self.chat_observer.get_cached_messages(limit=12) + chat_history_text = await build_readable_messages( + messages, + replace_bot_name=True, + merge_messages=False, + timestamp_mode="relative", + read_mark=0.0 + ) + + # 获取关系信息 + from src.plugins.person_info.relationship_manager import relationship_manager + + # 获取关系值 + relationship_value = 0 + try: + platform = "qq" + person_id = person_info_manager.get_person_id(platform, self.private_name) + if person_id: + raw_value = await person_info_manager.get_value(person_id, "relationship_value") + relationship_value = relationship_manager.ensure_float(raw_value, person_id) + except Exception as e: + logger.warning(f"[私聊][{self.private_name}]获取关系值失败,使用默认值: {e}") + + # 使用PfcRepationshipTranslator获取关系描述 + relationship_translator = PfcRepationshipTranslator(self.private_name) + full_relationship_text = await relationship_translator.translate_relationship_value_to_text(relationship_value) + + # 提取纯关系描述(去掉"你们的关系是:"前缀) + relationship_description = "普通" # 默认值 + if ":" in full_relationship_text: + relationship_description = full_relationship_text.split(":")[1].replace("。", "") + + if global_config.ENABLE_SCHEDULE_GEN: + schedule_prompt = await global_prompt_manager.format_prompt( + "schedule_prompt", schedule_info=bot_schedule.get_current_num_task(num=1, time_info=False) + ) + else: + schedule_prompt = "" + + # 构建提示词 + current_time = datetime.now().strftime("%H:%M") + prompt = f"""你是{global_config.BOT_NICKNAME}。 + 你正在与用户{self.private_name}进行QQ私聊,你们的关系是{relationship_description} + 现在时间{current_time} + 这是你的日程{schedule_prompt} + 你想要主动发起对话。 + 请基于以下之前的对话历史,生成一条自然、友好、符合关系程度的主动对话消息。 + 这条消息应能够引起用户的兴趣,重新开始对话。 + 最近的对话历史(并不是现在的对话): + {chat_history_text} + 请你严格根据对话历史决定是告诉对方你正在做的事情,还是询问对方正在做的事情 + 请直接输出一条消息,不要有任何额外的解释或引导文字 + 消息内容尽量简短 + """ + + # 生成回复 + logger.debug(f"[私聊][{self.private_name}]开始生成主动聊天内容") + try: + content, _ = await asyncio.wait_for( + self.llm.generate_response_async(prompt), + timeout=30 + ) + logger.debug(f"[私聊][{self.private_name}]成功生成主动聊天内容: {content}") + except asyncio.TimeoutError: + logger.error(f"[私聊][{self.private_name}]生成主动聊天内容超时") + return + except Exception as llm_err: + logger.error(f"[私聊][{self.private_name}]生成主动聊天内容失败: {str(llm_err)}") + logger.error(traceback.format_exc()) + return + + # 清理结果 + content = content.strip() + content = content.strip("\"'") + + if not content: + logger.error(f"[私聊][{self.private_name}]生成的主动聊天内容为空") + return + + # 获取聊天流 + chat_stream = await self._get_chat_stream() + if not chat_stream: + logger.error(f"[私聊][{self.private_name}]无法获取有效的聊天流,取消发送主动消息") + return + + # 发送消息 + try: + logger.debug(f"[私聊][{self.private_name}]准备发送主动聊天消息: {content}") + await self.message_sender.send_message( + chat_stream=chat_stream, + content=content, + reply_to_message=None + ) + logger.info(f"[私聊][{self.private_name}]成功主动发起聊天: {content}") + except Exception as e: + logger.error(f"[私聊][{self.private_name}]发送主动聊天消息失败: {str(e)}") + logger.error(traceback.format_exc()) + + except Exception as e: + logger.error(f"[私聊][{self.private_name}]主动发起聊天过程中发生未预期的错误: {str(e)}") + logger.error(traceback.format_exc()) \ No newline at end of file diff --git a/src/plugins/PFC/PFC_idle/idle_chat_manager.py b/src/plugins/PFC/PFC_idle/idle_chat_manager.py new file mode 100644 index 00000000..66ec37b6 --- /dev/null +++ b/src/plugins/PFC/PFC_idle/idle_chat_manager.py @@ -0,0 +1,182 @@ +from typing import Dict, Optional +import asyncio +from src.common.logger_manager import get_logger +from .idle_chat import IdleChat +import traceback + +logger = get_logger("pfc_idle_chat_manager") + +class IdleChatManager: + """空闲聊天管理器 + + 用于管理所有私聊用户的空闲聊天实例。 + 采用单例模式,确保全局只有一个管理器实例。 + """ + + _instance: Optional["IdleChatManager"] = None + _lock: asyncio.Lock = asyncio.Lock() + + def __init__(self): + """初始化空闲聊天管理器""" + self._idle_chats: Dict[str, IdleChat] = {} # stream_id -> IdleChat + self._active_conversations_count: Dict[str, int] = {} # stream_id -> count + + @classmethod + def get_instance(cls) -> "IdleChatManager": + """获取管理器单例 (同步版本) + + Returns: + IdleChatManager: 管理器实例 + """ + if not cls._instance: + # 在同步环境中创建实例 + cls._instance = cls() + return cls._instance + + @classmethod + async def get_instance_async(cls) -> "IdleChatManager": + """获取管理器单例 (异步版本) + + Returns: + IdleChatManager: 管理器实例 + """ + if not cls._instance: + async with cls._lock: + if not cls._instance: + cls._instance = cls() + return cls._instance + + async def get_or_create_idle_chat(self, stream_id: str, private_name: str) -> IdleChat: + """获取或创建空闲聊天实例 + + Args: + stream_id: 聊天流ID + private_name: 私聊用户名称 + + Returns: + IdleChat: 空闲聊天实例 + """ + if stream_id not in self._idle_chats: + idle_chat = IdleChat(stream_id, private_name) + self._idle_chats[stream_id] = idle_chat + # 初始化活跃对话计数 + if stream_id not in self._active_conversations_count: + self._active_conversations_count[stream_id] = 0 + idle_chat.start() # 启动空闲检测 + logger.info(f"[私聊][{private_name}]创建并启动新的空闲聊天实例") + return self._idle_chats[stream_id] + + async def remove_idle_chat(self, stream_id: str) -> None: + """移除空闲聊天实例 + + Args: + stream_id: 聊天流ID + """ + if stream_id in self._idle_chats: + idle_chat = self._idle_chats[stream_id] + idle_chat.stop() # 停止空闲检测 + del self._idle_chats[stream_id] + if stream_id in self._active_conversations_count: + del self._active_conversations_count[stream_id] + logger.info(f"[私聊][{idle_chat.private_name}]移除空闲聊天实例") + + async def notify_conversation_start(self, stream_id: str) -> None: + """通知对话开始 + + Args: + stream_id: 聊天流ID + """ + try: + if stream_id not in self._idle_chats: + logger.warning(f"对话开始通知: {stream_id} 没有对应的IdleChat实例,将创建一个") + # 从stream_id尝试提取private_name + private_name = stream_id + if stream_id.startswith("private_"): + parts = stream_id.split("_") + if len(parts) >= 2: + private_name = parts[1] # 取第二部分作为名称 + await self.get_or_create_idle_chat(stream_id, private_name) + + if stream_id not in self._active_conversations_count: + self._active_conversations_count[stream_id] = 0 + + # 增加计数前记录当前值,用于日志 + old_count = self._active_conversations_count[stream_id] + self._active_conversations_count[stream_id] += 1 + new_count = self._active_conversations_count[stream_id] + + # 确保IdleChat实例存在 + idle_chat = self._idle_chats.get(stream_id) + if idle_chat: + await idle_chat.increment_active_instances() + logger.debug(f"对话开始通知: {stream_id}, 计数从{old_count}增加到{new_count}") + else: + logger.error(f"对话开始通知: {stream_id}, 计数增加但IdleChat不存在! 计数:{old_count}->{new_count}") + except Exception as e: + logger.error(f"对话开始通知处理失败: {stream_id}, 错误: {e}") + logger.error(traceback.format_exc()) + + async def notify_conversation_end(self, stream_id: str) -> None: + """通知对话结束 + + Args: + stream_id: 聊天流ID + """ + try: + # 记录当前计数用于日志 + old_count = self._active_conversations_count.get(stream_id, 0) + + # 安全减少计数,避免负数 + if stream_id in self._active_conversations_count and self._active_conversations_count[stream_id] > 0: + self._active_conversations_count[stream_id] -= 1 + else: + # 如果计数已经为0或不存在,设置为0 + self._active_conversations_count[stream_id] = 0 + + new_count = self._active_conversations_count.get(stream_id, 0) + + # 确保IdleChat实例存在 + idle_chat = self._idle_chats.get(stream_id) + if idle_chat: + await idle_chat.decrement_active_instances() + logger.debug(f"对话结束通知: {stream_id}, 计数从{old_count}减少到{new_count}") + else: + logger.warning(f"对话结束通知: {stream_id}, 计数减少但IdleChat不存在! 计数:{old_count}->{new_count}") + + # 检查是否所有对话都结束了,帮助调试 + all_counts = sum(self._active_conversations_count.values()) + if all_counts == 0: + logger.info(f"所有对话实例都已结束,当前总活跃计数为0") + except Exception as e: + logger.error(f"对话结束通知处理失败: {stream_id}, 错误: {e}") + logger.error(traceback.format_exc()) + + def get_idle_chat(self, stream_id: str) -> Optional[IdleChat]: + """获取空闲聊天实例 + + Args: + stream_id: 聊天流ID + + Returns: + Optional[IdleChat]: 空闲聊天实例,如果不存在则返回None + """ + return self._idle_chats.get(stream_id) + + def get_active_conversations_count(self, stream_id: str) -> int: + """获取指定流的活跃对话计数 + + Args: + stream_id: 聊天流ID + + Returns: + int: 活跃对话计数 + """ + return self._active_conversations_count.get(stream_id, 0) + + def get_all_active_conversations_count(self) -> int: + """获取所有活跃对话总计数 + + Returns: + int: 活跃对话总计数 + """ + return sum(self._active_conversations_count.values()) \ No newline at end of file diff --git a/src/plugins/PFC/PFC_idle/idle_conversation.py b/src/plugins/PFC/PFC_idle/idle_conversation.py new file mode 100644 index 00000000..3aae1853 --- /dev/null +++ b/src/plugins/PFC/PFC_idle/idle_conversation.py @@ -0,0 +1,506 @@ +import traceback +import logging +import asyncio +from typing import Optional, Dict +from src.common.logger_manager import get_logger +import time + +logger = get_logger("pfc_idle_conversation") + +class IdleConversation: + """ + 处理Idle聊天相关的功能,将这些功能从主Conversation类中分离出来, + 以减少代码量并方便维护。 + """ + + def __init__(self): + """初始化IdleConversation实例""" + self._idle_chat_manager = None + self._running = False + self._active_streams: Dict[str, bool] = {} # 跟踪活跃的流 + self._monitor_task = None # 用于后台监控的任务 + self._lock = asyncio.Lock() # 用于线程安全操作 + self._initialization_in_progress = False # 防止并发初始化 + + async def initialize(self): + """初始化Idle聊天管理器""" + # 防止并发初始化 + if self._initialization_in_progress: + logger.debug("IdleConversation正在初始化中,等待完成") + return False + + if self._idle_chat_manager is not None: + logger.debug("IdleConversation已初始化,无需重复操作") + return True + + # 标记开始初始化 + self._initialization_in_progress = True + + try: + # 从PFCManager获取IdleChatManager实例 + from ..pfc_manager import PFCManager + pfc_manager = PFCManager.get_instance() + self._idle_chat_manager = pfc_manager.get_idle_chat_manager() + logger.debug("IdleConversation初始化完成,已获取IdleChatManager实例") + return True + except Exception as e: + logger.error(f"初始化IdleConversation时出错: {e}") + logger.error(traceback.format_exc()) + return False + finally: + # 无论成功或失败,都清除初始化标志 + self._initialization_in_progress = False + + async def start(self): + """启动IdleConversation,创建后台监控任务""" + if self._running: + logger.debug("IdleConversation已经在运行") + return False + + if not self._idle_chat_manager: + success = await self.initialize() + if not success: + logger.error("无法启动IdleConversation:初始化失败") + return False + + try: + self._running = True + # 创建后台监控任务,使用try-except块来捕获可能的异常 + try: + loop = asyncio.get_running_loop() + if loop.is_running(): + self._monitor_task = asyncio.create_task(self._monitor_loop()) + logger.info("IdleConversation启动成功,后台监控任务已创建") + else: + logger.warning("事件循环不活跃,跳过监控任务创建") + except RuntimeError: + # 如果没有活跃的事件循环,记录警告但继续执行 + logger.warning("没有活跃的事件循环,IdleConversation将不会启动监控任务") + # 尽管没有监控任务,但仍然将running设为True表示IdleConversation已启动 + + return True + except Exception as e: + self._running = False + logger.error(f"启动IdleConversation失败: {e}") + logger.error(traceback.format_exc()) + return False + + async def stop(self): + """停止IdleConversation的后台任务""" + if not self._running: + return + + self._running = False + if self._monitor_task and not self._monitor_task.done(): + try: + self._monitor_task.cancel() + try: + await asyncio.wait_for(self._monitor_task, timeout=2.0) + except asyncio.TimeoutError: + logger.warning("停止IdleConversation监控任务超时") + except asyncio.CancelledError: + pass # 正常取消 + except Exception as e: + logger.error(f"停止IdleConversation监控任务时出错: {e}") + logger.error(traceback.format_exc()) + + self._monitor_task = None + logger.info("IdleConversation已停止") + + async def _monitor_loop(self): + """后台监控循环,定期检查活跃的会话并执行必要的操作""" + try: + while self._running: + try: + # 同步活跃流计数到IdleChatManager + if self._idle_chat_manager: + await self._sync_active_streams_to_manager() + + # 这里可以添加定期检查逻辑,如查询空闲状态等 + active_count = len(self._active_streams) + logger.debug(f"IdleConversation监控中,当前活跃流数量: {active_count}") + + except Exception as e: + logger.error(f"IdleConversation监控循环出错: {e}") + logger.error(traceback.format_exc()) + + # 每30秒执行一次监控 + await asyncio.sleep(30) + except asyncio.CancelledError: + logger.info("IdleConversation监控任务已取消") + except Exception as e: + logger.error(f"IdleConversation监控任务异常退出: {e}") + logger.error(traceback.format_exc()) + self._running = False + + async def _sync_active_streams_to_manager(self): + """同步活跃流计数到IdleChatManager和IdleChat""" + try: + if not self._idle_chat_manager: + return + + # 获取当前的活跃流列表 + async with self._lock: + active_streams = list(self._active_streams.keys()) + + # 对每个活跃流,确保IdleChatManager和IdleChat中的计数是正确的 + for stream_id in active_streams: + # 获取当前IdleChatManager中的计数 + manager_count = self._idle_chat_manager.get_active_conversations_count(stream_id) + + # 由于我们的活跃流字典只记录是否活跃(值为True),所以计数应该是1 + if manager_count != 1: + # 修正IdleChatManager中的计数 + old_count = manager_count + self._idle_chat_manager._active_conversations_count[stream_id] = 1 + logger.warning(f"同步调整IdleChatManager中的计数: stream_id={stream_id}, {old_count}->1") + + # 同时修正IdleChat中的计数 + idle_chat = self._idle_chat_manager.get_idle_chat(stream_id) + if idle_chat: + if getattr(idle_chat, "active_instances_count", 0) != 1: + old_count = getattr(idle_chat, "active_instances_count", 0) + idle_chat.active_instances_count = 1 + logger.warning(f"同步调整IdleChat中的计数: stream_id={stream_id}, {old_count}->1") + + # 检查IdleChatManager中有没有多余的计数(conversation中已不存在但manager中还有) + for stream_id, count in list(self._idle_chat_manager._active_conversations_count.items()): + if count > 0 and stream_id not in active_streams: + # 重置为0 + self._idle_chat_manager._active_conversations_count[stream_id] = 0 + logger.warning(f"重置IdleChatManager中的多余计数: stream_id={stream_id}, {count}->0") + + # 同时修正IdleChat中的计数 + idle_chat = self._idle_chat_manager.get_idle_chat(stream_id) + if idle_chat and getattr(idle_chat, "active_instances_count", 0) > 0: + old_count = getattr(idle_chat, "active_instances_count", 0) + idle_chat.active_instances_count = 0 + logger.warning(f"同步重置IdleChat中的计数: stream_id={stream_id}, {old_count}->0") + + # 日志记录同步结果 + total_active = len(active_streams) + total_manager = sum(self._idle_chat_manager._active_conversations_count.values()) + logger.debug(f"同步后的计数: IdleConversation活跃流={total_active}, IdleChatManager总计数={total_manager}") + + except Exception as e: + logger.error(f"同步活跃流计数失败: {e}") + logger.error(traceback.format_exc()) + + async def get_or_create_idle_chat(self, stream_id: str, private_name: str): + """ + 获取或创建IdleChat实例 + + Args: + stream_id: 聊天流ID + private_name: 私聊对象名称,用于日志 + + Returns: + bool: 操作是否成功 + """ + # 确保IdleConversation已启动 + if not self._running: + await self.start() + + if not self._idle_chat_manager: + # 如果尚未初始化,尝试初始化 + success = await self.initialize() + if not success: + logger.warning(f"[私聊][{private_name}] 获取或创建IdleChat失败:IdleChatManager未初始化") + return False + + try: + # 创建IdleChat实例 + idle_chat = await self._idle_chat_manager.get_or_create_idle_chat(stream_id, private_name) + logger.debug(f"[私聊][{private_name}] 已创建或获取IdleChat实例") + return True + except Exception as e: + logger.warning(f"[私聊][{private_name}] 创建或获取IdleChat实例失败: {e}") + logger.warning(traceback.format_exc()) + return False + + async def notify_conversation_start(self, stream_id: str, private_name: str) -> bool: + """ + 通知空闲聊天管理器对话开始 + + Args: + stream_id: 聊天流ID + private_name: 私聊对象名称,用于日志 + + Returns: + bool: 通知是否成功 + """ + try: + # 确保IdleConversation已启动 + if not self._running: + success = await self.start() + if not success: + logger.warning(f"[私聊][{private_name}] 启动IdleConversation失败,无法通知对话开始") + return False + + if not self._idle_chat_manager: + # 如果尚未初始化,尝试初始化 + success = await self.initialize() + if not success: + logger.warning(f"[私聊][{private_name}] 通知对话开始失败:IdleChatManager未初始化") + return False + + try: + # 确保IdleChat实例已创建 - 这是关键步骤,要先创建IdleChat + await self.get_or_create_idle_chat(stream_id, private_name) + + # 先记录活跃状态 - 这是权威源 + async with self._lock: + self._active_streams[stream_id] = True + + # 然后同步到IdleChatManager + if self._idle_chat_manager: + await self._idle_chat_manager.notify_conversation_start(stream_id) + logger.info(f"[私聊][{private_name}] 已通知空闲聊天管理器对话开始") + else: + logger.warning(f"[私聊][{private_name}] IdleChatManager不存在,但已记录活跃状态") + + # 立即进行一次同步,确保数据一致性 + await self._sync_active_streams_to_manager() + + return True + except Exception as e: + logger.warning(f"[私聊][{private_name}] 通知空闲聊天管理器对话开始失败: {e}") + logger.warning(traceback.format_exc()) + # 即使通知失败,也应记录活跃状态 + async with self._lock: + self._active_streams[stream_id] = True + return False + except Exception as outer_e: + logger.error(f"[私聊][{private_name}] 处理对话开始通知时发生严重错误: {outer_e}") + logger.error(traceback.format_exc()) + return False + + async def notify_conversation_end(self, stream_id: str, private_name: str) -> bool: + """ + 通知空闲聊天管理器对话结束 + + Args: + stream_id: 聊天流ID + private_name: 私聊对象名称,用于日志 + + Returns: + bool: 通知是否成功 + """ + try: + # 先从自身的活跃流中移除 - 这是权威源 + was_active = False + async with self._lock: + if stream_id in self._active_streams: + del self._active_streams[stream_id] + was_active = True + logger.debug(f"[私聊][{private_name}] 已从活跃流中移除 {stream_id}") + + if not self._idle_chat_manager: + # 如果尚未初始化,尝试初始化 + success = await self.initialize() + if not success: + logger.warning(f"[私聊][{private_name}] 通知对话结束失败:IdleChatManager未初始化") + return False + + try: + # 然后同步到IdleChatManager + if self._idle_chat_manager: + # 无论如何都尝试通知 + await self._idle_chat_manager.notify_conversation_end(stream_id) + + # 立即进行一次同步,确保数据一致性 + await self._sync_active_streams_to_manager() + + logger.info(f"[私聊][{private_name}] 已通知空闲聊天管理器对话结束") + + # 检查当前活跃流数量 + active_count = len(self._active_streams) + if active_count == 0: + logger.info(f"[私聊][{private_name}] 当前无活跃流,可能会触发主动聊天") + + # 额外调用:如果实例存在且只有在确实移除了活跃流的情况下才触发检查 + if was_active: + idle_chat = self._idle_chat_manager.get_idle_chat(stream_id) + if idle_chat: + # 直接触发IdleChat检查,而不是等待下一个循环 + logger.info(f"[私聊][{private_name}] 对话结束,手动触发一次主动聊天检查") + asyncio.create_task(self._trigger_idle_chat_check(idle_chat, stream_id, private_name)) + + return True + else: + logger.warning(f"[私聊][{private_name}] IdleChatManager不存在,但已更新活跃状态") + return False + except Exception as e: + logger.warning(f"[私聊][{private_name}] 通知空闲聊天管理器对话结束失败: {e}") + logger.warning(traceback.format_exc()) + return False + except Exception as outer_e: + logger.error(f"[私聊][{private_name}] 处理对话结束通知时发生严重错误: {outer_e}") + logger.error(traceback.format_exc()) + return False + + async def _trigger_idle_chat_check(self, idle_chat, stream_id: str, private_name: str): + """在对话结束后,手动触发一次IdleChat的检查""" + try: + # 确保活跃计数与IdleConversation一致 + async with self._lock: + is_active_in_conversation = stream_id in self._active_streams + + # 强制使IdleChat的计数与IdleConversation一致 + if is_active_in_conversation: + # 如果在IdleConversation中是活跃的,IdleChat的计数应该是1 + if idle_chat.active_instances_count != 1: + old_count = idle_chat.active_instances_count + idle_chat.active_instances_count = 1 + logger.warning(f"[私聊][{private_name}] 修正IdleChat计数: {old_count}->1") + else: + # 如果在IdleConversation中不是活跃的,IdleChat的计数应该是0 + if idle_chat.active_instances_count != 0: + old_count = idle_chat.active_instances_count + idle_chat.active_instances_count = 0 + logger.warning(f"[私聊][{private_name}] 修正IdleChat计数: {old_count}->0") + + # 等待1秒,让任何正在进行的处理完成 + await asyncio.sleep(1) + + # 只有当stream不再活跃时才触发检查 + if not is_active_in_conversation: + # 尝试触发一次检查 + if hasattr(idle_chat, "_should_trigger"): + should_trigger = await idle_chat._should_trigger() + logger.info(f"[私聊][{private_name}] 手动触发主动聊天检查结果: {should_trigger}") + + # 如果应该触发,直接调用_initiate_chat + if should_trigger and hasattr(idle_chat, "_initiate_chat"): + logger.info(f"[私聊][{private_name}] 手动触发主动聊天") + await idle_chat._initiate_chat() + # 更新最后触发时间 + idle_chat.last_trigger_time = time.time() + else: + logger.warning(f"[私聊][{private_name}] IdleChat没有_should_trigger方法,无法触发检查") + except Exception as e: + logger.error(f"[私聊][{private_name}] 手动触发主动聊天检查时出错: {e}") + logger.error(traceback.format_exc()) + + def is_stream_active(self, stream_id: str) -> bool: + """检查指定的stream是否活跃""" + return stream_id in self._active_streams + + def get_active_streams_count(self) -> int: + """获取当前活跃的stream数量""" + return len(self._active_streams) + + @property + def is_running(self) -> bool: + """检查IdleConversation是否正在运行""" + return self._running + + @property + def idle_chat_manager(self): + """获取IdleChatManager实例""" + return self._idle_chat_manager + +# 创建单例实例 +_instance: Optional[IdleConversation] = None +_instance_lock = asyncio.Lock() +_initialization_in_progress = False # 防止并发初始化 + +async def initialize_idle_conversation() -> IdleConversation: + """初始化并启动IdleConversation单例实例""" + global _initialization_in_progress + + # 防止并发初始化 + if _initialization_in_progress: + logger.debug("IdleConversation全局初始化正在进行中,等待完成") + return get_idle_conversation_instance() + + # 标记正在初始化 + _initialization_in_progress = True + + try: + instance = get_idle_conversation_instance() + + # 如果实例已经在运行,避免重复初始化 + if getattr(instance, '_running', False): + logger.debug("IdleConversation已在运行状态,无需重新初始化") + _initialization_in_progress = False + return instance + + # 初始化实例 + success = await instance.initialize() + if not success: + logger.error("IdleConversation初始化失败") + _initialization_in_progress = False + return instance + + # 启动实例 + success = await instance.start() + if not success: + logger.error("IdleConversation启动失败") + else: + # 启动成功,进行初始检查 + logger.info("IdleConversation启动成功,执行初始化后检查") + # 这里可以添加一些启动后的检查,如果需要 + + # 创建一个异步任务,定期检查系统状态 + asyncio.create_task(periodic_system_check(instance)) + + return instance + except Exception as e: + logger.error(f"初始化并启动IdleConversation时出错: {e}") + logger.error(traceback.format_exc()) + # 重置标志,允许下次再试 + _initialization_in_progress = False + return get_idle_conversation_instance() # 返回实例,即使初始化失败 + finally: + # 清除初始化标志 + _initialization_in_progress = False + +async def periodic_system_check(instance: IdleConversation): + """定期检查系统状态,确保主动聊天功能正常工作""" + try: + # 等待10秒,让系统完全启动 + await asyncio.sleep(10) + + while getattr(instance, '_running', False): + try: + # 检查活跃流数量 + active_streams_count = len(getattr(instance, '_active_streams', {})) + + # 如果IdleChatManager存在,检查其中的活跃对话计数 + idle_chat_manager = getattr(instance, '_idle_chat_manager', None) + if idle_chat_manager and hasattr(idle_chat_manager, 'get_all_active_conversations_count'): + manager_count = idle_chat_manager.get_all_active_conversations_count() + + # 如果两者不一致,记录警告 + if active_streams_count != manager_count: + logger.warning(f"检测到计数不一致: IdleConversation记录的活跃流数量({active_streams_count}) 与 IdleChatManager记录的活跃对话数({manager_count})不匹配") + + # 如果IdleChatManager记录的计数为0但自己的记录不为0,进行修正 + if manager_count == 0 and active_streams_count > 0: + logger.warning(f"检测到可能的计数错误,尝试修正:清空IdleConversation的活跃流记录") + async with instance._lock: + instance._active_streams.clear() + + # 检查计数如果为0,帮助日志输出 + if active_streams_count == 0: + logger.debug("当前没有活跃的对话流,应该可以触发主动聊天") + + except Exception as check_err: + logger.error(f"执行系统检查时出错: {check_err}") + logger.error(traceback.format_exc()) + + # 每60秒检查一次 + await asyncio.sleep(60) + except asyncio.CancelledError: + logger.debug("系统检查任务被取消") + except Exception as e: + logger.error(f"系统检查任务异常退出: {e}") + logger.error(traceback.format_exc()) + +def get_idle_conversation_instance() -> IdleConversation: + """获取IdleConversation的单例实例""" + global _instance + if _instance is None: + _instance = IdleConversation() + return _instance \ No newline at end of file diff --git a/src/plugins/PFC/idle_conversation_starter.py b/src/plugins/PFC/PFC_idle/idle_conversation_starter.py similarity index 90% rename from src/plugins/PFC/idle_conversation_starter.py rename to src/plugins/PFC/PFC_idle/idle_conversation_starter.py index b161edd0..1ace9dbd 100644 --- a/src/plugins/PFC/idle_conversation_starter.py +++ b/src/plugins/PFC/PFC_idle/idle_conversation_starter.py @@ -1,22 +1,33 @@ -from typing import TYPE_CHECKING, Optional -import asyncio import time +import asyncio import random -from src.common.logger import get_module_logger -from ..models.utils_model import LLMRequest +import traceback +from typing import TYPE_CHECKING, Optional +from datetime import datetime + +from src.common.logger_manager import get_logger +from src.plugins.models.utils_model import LLMRequest from src.config.config import global_config -from .chat_observer import ChatObserver -from .message_sender import DirectMessageSender -from ..chat.chat_stream import ChatStream -from maim_message import UserInfo +from src.plugins.chat.chat_stream import chat_manager, ChatStream from src.individuality.individuality import Individuality from src.plugins.utils.chat_message_builder import build_readable_messages +from maim_message import UserInfo +from ..chat_observer import ChatObserver +from ..message_sender import DirectMessageSender + +# 导入富文本回溯,用于更好的错误展示 +from rich.traceback import install + +# 使用TYPE_CHECKING避免循环导入 if TYPE_CHECKING: - from .conversation import Conversation + from ..conversation import Conversation + from ..pfc_manager import PFCManager -logger = get_module_logger("pfc_idle") +install(extra_lines=3) +# 获取当前模块的日志记录器 +logger = get_logger("idle_conversation_starter") class IdleConversationStarter: """长时间无对话主动发起对话的组件 @@ -125,6 +136,7 @@ class IdleConversationStarter: except Exception as e: logger.error(f"[私聊][{self.private_name}]重新加载配置时出错: {str(e)}") + logger.error(traceback.format_exc()) async def _check_idle_loop(self) -> None: """检查空闲状态的循环 @@ -167,6 +179,7 @@ class IdleConversationStarter: logger.debug(f"[私聊][{self.private_name}]空闲对话检测任务被取消") except Exception as e: logger.error(f"[私聊][{self.private_name}]空闲对话检测出错: {str(e)}") + logger.error(traceback.format_exc()) # 尝试重新启动检测循环 if self._running: logger.info(f"[私聊][{self.private_name}]尝试重新启动空闲对话检测") @@ -175,7 +188,7 @@ class IdleConversationStarter: async def _initiate_conversation(self) -> None: """生成并发送主动对话内容 - 获取聊天历史记录,使用LLM生成合适的开场白,然后发送消息。 + 获取聊天历史记录,使用LLM生成合适的开场白(大概),然后发送消息。 """ try: # 获取聊天历史记录,用于生成更合适的开场白 @@ -212,6 +225,7 @@ class IdleConversationStarter: return except Exception as llm_err: logger.error(f"[私聊][{self.private_name}]生成主动对话内容失败: {str(llm_err)}") + logger.error(traceback.format_exc()) return # 清理结果 @@ -225,9 +239,10 @@ class IdleConversationStarter: # 统一错误处理,从这里开始所有操作都在同一个try-except块中 logger.debug(f"[私聊][{self.private_name}]成功生成主动对话内容: {content},准备发送") - from .pfc_manager import PFCManager - - # 获取当前实例 + # 在函数内部导入PFCManager,避免循环导入 + from ..pfc_manager import PFCManager + + # 获取当前实例 - 注意这是同步方法,不需要await pfc_manager = PFCManager.get_instance() # 结束当前对话实例(如果存在) @@ -239,6 +254,7 @@ class IdleConversationStarter: await pfc_manager.remove_conversation(self.stream_id) except Exception as e: logger.warning(f"[私聊][{self.private_name}]结束当前对话实例时出错: {str(e)},继续创建新实例") + logger.warning(traceback.format_exc()) # 创建新的对话实例 logger.info(f"[私聊][{self.private_name}]创建新的对话实例以发送主动消息") @@ -247,6 +263,7 @@ class IdleConversationStarter: new_conversation = await pfc_manager.get_or_create_conversation(self.stream_id, self.private_name) except Exception as e: logger.error(f"[私聊][{self.private_name}]创建新对话实例失败: {str(e)}") + logger.error(traceback.format_exc()) return # 确保新对话实例已初始化完成 @@ -269,14 +286,17 @@ class IdleConversationStarter: new_conversation.chat_observer.trigger_update() except Exception as e: logger.warning(f"[私聊][{self.private_name}]触发聊天观察者更新失败: {str(e)}") + logger.warning(traceback.format_exc()) - logger.success(f"[私聊][{self.private_name}]成功主动发起对话: {content}") + logger.info(f"[私聊][{self.private_name}]成功主动发起对话: {content}") except Exception as e: logger.error(f"[私聊][{self.private_name}]发送主动对话消息失败: {str(e)}") + logger.error(traceback.format_exc()) except Exception as e: # 顶级异常处理,确保任何未捕获的异常都不会导致整个进程崩溃 logger.error(f"[私聊][{self.private_name}]主动发起对话过程中发生未预期的错误: {str(e)}") + logger.error(traceback.format_exc()) async def _get_chat_stream(self, conversation: Optional["Conversation"] = None) -> Optional[ChatStream]: """获取可用的聊天流 @@ -313,8 +333,6 @@ class IdleConversationStarter: return conversation.chat_stream # 2. 尝试从聊天管理器获取 - from src.plugins.chat.chat_stream import chat_manager - try: logger.info(f"[私聊][{self.private_name}]尝试从chat_manager获取聊天流") chat_stream = chat_manager.get_stream(self.stream_id) @@ -322,6 +340,7 @@ class IdleConversationStarter: return chat_stream except Exception as e: logger.warning(f"[私聊][{self.private_name}]从chat_manager获取聊天流失败: {str(e)}") + logger.warning(traceback.format_exc()) # 3. 创建新的聊天流 try: @@ -332,4 +351,5 @@ class IdleConversationStarter: return ChatStream(self.stream_id, "qq", user_info) except Exception as e: logger.error(f"[私聊][{self.private_name}]创建新聊天流失败: {str(e)}") + logger.error(traceback.format_exc()) return None diff --git a/src/plugins/PFC/actions.py b/src/plugins/PFC/actions.py index ea72df82..7b15cdfb 100644 --- a/src/plugins/PFC/actions.py +++ b/src/plugins/PFC/actions.py @@ -98,7 +98,7 @@ async def handle_action( current_action_record = { "action": action, "plan_reason": reason, # 记录规划时的原因 - "status": "start", # 初始状态为“开始” + "status": "start", # 初始状态为"开始" "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 记录开始时间 "final_reason": None, # 最终结果的原因,将在 finally 中设置 } @@ -361,9 +361,10 @@ async def handle_action( observation_info.chat_history_str = "[构建聊天记录出错]" # --- 新增结束 --- - # 更新空闲对话启动器的时间 - if conversation_instance.idle_conversation_starter: - await conversation_instance.idle_conversation_starter.update_last_message_time(send_end_time) + # 更新 idle_conversation_starter 的最后消息时间 + # (避免在发送消息后很快触发主动聊天) + if conversation_instance.idle_chat: + await conversation_instance.idle_chat.update_last_message_time(send_end_time) # 清理已处理的未读消息 (只清理在发送这条回复之前的、来自他人的消息) current_unprocessed_messages = getattr(observation_info, "unprocessed_messages", []) @@ -505,9 +506,10 @@ async def handle_action( action_successful = True # 标记成功 # final_status 和 final_reason 会在 finally 中设置 logger.info(f"[私聊][{conversation_instance.private_name}] 成功发送告别语,即将停止对话实例。") - # 更新空闲计时器 - if conversation_instance.idle_conversation_starter: - await conversation_instance.idle_conversation_starter.update_last_message_time(send_end_time) + # 更新 idle_conversation_starter 的最后消息时间 + # (避免在发送消息后很快触发主动聊天) + if conversation_instance.idle_chat: + await conversation_instance.idle_chat.update_last_message_time(send_end_time) # 清理发送前的消息 (虽然通常是最后一条,但保持逻辑一致) current_unprocessed_messages = getattr(observation_info, "unprocessed_messages", []) message_ids_to_clear: Set[str] = set() diff --git a/src/plugins/PFC/chat_observer.py b/src/plugins/PFC/chat_observer.py index c8fd280f..4fede3d2 100644 --- a/src/plugins/PFC/chat_observer.py +++ b/src/plugins/PFC/chat_observer.py @@ -114,6 +114,19 @@ class ChatObserver: logger.debug( f"[私聊][{self.private_name}] 消息已添加到 ChatObserver 缓存,当前缓存大小: {len(self.message_cache)}" ) + + # 检查是否用户发送的消息(而非机器人自己) + try: + from .PFC_idle.idle_chat import IdleChat + + # 获取消息的发送者 + user_info = message.get("user_info", {}) + if user_info and str(user_info.get("user_id")) != str(global_config.BOT_QQ): + # 用户发送了消息,通知IdleChat + asyncio.create_task(IdleChat.register_user_response(self.private_name)) + logger.debug(f"[私聊][{self.private_name}] 检测到用户消息,已通知IdleChat更新用户响应状态") + except Exception as e_idle: + logger.warning(f"[私聊][{self.private_name}] 通知IdleChat用户响应状态失败: {e_idle}") else: logger.warning(f"[私聊][{self.private_name}] 尝试向 message_cache 添加非字典类型消息: {type(message)}") diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 4a3c9e02..d74f5096 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -24,7 +24,7 @@ from .action_planner import ActionPlanner from .observation_info import ObservationInfo from .conversation_info import ConversationInfo from .reply_generator import ReplyGenerator -from .idle_conversation_starter import IdleConversationStarter +from .PFC_idle.idle_chat import IdleChat from .pfc_KnowledgeFetcher import KnowledgeFetcher from .waiter import Waiter from .reply_checker import ReplyChecker @@ -77,7 +77,7 @@ class Conversation: self.knowledge_fetcher: Optional[KnowledgeFetcher] = None self.waiter: Optional[Waiter] = None self.direct_sender: Optional[DirectMessageSender] = None - self.idle_conversation_starter: Optional[IdleConversationStarter] = None + self.idle_chat: Optional[IdleChat] = None self.chat_observer: Optional[ChatObserver] = None self.observation_info: Optional[ObservationInfo] = None self.conversation_info: Optional[ConversationInfo] = None @@ -142,8 +142,10 @@ class Conversation: logger.warning(f"[私聊][{self.private_name}] 跳过最终关系评估,实例未完全初始化或缺少组件。") # 停止其他组件 - if self.idle_conversation_starter: - self.idle_conversation_starter.stop() + if self.idle_chat: + # 减少活跃实例计数,而不是停止IdleChat + await self.idle_chat.decrement_active_instances() + logger.info(f"[私聊][{self.private_name}] 已减少IdleChat活跃实例计数") if self.observation_info and self.chat_observer: self.observation_info.unbind_from_chat_observer() if self.mood_mng and hasattr(self.mood_mng, "stop_mood_update") and self.mood_mng._running: # type: ignore diff --git a/src/plugins/PFC/conversation_initializer.py b/src/plugins/PFC/conversation_initializer.py index 472c1291..ec7e47a5 100644 --- a/src/plugins/PFC/conversation_initializer.py +++ b/src/plugins/PFC/conversation_initializer.py @@ -17,7 +17,7 @@ from .action_planner import ActionPlanner from .observation_info import ObservationInfo from .conversation_info import ConversationInfo from .reply_generator import ReplyGenerator -from .idle_conversation_starter import IdleConversationStarter +from .PFC_idle.idle_chat import IdleChat from .pfc_KnowledgeFetcher import KnowledgeFetcher # 修正大小写 from .waiter import Waiter from .pfc_utils import get_person_id @@ -93,18 +93,18 @@ async def load_initial_history(conversation_instance: "Conversation"): read_mark=0.0, # read_mark 可能需要根据实际情况调整 ) - # 更新 ChatObserver 和 IdleStarter 的时间戳 + # 更新 ChatObserver 和 IdleChat 的时间戳 if conversation_instance.chat_observer: # 更新观察者的最后消息时间,避免重复处理这些初始消息 conversation_instance.chat_observer.last_message_time = ( conversation_instance.observation_info.last_message_time ) if ( - conversation_instance.idle_conversation_starter + conversation_instance.idle_chat and conversation_instance.observation_info.last_message_time ): # 更新空闲计时器的起始时间 - await conversation_instance.idle_conversation_starter.update_last_message_time( + await conversation_instance.idle_chat.update_last_message_time( conversation_instance.observation_info.last_message_time ) @@ -192,10 +192,12 @@ async def initialize_core_components(conversation_instance: "Conversation"): ) raise ValueError(f"无法获取 stream_id {conversation_instance.stream_id} 的 ChatStream") - logger.debug(f"[私聊][{conversation_instance.private_name}] (Initializer) 初始化 IdleConversationStarter...") - conversation_instance.idle_conversation_starter = IdleConversationStarter( + logger.debug(f"[私聊][{conversation_instance.private_name}] (Initializer) 初始化 IdleChat...") + conversation_instance.idle_chat = IdleChat.get_instance( conversation_instance.stream_id, conversation_instance.private_name ) + await conversation_instance.idle_chat.increment_active_instances() + logger.info(f"[私聊][{conversation_instance.private_name}] (Initializer) IdleChat实例已获取并增加活跃计数") # 2. 初始化信息存储和观察组件 logger.debug(f"[私聊][{conversation_instance.private_name}] (Initializer) 获取 ChatObserver 实例...") @@ -249,10 +251,10 @@ async def initialize_core_components(conversation_instance: "Conversation"): if conversation_instance.chat_observer: # 确保存在 conversation_instance.chat_observer.start() - if conversation_instance.idle_conversation_starter: - logger.debug(f"[私聊][{conversation_instance.private_name}] (Initializer) 启动 IdleConversationStarter...") - conversation_instance.idle_conversation_starter.start() - logger.info(f"[私聊][{conversation_instance.private_name}] (Initializer) 空闲对话检测器已启动") + if conversation_instance.idle_chat: + logger.debug(f"[私聊][{conversation_instance.private_name}] (Initializer) 启动 IdleChat...") + # 不需要再次启动,只需确保已初始化 + logger.info(f"[私聊][{conversation_instance.private_name}] (Initializer) IdleChat实例已初始化") if ( conversation_instance.mood_mng diff --git a/src/plugins/PFC/conversation_loop.py b/src/plugins/PFC/conversation_loop.py index 80508cf0..c4575735 100644 --- a/src/plugins/PFC/conversation_loop.py +++ b/src/plugins/PFC/conversation_loop.py @@ -70,11 +70,13 @@ async def run_conversation_loop(conversation_instance: "Conversation"): and loop_iter_start_time < conversation_instance.ignore_until_timestamp ): if ( - conversation_instance.idle_conversation_starter - and conversation_instance.idle_conversation_starter._running + conversation_instance.idle_chat + and conversation_instance.idle_chat._running ): - conversation_instance.idle_conversation_starter.stop() - logger.debug(f"[私聊][{conversation_instance.private_name}] 对话被暂时忽略,暂停空闲对话检测") + # 不直接停止服务,改为暂时忽略此用户 + # 虽然我们仍然可以通过active_instances_count来决定是否触发主动聊天 + # 但为了安全起见,我们只记录一个日志 + logger.debug(f"[私聊][{conversation_instance.private_name}] 对话被暂时忽略,暂停对该用户的主动聊天") sleep_duration = min(30, conversation_instance.ignore_until_timestamp - loop_iter_start_time) await asyncio.sleep(sleep_duration) continue @@ -89,12 +91,9 @@ async def run_conversation_loop(conversation_instance: "Conversation"): await conversation_instance.stop() # 调用 Conversation 实例的 stop 方法 continue else: - if ( - conversation_instance.idle_conversation_starter - and not conversation_instance.idle_conversation_starter._running - ): - conversation_instance.idle_conversation_starter.start() - logger.debug(f"[私聊][{conversation_instance.private_name}] 恢复空闲对话检测") + # 忽略状态结束,这里不需要任何特殊处理 + # IdleChat会通过active_instances_count自动决定是否触发 + pass # 核心规划与行动逻辑 try: