From a6b7ba2d7642e85d8b0e238934323cc230a93730 Mon Sep 17 00:00:00 2001 From: Bakadax Date: Tue, 6 May 2025 11:03:14 +0800 Subject: [PATCH] =?UTF-8?q?ruff=20=E9=81=BF=E5=85=8Didle=E5=BE=AA=E7=8E=AF?= =?UTF-8?q?=E5=90=AF=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/PFC/action_planner.py | 34 +- src/plugins/PFC/conversation.py | 1023 ++++++++++++++---- src/plugins/PFC/idle_conversation_starter.py | 446 +++----- src/plugins/PFC/pfc_manager.py | 60 +- 4 files changed, 1001 insertions(+), 562 deletions(-) diff --git a/src/plugins/PFC/action_planner.py b/src/plugins/PFC/action_planner.py index 854d734c..554b0112 100644 --- a/src/plugins/PFC/action_planner.py +++ b/src/plugins/PFC/action_planner.py @@ -276,11 +276,13 @@ class ActionPlanner: time_info = "" try: - if not observation_info or not observation_info.bot_id: return "" + if not observation_info or not observation_info.bot_id: + return "" bot_id_str = str(observation_info.bot_id) if hasattr(observation_info, "chat_history") and observation_info.chat_history: for msg in reversed(observation_info.chat_history): - if not isinstance(msg, dict): continue + if not isinstance(msg, dict): + continue sender_info = msg.get("user_info", {}) sender_id = str(sender_info.get("user_id")) if isinstance(sender_info, dict) else None msg_time = msg.get("time") @@ -289,8 +291,10 @@ class ActionPlanner: if time_diff < 60.0: time_info = f"提示:你上一条成功发送的消息是在 {time_diff:.1f} 秒前。\n" break - except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时属性错误: {e}") - except Exception as e: logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时出错: {e}") + except AttributeError as e: + logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时属性错误: {e}") + except Exception as e: + logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时出错: {e}") return time_info def _get_timeout_context(self, conversation_info: ConversationInfo) -> str: @@ -301,14 +305,18 @@ class ActionPlanner: if hasattr(conversation_info, "goal_list") and conversation_info.goal_list: last_goal_item = conversation_info.goal_list[-1] last_goal_text = "" - if isinstance(last_goal_item, dict): last_goal_text = last_goal_item.get("goal", "") - elif isinstance(last_goal_item, str): last_goal_text = last_goal_item + if isinstance(last_goal_item, dict): + last_goal_text = last_goal_item.get("goal", "") + elif isinstance(last_goal_item, str): + last_goal_text = last_goal_item if isinstance(last_goal_text, str) and "分钟," in last_goal_text and "思考接下来要做什么" in last_goal_text: wait_time_str = last_goal_text.split("分钟,")[0].replace("你等待了","").strip() timeout_context = f"重要提示:对方已经长时间(约 {wait_time_str} 分钟)没有回复你的消息了,请基于此情况规划下一步。\n" logger.debug(f"[私聊][{self.private_name}] 检测到超时目标: {last_goal_text}") - except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 检查超时目标时属性错误: {e}") - except Exception as e: logger.warning(f"[私聊][{self.private_name}] 检查超时目标时出错: {e}") + except AttributeError as e: + logger.warning(f"[私聊][{self.private_name}] 检查超时目标时属性错误: {e}") + except Exception as e: + logger.warning(f"[私聊][{self.private_name}] 检查超时目标时出错: {e}") return timeout_context def _build_goals_string(self, conversation_info: ConversationInfo) -> str: @@ -319,9 +327,11 @@ class ActionPlanner: if hasattr(conversation_info, "goal_list") and conversation_info.goal_list: recent_goals = conversation_info.goal_list[-3:] for goal_item in recent_goals: - goal = "目标内容缺失"; reasoning = "没有明确原因" + goal = "目标内容缺失" + reasoning = "没有明确原因" if isinstance(goal_item, dict): - goal = goal_item.get("goal", goal); reasoning = goal_item.get("reasoning", reasoning) + goal = goal_item.get("goal", goal) + reasoning = goal_item.get("reasoning", reasoning) elif isinstance(goal_item, str): goal = goal_item goal = str(goal) if goal is not None else "目标内容缺失" reasoning = str(reasoning) if reasoning is not None else "没有明确原因" @@ -339,8 +349,8 @@ class ActionPlanner: try: if hasattr(observation_info, "chat_history_str") and observation_info.chat_history_str: chat_history_text = observation_info.chat_history_str elif hasattr(observation_info, "chat_history") and observation_info.chat_history: - history_slice = observation_info.chat_history[-20:] - chat_history_text = await build_readable_messages(history_slice, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0) + history_slice = observation_info.chat_history[-20:] + chat_history_text = await build_readable_messages(history_slice, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0) else: chat_history_text = "还没有聊天记录。\n" unread_count = getattr(observation_info, 'new_messages_count', 0) unread_messages = getattr(observation_info, 'unprocessed_messages', []) diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 7b841f6f..fc8e87d1 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -1,178 +1,529 @@ -# -*- coding: utf-8 -*- -# File: conversation.py import time import asyncio import datetime import traceback from typing import Dict, Any, Optional, Set, List - -# ... (其他 imports 保持不变) ... from src.common.logger_manager import get_logger from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat from maim_message import UserInfo from src.plugins.chat.chat_stream import chat_manager, ChatStream -from ..chat.message import Message # 假设 Message 类在这里 -from ...config.config import global_config # 导入全局配置 +from ..chat.message import Message +from ...config.config import global_config -from .pfc_types import ConversationState # 导入更新后的 pfc_types -from .pfc import GoalAnalyzer # 假设 GoalAnalyzer 在 pfc.py +# 导入 PFC 内部组件和类型 +from .pfc_types import ConversationState +from .pfc import GoalAnalyzer from .chat_observer import ChatObserver from .message_sender import DirectMessageSender from .action_planner import ActionPlanner from .observation_info import ObservationInfo -from .conversation_info import ConversationInfo # 导入修改后的 ConversationInfo +from .conversation_info import ConversationInfo from .reply_generator import ReplyGenerator from .idle_conversation_starter import IdleConversationStarter -from .pfc_KnowledgeFetcher import KnowledgeFetcher # 假设 KnowledgeFetcher 在这里 +from .pfc_KnowledgeFetcher import KnowledgeFetcher from .waiter import Waiter -from .reply_checker import ReplyChecker # <--- 确保 ReplyChecker 被导入 +from .reply_checker import ReplyChecker +# 导入富文本回溯,用于更好的错误展示 from rich.traceback import install install(extra_lines=3) +# 获取当前模块的日志记录器 logger = get_logger("pfc_conversation") class Conversation: - """对话类,负责管理单个对话的状态和行为""" + """ + 对话类,负责管理单个私聊对话的状态和核心逻辑流程。 + 包含对话的初始化、启动、停止、规划循环以及动作处理。 + """ - # --- __init__, _initialize, _load_initial_history, start, stop, _plan_and_action_loop, _convert_to_message (保持不变) --- def __init__(self, stream_id: str, private_name: str): - """初始化对话实例""" - self.stream_id = stream_id; self.private_name = private_name; self.state = ConversationState.INIT; self.should_continue = False; self.ignore_until_timestamp: Optional[float] = None; self.generated_reply = ""; self.chat_stream: Optional[ChatStream] = None - self.action_planner: Optional[ActionPlanner] = None; self.goal_analyzer: Optional[GoalAnalyzer] = None; self.reply_generator: Optional[ReplyGenerator] = None; self.knowledge_fetcher: Optional[KnowledgeFetcher] = None; self.waiter: Optional[Waiter] = None; self.direct_sender: Optional[DirectMessageSender] = None; self.idle_conversation_starter: Optional[IdleConversationStarter] = None; self.chat_observer: Optional[ChatObserver] = None; self.observation_info: Optional[ObservationInfo] = None; self.conversation_info: Optional[ConversationInfo] = None; self.reply_checker: Optional[ReplyChecker] = None - self._initializing = False; self._initialized = False - self.bot_qq_str = str(global_config.BOT_QQ) if global_config.BOT_QQ else None - if not self.bot_qq_str: logger.error(f"[私聊][{self.private_name}] 严重错误:未能从配置中获取 BOT_QQ ID!PFC 可能无法正常工作。") + """ + 初始化对话实例。 + + Args: + stream_id (str): 唯一的聊天流 ID。 + private_name (str): 私聊对象的名称,用于日志和区分。 + """ + self.stream_id: str = stream_id + self.private_name: str = private_name + self.state: ConversationState = ConversationState.INIT # 对话的初始状态 + self.should_continue: bool = False # 标记对话循环是否应该继续运行 + self.ignore_until_timestamp: Optional[float] = None # 如果设置了,忽略此时间戳之前的活动 + self.generated_reply: str = "" # 存储最近生成的回复内容 + self.chat_stream: Optional[ChatStream] = None # 关联的聊天流对象 + + # 初始化所有核心组件为 None,将在 _initialize 中创建 + self.action_planner: Optional[ActionPlanner] = None + self.goal_analyzer: Optional[GoalAnalyzer] = None + self.reply_generator: Optional[ReplyGenerator] = None + self.knowledge_fetcher: Optional[KnowledgeFetcher] = None + self.waiter: Optional[Waiter] = None + self.direct_sender: Optional[DirectMessageSender] = None + self.idle_conversation_starter: Optional[IdleConversationStarter] = None + self.chat_observer: Optional[ChatObserver] = None + self.observation_info: Optional[ObservationInfo] = None + self.conversation_info: Optional[ConversationInfo] = None + self.reply_checker: Optional[ReplyChecker] = None # 回复检查器 + + # 内部状态标志 + self._initializing: bool = False # 标记是否正在初始化,防止并发问题 + self._initialized: bool = False # 标记是否已成功初始化 + + # 缓存机器人自己的 QQ 号字符串,避免重复转换 + self.bot_qq_str: Optional[str] = str(global_config.BOT_QQ) if global_config.BOT_QQ else None + if not self.bot_qq_str: + # 这是一个严重问题,记录错误 + logger.error(f"[私聊][{self.private_name}] 严重错误:未能从配置中获取 BOT_QQ ID!PFC 可能无法正常工作。") + # 可以在这里抛出异常或采取其他错误处理措施 async def _initialize(self): - """异步初始化对话实例及其所有组件""" - if self._initialized or self._initializing: logger.warning(f"[私聊][{self.private_name}] 尝试重复初始化或正在初始化中。"); return - self._initializing = True; logger.info(f"[私聊][{self.private_name}] 开始初始化对话实例: {self.stream_id}") + """ + 异步初始化对话实例及其所有依赖的核心组件。 + 这是一个关键步骤,确保所有部分都准备就绪才能开始对话循环。 + """ + # 防止重复初始化 + if self._initialized or self._initializing: + logger.warning(f"[私聊][{self.private_name}] 尝试重复初始化或正在初始化中。") + return + + self._initializing = True # 标记开始初始化 + logger.info(f"[私聊][{self.private_name}] 开始初始化对话实例: {self.stream_id}") + try: - self.action_planner = ActionPlanner(self.stream_id, self.private_name); self.goal_analyzer = GoalAnalyzer(self.stream_id, self.private_name); self.reply_generator = ReplyGenerator(self.stream_id, self.private_name); self.knowledge_fetcher = KnowledgeFetcher(self.private_name); self.waiter = Waiter(self.stream_id, self.private_name); self.direct_sender = DirectMessageSender(self.private_name); self.reply_checker = ReplyChecker(self.stream_id, self.private_name) - self.chat_stream = chat_manager.get_stream(self.stream_id); - if not self.chat_stream: raise ValueError(f"无法获取 stream_id {self.stream_id} 的 ChatStream") + # 1. 初始化核心功能组件 + logger.debug(f"[私聊][{self.private_name}] 初始化 ActionPlanner...") + self.action_planner = ActionPlanner(self.stream_id, self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 GoalAnalyzer...") + self.goal_analyzer = GoalAnalyzer(self.stream_id, self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 ReplyGenerator...") + self.reply_generator = ReplyGenerator(self.stream_id, self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 KnowledgeFetcher...") + self.knowledge_fetcher = KnowledgeFetcher(self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 Waiter...") + self.waiter = Waiter(self.stream_id, self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 DirectMessageSender...") + self.direct_sender = DirectMessageSender(self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 ReplyChecker...") + self.reply_checker = ReplyChecker(self.stream_id, self.private_name) + + # 获取关联的 ChatStream + logger.debug(f"[私聊][{self.private_name}] 获取 ChatStream...") + self.chat_stream = chat_manager.get_stream(self.stream_id) + if not self.chat_stream: + # 获取不到 ChatStream 是一个严重问题,因为无法发送消息 + logger.error(f"[私聊][{self.private_name}] 初始化错误:无法从 chat_manager 获取 stream_id {self.stream_id} 的 ChatStream。") + raise ValueError(f"无法获取 stream_id {self.stream_id} 的 ChatStream") + + # 初始化空闲对话启动器 + logger.debug(f"[私聊][{self.private_name}] 初始化 IdleConversationStarter...") self.idle_conversation_starter = IdleConversationStarter(self.stream_id, self.private_name) - self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name); self.observation_info = ObservationInfo(self.private_name) - if not self.observation_info.bot_id: logger.warning(f"[私聊][{self.private_name}] ObservationInfo 未能自动获取 bot_id,尝试手动设置。"); self.observation_info.bot_id = self.bot_qq_str + + # 2. 初始化信息存储和观察组件 + logger.debug(f"[私聊][{self.private_name}] 获取 ChatObserver 实例...") + self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name) + logger.debug(f"[私聊][{self.private_name}] 初始化 ObservationInfo...") + self.observation_info = ObservationInfo(self.private_name) + # 确保 ObservationInfo 知道机器人的 ID + if not self.observation_info.bot_id: + logger.warning(f"[私聊][{self.private_name}] ObservationInfo 未能自动获取 bot_id,尝试手动设置。") + self.observation_info.bot_id = self.bot_qq_str + logger.debug(f"[私聊][{self.private_name}] 初始化 ConversationInfo...") self.conversation_info = ConversationInfo() - self.observation_info.bind_to_chat_observer(self.chat_observer); await self._load_initial_history(); self.chat_observer.start() - if self.idle_conversation_starter: self.idle_conversation_starter.start(); logger.info(f"[私聊][{self.private_name}] 空闲对话检测器已启动") - self._initialized = True; self.should_continue = True; self.state = ConversationState.ANALYZING; logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 初始化完成。") - except Exception as e: logger.error(f"[私聊][{self.private_name}] 初始化对话实例失败: {e}\n{traceback.format_exc()}"); self.should_continue = False; self._initialized = False; await self.stop(); raise - finally: self._initializing = False + + # 3. 绑定观察者和信息处理器 + logger.debug(f"[私聊][{self.private_name}] 绑定 ObservationInfo 到 ChatObserver...") + self.observation_info.bind_to_chat_observer(self.chat_observer) + + # 4. 加载初始聊天记录 + await self._load_initial_history() + + # 5. 启动需要后台运行的组件 + logger.debug(f"[私聊][{self.private_name}] 启动 ChatObserver...") + self.chat_observer.start() + if self.idle_conversation_starter: + logger.debug(f"[私聊][{self.private_name}] 启动 IdleConversationStarter...") + self.idle_conversation_starter.start() + logger.info(f"[私聊][{self.private_name}] 空闲对话检测器已启动") + + # 6. 标记初始化成功并设置运行状态 + self._initialized = True + self.should_continue = True # 初始化成功,标记可以继续运行循环 + self.state = ConversationState.ANALYZING # 设置初始状态为分析 + + logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 初始化完成。") + + except Exception as e: + # 捕获初始化过程中的任何异常 + logger.error(f"[私聊][{self.private_name}] 初始化对话实例失败: {e}") + logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") + self.should_continue = False # 初始化失败,标记不能继续 + self._initialized = False # 确保标记为未初始化 + # 尝试停止可能部分启动的组件 + await self.stop() + raise # 将异常重新抛出,通知调用者初始化失败 + finally: + # 无论成功与否,都要清除正在初始化的标记 + self._initializing = False async def _load_initial_history(self): - """加载初始聊天记录""" - if not self.observation_info: return + """加载并处理初始的聊天记录""" + if not self.observation_info: + logger.warning(f"[私聊][{self.private_name}] ObservationInfo 未初始化,无法加载历史记录。") + return + try: logger.info(f"[私聊][{self.private_name}] 为 {self.stream_id} 加载初始聊天记录...") - initial_messages = get_raw_msg_before_timestamp_with_chat(chat_id=self.stream_id, timestamp=time.time(), limit=30,) + # 从聊天核心获取原始消息列表 + initial_messages = get_raw_msg_before_timestamp_with_chat( + chat_id=self.stream_id, + timestamp=time.time(), + limit=30, # limit 可以根据需要调整或配置 + ) + if initial_messages: - self.observation_info.chat_history = initial_messages; self.observation_info.chat_history_count = len(initial_messages) - last_msg = initial_messages[-1]; self.observation_info.last_message_time = last_msg.get("time"); self.observation_info.last_message_id = last_msg.get("message_id") + # 更新 ObservationInfo 中的历史记录列表和计数 + self.observation_info.chat_history = initial_messages + self.observation_info.chat_history_count = len(initial_messages) + + # 获取最后一条消息的信息 + last_msg = initial_messages[-1] + self.observation_info.last_message_time = last_msg.get("time") + self.observation_info.last_message_id = last_msg.get("message_id") + + # 安全地解析最后一条消息的发送者信息 last_user_info_dict = last_msg.get("user_info", {}) if isinstance(last_user_info_dict, dict): - try: last_user_info = UserInfo.from_dict(last_user_info_dict); self.observation_info.last_message_sender = str(last_user_info.user_id) if last_user_info else None - except Exception as e: logger.warning(f"[私聊][{self.private_name}] 解析最后一条消息的用户信息时出错: {e}"); self.observation_info.last_message_sender = None - else: self.observation_info.last_message_sender = None + try: + last_user_info = UserInfo.from_dict(last_user_info_dict) + # 存储发送者的 user_id 字符串 + self.observation_info.last_message_sender = str(last_user_info.user_id) if last_user_info else None + except Exception as e: + logger.warning(f"[私聊][{self.private_name}] 解析最后一条消息的用户信息时出错: {e}") + self.observation_info.last_message_sender = None + else: + # 如果 user_info 不是字典,也标记为未知 + self.observation_info.last_message_sender = None + + # 存储最后一条消息的文本内容 self.observation_info.last_message_content = last_msg.get("processed_plain_text", "") - history_slice_for_str = initial_messages[-20:] - self.observation_info.chat_history_str = await build_readable_messages(history_slice_for_str, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0) - if self.chat_observer: self.chat_observer.last_message_time = self.observation_info.last_message_time - if self.idle_conversation_starter and self.observation_info.last_message_time: await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time) + + # 构建用于 Prompt 的历史记录字符串 (只使用最近的一部分) + history_slice_for_str = initial_messages[-20:] # 可配置 + self.observation_info.chat_history_str = await build_readable_messages( + history_slice_for_str, + replace_bot_name=True, + merge_messages=False, + timestamp_mode="relative", + read_mark=0.0 # read_mark 可能需要根据实际情况调整 + ) + + # 更新 ChatObserver 和 IdleStarter 的时间戳 + if self.chat_observer: + # 更新观察者的最后消息时间,避免重复处理这些初始消息 + self.chat_observer.last_message_time = self.observation_info.last_message_time + if self.idle_conversation_starter and self.observation_info.last_message_time: + # 更新空闲计时器的起始时间 + await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time) + logger.info(f"[私聊][{self.private_name}] 成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}") - else: logger.info(f"[私聊][{self.private_name}] 没有找到初始聊天记录。"); self.observation_info.chat_history_str = "还没有聊天记录。" + else: + # 如果没有历史记录 + logger.info(f"[私聊][{self.private_name}] 没有找到初始聊天记录。") + self.observation_info.chat_history_str = "还没有聊天记录。" # 设置默认提示 + except Exception as load_err: - logger.error(f"[私聊][{self.private_name}] 加载初始聊天记录时出错: {load_err}"); - if self.observation_info: self.observation_info.chat_history_str = "[加载聊天记录出错]" + # 捕获加载过程中的异常 + logger.error(f"[私聊][{self.private_name}] 加载初始聊天记录时出错: {load_err}") + # 即使出错,也设置一个提示,避免后续使用 None 值 + if self.observation_info: + self.observation_info.chat_history_str = "[加载聊天记录出错]" async def start(self): - """开始对话流程 (增强检查)""" + """ + 启动对话流程。 + 会检查实例是否已初始化,如果未初始化会尝试初始化。 + 成功后,创建并启动核心的规划与行动循环 (`_plan_and_action_loop`)。 + """ + # 检查是否已初始化,如果未初始化则尝试进行初始化 if not self._initialized: - logger.warning(f"[私聊][{self.private_name}] 对话实例未初始化,尝试初始化...") - try: - await self._initialize(); - if not self._initialized: logger.error(f"[私聊][{self.private_name}] 初始化失败,无法启动规划循环。"); return - except Exception as init_err: logger.error(f"[私聊][{self.private_name}] 初始化过程中发生未捕获错误: {init_err},无法启动。"); return - if not self.should_continue: logger.warning(f"[私聊][{self.private_name}] 对话实例已被标记为不应继续 (可能由于初始化失败或已被停止),无法启动规划循环。"); return + logger.warning(f"[私聊][{self.private_name}] 对话实例未初始化,尝试初始化...") + try: + await self._initialize() + # 在尝试初始化后,再次检查状态 + if not self._initialized: + logger.error(f"[私聊][{self.private_name}] 初始化失败,无法启动规划循环。") + return # 初始化失败,明确停止 + except Exception as init_err: + logger.error(f"[私聊][{self.private_name}] 初始化过程中发生未捕获错误: {init_err},无法启动。") + return # 初始化异常,明确停止 + + # 再次检查 should_continue 标志,确保初始化成功且未被外部停止 + if not self.should_continue: + logger.warning(f"[私聊][{self.private_name}] 对话实例已被标记为不应继续 (可能由于初始化失败或已被停止),无法启动规划循环。") + return + logger.info(f"[私聊][{self.private_name}] 对话系统启动,准备创建规划循环任务...") - try: logger.debug(f"[私聊][{self.private_name}] 正在创建 _plan_and_action_loop 任务..."); loop_task = asyncio.create_task(self._plan_and_action_loop()); logger.info(f"[私聊][{self.private_name}] 规划循环任务已创建。") - except Exception as task_err: logger.error(f"[私聊][{self.private_name}] 创建规划循环任务时出错: {task_err}"); await self.stop() + # 使用 asyncio.create_task 在后台启动主循环 + try: + logger.debug(f"[私聊][{self.private_name}] 正在创建 _plan_and_action_loop 任务...") + # 创建任务,但不等待其完成,让它在后台运行 + loop_task = asyncio.create_task(self._plan_and_action_loop()) + # 可以选择性地添加完成回调来处理任务结束或异常 + # loop_task.add_done_callback(self._handle_loop_completion) + logger.info(f"[私聊][{self.private_name}] 规划循环任务已创建。") + except Exception as task_err: + logger.error(f"[私聊][{self.private_name}] 创建规划循环任务时出错: {task_err}") + # 如果创建任务失败,可能需要停止实例 + await self.stop() async def stop(self): - """停止对话实例并清理资源""" - logger.info(f"[私聊][{self.private_name}] 正在停止对话实例: {self.stream_id}"); self.should_continue = False - if self.idle_conversation_starter: self.idle_conversation_starter.stop() - if self.observation_info and self.chat_observer: self.observation_info.unbind_from_chat_observer() - self._initialized = False; logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 已停止。") + """ + 停止对话实例并清理相关资源。 + 会停止后台任务、解绑观察者等。 + """ + logger.info(f"[私聊][{self.private_name}] 正在停止对话实例: {self.stream_id}") + self.should_continue = False # 设置标志,让主循环退出 + + # 停止空闲对话检测器 + if self.idle_conversation_starter: + self.idle_conversation_starter.stop() + + # 解绑 ObservationInfo 与 ChatObserver + if self.observation_info and self.chat_observer: + self.observation_info.unbind_from_chat_observer() + + # ChatObserver 是单例,通常不由单个 Conversation 停止 + # 如果需要,可以在管理器层面处理 ChatObserver 的生命周期 + + # 标记为未初始化 + self._initialized = False + logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 已停止。") async def _plan_and_action_loop(self): - """思考步,PFC核心循环模块 - 实现精细化中断逻辑""" + """ + 核心的规划与行动循环 (PFC Loop)。 + 持续运行,根据当前状态规划下一步行动,处理新消息中断,执行动作,直到被停止。 + """ logger.info(f"[私聊][{self.private_name}] 进入 _plan_and_action_loop 循环。") - if not self._initialized: logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下运行规划循环。"); return + + # 循环前再次确认初始化状态 + if not self._initialized: + logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下运行规划循环,退出。") + return # 明确退出 + + # 主循环,只要 should_continue 为 True 就一直运行 while self.should_continue: - loop_iter_start_time = time.time(); logger.debug(f"[私聊][{self.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f})") + loop_iter_start_time = time.time() # 记录本次循环开始时间 + logger.debug(f"[私聊][{self.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f})") + + # --- 处理忽略状态 --- if self.ignore_until_timestamp and loop_iter_start_time < self.ignore_until_timestamp: - if self.idle_conversation_starter and self.idle_conversation_starter._running: self.idle_conversation_starter.stop(); logger.debug(f"[私聊][{self.private_name}] 对话被暂时忽略,暂停空闲对话检测") - sleep_duration = min(30, self.ignore_until_timestamp - loop_iter_start_time); await asyncio.sleep(sleep_duration); continue - elif self.ignore_until_timestamp and loop_iter_start_time >= self.ignore_until_timestamp: logger.info(f"[私聊][{self.private_name}] 忽略时间已到 {self.stream_id},准备结束对话。"); self.ignore_until_timestamp = None; await self.stop(); continue + # 如果当前处于忽略状态 + if self.idle_conversation_starter and self.idle_conversation_starter._running: + # 暂停空闲检测器 + self.idle_conversation_starter.stop() + logger.debug(f"[私聊][{self.private_name}] 对话被暂时忽略,暂停空闲对话检测") + # 计算需要睡眠的时间,最多30秒或直到忽略结束 + sleep_duration = min(30, self.ignore_until_timestamp - loop_iter_start_time) + await asyncio.sleep(sleep_duration) + continue # 跳过本次循环的后续步骤,直接进入下一次迭代检查 + elif self.ignore_until_timestamp and loop_iter_start_time >= self.ignore_until_timestamp: + # 如果忽略时间已到 + logger.info(f"[私聊][{self.private_name}] 忽略时间已到 {self.stream_id},准备结束对话。") + self.ignore_until_timestamp = None # 清除忽略时间戳 + await self.stop() # 调用 stop 方法来结束整个对话实例 + continue # 跳过本次循环的后续步骤 else: - if self.idle_conversation_starter and not self.idle_conversation_starter._running: self.idle_conversation_starter.start(); logger.debug(f"[私聊][{self.private_name}] 恢复空闲对话检测") + # 如果不在忽略状态,确保空闲检测器在运行 + if self.idle_conversation_starter and not self.idle_conversation_starter._running: + self.idle_conversation_starter.start() + logger.debug(f"[私聊][{self.private_name}] 恢复空闲对话检测") + + # --- 核心规划与行动逻辑 --- try: - if not all([self.action_planner, self.observation_info, self.conversation_info]): logger.error(f"[私聊][{self.private_name}] 核心组件未初始化,无法继续规划循环。将等待5秒后重试..."); await asyncio.sleep(5); continue - planning_start_time = time.time(); logger.debug(f"[私聊][{self.private_name}] --- 开始规划 ({planning_start_time:.2f}) ---"); self.conversation_info.other_new_messages_during_planning_count = 0 - logger.debug(f"[私聊][{self.private_name}] 调用 ActionPlanner.plan..."); action, reason = await self.action_planner.plan(self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action) - planning_duration = time.time() - planning_start_time; logger.debug(f"[私聊][{self.private_name}] ActionPlanner.plan 完成 (耗时: {planning_duration:.3f} 秒),初步规划动作: {action}") - current_unprocessed_messages = getattr(self.observation_info, 'unprocessed_messages', []); new_messages_during_planning: List[Dict[str, Any]] = []; other_new_messages_during_planning: List[Dict[str, Any]] = [] + # 1. 检查核心组件是否都已初始化 + if not all([self.action_planner, self.observation_info, self.conversation_info]): + logger.error(f"[私聊][{self.private_name}] 核心组件未初始化,无法继续规划循环。将等待5秒后重试...") + await asyncio.sleep(5) + continue # 跳过本次迭代 + + # 2. 记录规划开始时间并重置临时状态 + planning_start_time = time.time() + logger.debug(f"[私聊][{self.private_name}] --- 开始规划 ({planning_start_time:.2f}) ---") + # 重置上一轮存储的“规划期间他人新消息数” + self.conversation_info.other_new_messages_during_planning_count = 0 + + # 3. 调用 ActionPlanner 进行规划 + logger.debug(f"[私聊][{self.private_name}] 调用 ActionPlanner.plan...") + # 传入当前观察信息、对话信息和上次成功回复的动作类型 + action, reason = await self.action_planner.plan( + self.observation_info, + self.conversation_info, + self.conversation_info.last_successful_reply_action + ) + planning_duration = time.time() - planning_start_time + logger.debug(f"[私聊][{self.private_name}] ActionPlanner.plan 完成 (耗时: {planning_duration:.3f} 秒),初步规划动作: {action}") + + # 4. 检查规划期间是否有新消息到达 + current_unprocessed_messages = getattr(self.observation_info, 'unprocessed_messages', []) + new_messages_during_planning: List[Dict[str, Any]] = [] + other_new_messages_during_planning: List[Dict[str, Any]] = [] + + # 遍历当前所有未处理的消息 for msg in current_unprocessed_messages: - msg_time = msg.get('time'); sender_id = msg.get("user_info", {}).get("user_id") + msg_time = msg.get('time') + sender_id = msg.get("user_info", {}).get("user_id") + # 检查消息时间是否在本次规划开始之后 if msg_time and msg_time >= planning_start_time: - new_messages_during_planning.append(msg); - if sender_id != self.bot_qq_str: + new_messages_during_planning.append(msg) + # 同时检查是否是来自他人的消息 + if sender_id != self.bot_qq_str: other_new_messages_during_planning.append(msg) - new_msg_count = len(new_messages_during_planning); other_new_msg_count = len(other_new_messages_during_planning); logger.debug(f"[私聊][{self.private_name}] 规划期间收到新消息总数: {new_msg_count}, 来自他人: {other_new_msg_count}") - should_interrupt = False; interrupt_reason = "" + + new_msg_count = len(new_messages_during_planning) # 规划期间所有新消息数 + other_new_msg_count = len(other_new_messages_during_planning) # 规划期间他人新消息数 + logger.debug(f"[私聊][{self.private_name}] 规划期间收到新消息总数: {new_msg_count}, 来自他人: {other_new_msg_count}") + + # 5. 根据动作类型和新消息数量,判断是否需要中断当前规划 + should_interrupt: bool = False + interrupt_reason: str = "" + if action in ["wait", "listening"]: - if new_msg_count > 0: should_interrupt = True; interrupt_reason = f"规划 {action} 期间收到 {new_msg_count} 条新消息"; logger.info(f"[私聊][{self.private_name}] 中断 '{action}',原因: {interrupt_reason}。") + # 规则:对于 wait/listen,任何新消息(无论来自谁)都应该中断 + if new_msg_count > 0: + should_interrupt = True + interrupt_reason = f"规划 {action} 期间收到 {new_msg_count} 条新消息" + logger.info(f"[私聊][{self.private_name}] 中断 '{action}',原因: {interrupt_reason}。") else: - interrupt_threshold = 2 - if other_new_msg_count > interrupt_threshold: should_interrupt = True; interrupt_reason = f"规划 {action} 期间收到 {other_new_msg_count} 条来自他人的新消息 (阈值 >{interrupt_threshold})"; logger.info(f"[私聊][{self.private_name}] 中断 '{action}',原因: {interrupt_reason}。") + # 规则:对于其他动作,检查来自他人的新消息是否超过阈值 2 + interrupt_threshold: int = 2 + if other_new_msg_count > interrupt_threshold: + should_interrupt = True + interrupt_reason = f"规划 {action} 期间收到 {other_new_msg_count} 条来自他人的新消息 (阈值 >{interrupt_threshold})" + logger.info(f"[私聊][{self.private_name}] 中断 '{action}',原因: {interrupt_reason}。") + + # 6. 如果需要中断,则记录取消信息,重置状态,并进入下一次循环 if should_interrupt: logger.info(f"[私聊][{self.private_name}] 执行中断,重新规划...") - cancel_record = {"action": action, "plan_reason": reason, "status": "cancelled_due_to_new_messages", "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "final_reason": interrupt_reason} - if not hasattr(self.conversation_info, "done_action"): self.conversation_info.done_action = [] - self.conversation_info.done_action.append(cancel_record); self.conversation_info.last_successful_reply_action = None; self.state = ConversationState.ANALYZING; await asyncio.sleep(0.1); continue - logger.debug(f"[私聊][{self.private_name}] 未中断,调用 _handle_action 执行动作 '{action}'..."); self.conversation_info.other_new_messages_during_planning_count = other_new_msg_count - await self._handle_action(action, reason, self.observation_info, self.conversation_info); logger.debug(f"[私聊][{self.private_name}] _handle_action 完成。") - goal_ended = False + # 记录被取消的动作到历史记录 + cancel_record = { + "action": action, + "plan_reason": reason, + "status": "cancelled_due_to_new_messages", # 标记取消原因 + "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "final_reason": interrupt_reason + } + # 安全地添加到 done_action 列表 + if not hasattr(self.conversation_info, "done_action"): + self.conversation_info.done_action = [] + self.conversation_info.done_action.append(cancel_record) + + # 重置追问状态,因为当前动作被中断了 + self.conversation_info.last_successful_reply_action = None + # 将状态设置回分析,准备处理新消息并重新规划 + self.state = ConversationState.ANALYZING + await asyncio.sleep(0.1) # 短暂等待,避免CPU空转 + continue # 直接进入下一次循环迭代 + + # 7. 如果未中断,存储规划期间的他人新消息数,并执行动作 + logger.debug(f"[私聊][{self.private_name}] 未中断,调用 _handle_action 执行动作 '{action}'...") + # 将计算出的“规划期间他人新消息数”存入 conversation_info,供 _handle_action 使用 + self.conversation_info.other_new_messages_during_planning_count = other_new_msg_count + # 调用动作处理函数 + await self._handle_action(action, reason, self.observation_info, self.conversation_info) + logger.debug(f"[私聊][{self.private_name}] _handle_action 完成。") + + # 8. 检查是否需要结束整个对话(例如目标达成或执行了结束动作) + goal_ended: bool = False + # 检查最新的目标是否是“结束对话” if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list: - last_goal_item = self.conversation_info.goal_list[-1]; current_goal = None - if isinstance(last_goal_item, dict): current_goal = last_goal_item.get("goal") - elif isinstance(last_goal_item, str): current_goal = last_goal_item - if isinstance(current_goal, str) and current_goal == "结束对话": goal_ended = True + last_goal_item = self.conversation_info.goal_list[-1] + current_goal: Optional[str] = None + if isinstance(last_goal_item, dict): + current_goal = last_goal_item.get("goal") + elif isinstance(last_goal_item, str): + current_goal = last_goal_item + if isinstance(current_goal, str) and current_goal == "结束对话": + goal_ended = True + + # 检查最后执行的动作是否是结束类型且成功完成 last_action_record = self.conversation_info.done_action[-1] if self.conversation_info.done_action else {} - action_ended = last_action_record.get("action") in ["end_conversation", "say_goodbye"] and last_action_record.get("status") == "done" - if goal_ended or action_ended: logger.info(f"[私聊][{self.private_name}] 检测到结束条件 (目标结束: {goal_ended}, 动作结束: {action_ended}),停止循环。"); await self.stop(); continue - except asyncio.CancelledError: logger.info(f"[私聊][{self.private_name}] PFC 主循环被取消。"); await self.stop(); break - except Exception as loop_err: logger.error(f"[私聊][{self.private_name}] PFC 主循环出错: {loop_err}\n{traceback.format_exc()}"); self.state = ConversationState.ERROR; await asyncio.sleep(5) # 使用修正后的 ERROR 状态 - loop_duration = time.time() - loop_iter_start_time; min_loop_interval = 0.1; logger.debug(f"[私聊][{self.private_name}] 循环迭代耗时: {loop_duration:.3f} 秒。") - if loop_duration < min_loop_interval: await asyncio.sleep(min_loop_interval - loop_duration) + action_ended: bool = ( + last_action_record.get("action") in ["end_conversation", "say_goodbye"] and + last_action_record.get("status") == "done" + ) + + # 如果满足任一结束条件,则停止循环 + if goal_ended or action_ended: + logger.info(f"[私聊][{self.private_name}] 检测到结束条件 (目标结束: {goal_ended}, 动作结束: {action_ended}),停止循环。") + await self.stop() # 调用 stop 来停止实例 + continue # 跳过后续,虽然 stop 会设置 should_continue=False + + except asyncio.CancelledError: + # 处理任务被取消的情况 + logger.info(f"[私聊][{self.private_name}] PFC 主循环任务被取消。") + await self.stop() # 确保资源被清理 + break # 明确退出循环 + except Exception as loop_err: + # 捕获循环中的其他未预期错误 + logger.error(f"[私聊][{self.private_name}] PFC 主循环出错: {loop_err}") + logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") + self.state = ConversationState.ERROR # 设置错误状态 + # 可以在这里添加更复杂的错误恢复逻辑,或者简单等待后重试 + await asyncio.sleep(5) # 等待一段时间,避免错误状态下快速空转 + + # --- 控制循环频率 --- + loop_duration = time.time() - loop_iter_start_time # 计算本次循环耗时 + min_loop_interval = 0.1 # 设置最小循环间隔(秒),防止CPU占用过高 + logger.debug(f"[私聊][{self.private_name}] 循环迭代耗时: {loop_duration:.3f} 秒。") + if loop_duration < min_loop_interval: + # 如果循环太快,则睡眠一段时间 + await asyncio.sleep(min_loop_interval - loop_duration) + + # 循环结束后的日志 logger.info(f"[私聊][{self.private_name}] PFC 循环已退出 for stream_id: {self.stream_id}") + def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Optional[Message]: - """将消息字典转换为Message对象 (保持不变)""" - # (代码同 v6_debug 版本) + """将从数据库或其他来源获取的消息字典转换为内部使用的 Message 对象""" try: + # 优先使用实例自身的 chat_stream,如果不存在则尝试从管理器获取 chat_stream_to_use = self.chat_stream or chat_manager.get_stream(self.stream_id) - if not chat_stream_to_use: logger.error(f"[私聊][{self.private_name}] 无法确定 ChatStream for stream_id {self.stream_id},无法转换消息。"); return None - user_info_dict = msg_dict.get("user_info", {}); user_info: Optional[UserInfo] = None + if not chat_stream_to_use: + logger.error(f"[私聊][{self.private_name}] 无法确定 ChatStream for stream_id {self.stream_id},无法转换消息。") + return None # 无法确定聊天流,返回 None + + # 解析用户信息字典 + user_info_dict = msg_dict.get("user_info", {}) + user_info: Optional[UserInfo] = None if isinstance(user_info_dict, dict): - try: user_info = UserInfo.from_dict(user_info_dict) - except Exception as e: logger.warning(f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}") - if not user_info: logger.warning(f"[私聊][{self.private_name}] 消息缺少有效的 UserInfo,无法转换。 msg_id: {msg_dict.get('message_id')}"); return None - return Message(message_id=msg_dict.get("message_id", f"gen_{time.time()}"), chat_stream=chat_stream_to_use, time=msg_dict.get("time", time.time()), user_info=user_info, processed_plain_text=msg_dict.get("processed_plain_text", ""), detailed_plain_text=msg_dict.get("detailed_plain_text", "")) - except Exception as e: logger.error(f"[私聊][{self.private_name}] 转换消息时出错: {e}\n{traceback.format_exc()}"); return None + try: + # 使用 UserInfo 类的方法从字典创建对象 + user_info = UserInfo.from_dict(user_info_dict) + except Exception as e: + # 解析失败记录警告 + logger.warning(f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}") + if not user_info: + # 如果没有有效的 UserInfo,记录警告并返回 None + logger.warning(f"[私聊][{self.private_name}] 消息缺少有效的 UserInfo,无法转换。 msg_id: {msg_dict.get('message_id')}") + return None + + # 创建并返回 Message 对象 + return Message( + message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 如果没有ID,生成一个临时的 + chat_stream=chat_stream_to_use, + time=msg_dict.get("time", time.time()), # 如果没有时间戳,使用当前时间 + user_info=user_info, # 使用解析出的 UserInfo 对象 + processed_plain_text=msg_dict.get("processed_plain_text", ""), # 获取处理后的纯文本 + detailed_plain_text=msg_dict.get("detailed_plain_text", ""), # 获取详细纯文本 + # 根据 Message 类的定义,可能还需要其他字段 + ) + except Exception as e: + # 捕获转换过程中的任何异常 + logger.error(f"[私聊][{self.private_name}] 转换消息时出错: {e}") + logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") + return None # 转换失败返回 None async def _handle_action( @@ -182,180 +533,410 @@ class Conversation: observation_info: ObservationInfo, conversation_info: ConversationInfo ): - """处理规划的行动 - 重新加入 ReplyChecker""" + """ + 处理由 ActionPlanner 规划出的具体行动。 + 包括生成回复、调用检查器、发送消息、等待、思考目标等。 + 并根据执行结果和规则更新对话状态。 + """ + # 检查初始化状态 if not self._initialized: - logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下处理动作 '{action}'。") - return + logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下处理动作 '{action}'。") + return logger.info(f"[私聊][{self.private_name}] 开始处理动作: {action}, 原因: {reason}") - action_start_time = time.time() + action_start_time = time.time() # 记录动作开始时间,用于计算耗时 - # 记录action历史 + # --- 准备动作历史记录 --- current_action_record = { - "action": action, "plan_reason": reason, "status": "start", - "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "final_reason": None, + "action": action, + "plan_reason": reason, # 记录规划时的原因 + "status": "start", # 初始状态为“开始” + "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 记录开始时间 + "final_reason": None, # 最终结果的原因,将在 finally 中设置 } - if not hasattr(conversation_info, "done_action"): conversation_info.done_action = [] + # 安全地添加到历史记录列表 + if not hasattr(conversation_info, "done_action"): + conversation_info.done_action = [] conversation_info.done_action.append(current_action_record) + # 获取当前记录在列表中的索引,方便后续更新状态 action_index = len(conversation_info.done_action) - 1 - action_successful = False - final_status = "recall" - final_reason = "动作未成功执行" - need_replan_from_checker = False + # --- 初始化动作执行状态变量 --- + action_successful: bool = False # 标记动作是否成功执行 + final_status: str = "recall" # 动作最终状态,默认为 recall (表示未成功或需重试) + final_reason: str = "动作未成功执行" # 动作最终原因 + need_replan_from_checker: bool = False # 标记是否由 ReplyChecker 要求重新规划 try: - # --- 根据不同的 action 执行 --- - if action in ["direct_reply", "send_new_message"]: - # --- 生成回复逻辑 --- - self.state = ConversationState.GENERATING - if not self.reply_generator: raise RuntimeError("ReplyGenerator 未初始化") - generated_content = await self.reply_generator.generate(observation_info, conversation_info, action_type=action) - logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'") + # --- 根据不同的 action 类型执行相应的逻辑 --- + # 1. 处理需要生成并可能发送消息的动作 + if action in ["direct_reply", "send_new_message"]: + # --- a. 生成回复 --- + self.state = ConversationState.GENERATING # 更新对话状态 + if not self.reply_generator: + # 检查依赖组件是否存在 + raise RuntimeError("ReplyGenerator 未初始化") + # 调用 ReplyGenerator 生成回复内容 + generated_content = await self.reply_generator.generate( + observation_info, + conversation_info, + action_type=action + ) + logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'") # 日志中截断长内容 + + # 检查生成内容是否有效 if not generated_content or generated_content.startswith("抱歉"): + # 如果生成失败或返回错误提示 logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,标记失败。") - final_reason = "生成内容无效"; final_status = "recall" + final_reason = "生成内容无效" + final_status = "recall" # 标记为 recall + # 重置追问状态,因为本次回复失败 conversation_info.last_successful_reply_action = None else: - # --- [核心修复] 调用 ReplyChecker --- - self.state = ConversationState.CHECKING - if not self.reply_checker: raise RuntimeError("ReplyChecker 未初始化") + # --- b. 检查回复 (如果生成成功) --- + self.state = ConversationState.CHECKING # 更新状态为检查中 + if not self.reply_checker: + raise RuntimeError("ReplyChecker 未初始化") - current_goal_str = "" + # 准备检查所需的上下文信息 + current_goal_str: str = "" # 当前对话目标字符串 if conversation_info.goal_list: - goal_item = conversation_info.goal_list[-1] - if isinstance(goal_item, dict): current_goal_str = goal_item.get('goal', '') - elif isinstance(goal_item, str): current_goal_str = goal_item - chat_history_for_check = getattr(observation_info, 'chat_history', []) - # chat_history_str_for_check = getattr(observation_info, 'chat_history_str', '') # <--- 获取文本形式的历史记录 - # --- 修正:直接使用 observation_info 中的 chat_history_str --- - chat_history_text_for_check = getattr(observation_info, 'chat_history_str', '') - retry_count = 0 + # 通常检查最新的目标 + goal_item = conversation_info.goal_list[-1] + if isinstance(goal_item, dict): + current_goal_str = goal_item.get('goal', '') + elif isinstance(goal_item, str): + current_goal_str = goal_item + # 获取用于检查的聊天记录 (列表和字符串形式) + chat_history_for_check: List[Dict[str, Any]] = getattr(observation_info, 'chat_history', []) + chat_history_text_for_check: str = getattr(observation_info, 'chat_history_str', '') + # 当前重试次数 (如果未来加入重试逻辑,这里需要传递实际次数) + retry_count: int = 0 logger.debug(f"[私聊][{self.private_name}] 调用 ReplyChecker 检查回复...") + # 调用 ReplyChecker 的 check 方法 is_suitable, check_reason, need_replan_from_checker = await self.reply_checker.check( reply=generated_content, goal=current_goal_str, chat_history=chat_history_for_check, # 传递列表形式的历史记录 - chat_history_text=chat_history_text_for_check, # <--- 修正参数名 + chat_history_text=chat_history_text_for_check, # 传递文本形式的历史记录 retry_count=retry_count ) logger.info(f"[私聊][{self.private_name}] ReplyChecker 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重规划={need_replan_from_checker}") + # --- c. 处理检查结果 --- if not is_suitable or need_replan_from_checker: + # 如果检查不通过或 Checker 要求重新规划 + # 记录拒绝原因和内容,供下次生成时参考 conversation_info.last_reply_rejection_reason = check_reason conversation_info.last_rejected_reply_content = generated_content - final_reason = f"回复检查不通过: {check_reason}"; final_status = "recall" + # 设置最终状态和原因 + final_reason = f"回复检查不通过: {check_reason}" + final_status = "recall" # 标记为 recall + # 重置追问状态 conversation_info.last_successful_reply_action = None logger.warning(f"[私聊][{self.private_name}] 动作 '{action}' 因回复检查失败而被拒绝。") - else: # 检查通过 - conversation_info.last_reply_rejection_reason = None; conversation_info.last_rejected_reply_content = None - # --- 发送回复逻辑 --- - self.generated_reply = generated_content; timestamp_before_sending = time.time() - logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 回复检查通过,记录发送前时间戳: {timestamp_before_sending:.2f}"); self.state = ConversationState.SENDING - send_success = await self._send_reply(); send_end_time = time.time() + # 注意:如果 need_replan_from_checker 为 True,后续逻辑会因 final_status 为 recall 而可能触发重新规划 + else: + # 如果检查通过 + # 清除上次的拒绝信息 + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + + # --- d. 发送回复 --- + self.generated_reply = generated_content # 存储待发送内容 + timestamp_before_sending = time.time() # 记录发送前时间戳 + logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 回复检查通过,记录发送前时间戳: {timestamp_before_sending:.2f}") + self.state = ConversationState.SENDING # 更新状态为发送中 + # 调用内部发送方法 + send_success = await self._send_reply() + send_end_time = time.time() # 记录发送结束时间 + if send_success: - action_successful = True; logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 成功发送回复。") - if self.idle_conversation_starter: await self.idle_conversation_starter.update_last_message_time(send_end_time) - current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []); message_ids_to_clear: Set[str] = set() + # 如果发送成功 + action_successful = True # 标记动作成功 + logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 成功发送回复.") + # 更新空闲计时器 + if self.idle_conversation_starter: + await self.idle_conversation_starter.update_last_message_time(send_end_time) + + # --- e. 清理已处理消息 --- + current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) + message_ids_to_clear: Set[str] = set() + # 遍历所有未处理消息 for msg in current_unprocessed_messages: - msg_time = msg.get('time'); msg_id = msg.get('message_id'); sender_id = msg.get("user_info", {}).get("user_id") - if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: message_ids_to_clear.add(msg_id) - if message_ids_to_clear: logger.debug(f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}"); await observation_info.clear_processed_messages(message_ids_to_clear) - else: logger.debug(f"[私聊][{self.private_name}] 没有需要清理的发送前(他人)消息。") + msg_time = msg.get('time') + msg_id = msg.get('message_id') + sender_id = msg.get("user_info", {}).get("user_id") + # 规则:只清理【发送前】收到的、【来自他人】的消息 + if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: + message_ids_to_clear.add(msg_id) + # 如果有需要清理的消息,调用清理方法 + if message_ids_to_clear: + logger.debug(f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}") + await observation_info.clear_processed_messages(message_ids_to_clear) + else: + logger.debug(f"[私聊][{self.private_name}] 没有需要清理的发送前(他人)消息。") + + # --- f. 决定下一轮规划类型 --- + # 从 conversation_info 获取【规划期间】收到的【他人】新消息数量 other_new_msg_count_during_planning = getattr(conversation_info, 'other_new_messages_during_planning_count', 0) - if other_new_msg_count_during_planning > 0: logger.info(f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。"); conversation_info.last_successful_reply_action = None - else: logger.info(f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。"); conversation_info.last_successful_reply_action = action - else: # 发送失败 - logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送回复失败。"); final_status = "recall"; final_reason = "发送回复时失败"; conversation_info.last_successful_reply_action = None + # 规则:如果规划期间收到他人新消息 (0 < count <= 2),则下一轮强制初始回复 + if other_new_msg_count_during_planning > 0: + logger.info(f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。") + conversation_info.last_successful_reply_action = None # 强制初始回复 + else: + # 规则:如果规划期间【没有】收到他人新消息,则允许追问 + logger.info(f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。") + conversation_info.last_successful_reply_action = action # 允许追问 + + else: + # 如果发送失败 + logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送回复失败。") + final_status = "recall" # 发送失败,标记为 recall + final_reason = "发送回复时失败" + # 重置追问状态 + conversation_info.last_successful_reply_action = None + + # 2. 处理发送告别语动作 elif action == "say_goodbye": - self.state = ConversationState.GENERATING - if not self.reply_generator: raise RuntimeError("ReplyGenerator 未初始化") - generated_content = await self.reply_generator.generate(observation_info, conversation_info, action_type=action); logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'") - if not generated_content or generated_content.startswith("抱歉"): - logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,取消发送。"); final_reason = "生成内容无效"; final_status = "done"; self.should_continue = False; logger.info(f"[私聊][{self.private_name}] 告别语生成失败,仍按计划结束对话。") - else: - self.generated_reply = generated_content; timestamp_before_sending = time.time(); logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}"); self.state = ConversationState.SENDING - send_success = await self._send_reply(); send_end_time = time.time() - if send_success: - action_successful = True; logger.info(f"[私聊][{self.private_name}] 成功发送告别语,即将停止对话实例。") - if self.idle_conversation_starter: await self.idle_conversation_starter.update_last_message_time(send_end_time) - current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []); message_ids_to_clear: Set[str] = set() - for msg in current_unprocessed_messages: - msg_time = msg.get('time'); msg_id = msg.get('message_id'); sender_id = msg.get("user_info", {}).get("user_id") - if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: message_ids_to_clear.add(msg_id) - if message_ids_to_clear: await observation_info.clear_processed_messages(message_ids_to_clear) - self.should_continue = False - else: logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送告别语失败。"); final_status = "recall"; final_reason = "发送告别语失败"; self.should_continue = True + self.state = ConversationState.GENERATING + if not self.reply_generator: + raise RuntimeError("ReplyGenerator 未初始化") + # 生成告别语 + generated_content = await self.reply_generator.generate( + observation_info, + conversation_info, + action_type=action + ) + logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'") - # --- 其他动作处理 (保持不变) --- + # 检查生成内容 + if not generated_content or generated_content.startswith("抱歉"): + logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,取消发送。") + final_reason = "生成内容无效" + # 即使生成失败,也按计划结束对话 + final_status = "done" + self.should_continue = False + logger.info(f"[私聊][{self.private_name}] 告别语生成失败,仍按计划结束对话。") + else: + # 发送告别语 + self.generated_reply = generated_content + timestamp_before_sending = time.time() + logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}") + self.state = ConversationState.SENDING + send_success = await self._send_reply() + send_end_time = time.time() + + if send_success: + action_successful = True # 标记成功 + logger.info(f"[私聊][{self.private_name}] 成功发送告别语,即将停止对话实例。") + # 更新空闲计时器 + if self.idle_conversation_starter: + await self.idle_conversation_starter.update_last_message_time(send_end_time) + # 清理发送前的消息 (虽然通常是最后一条,但保持逻辑一致) + current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) + message_ids_to_clear: Set[str] = set() + for msg in current_unprocessed_messages: + msg_time = msg.get('time') + msg_id = msg.get('message_id') + sender_id = msg.get("user_info", {}).get("user_id") + if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: + message_ids_to_clear.add(msg_id) + if message_ids_to_clear: + await observation_info.clear_processed_messages(message_ids_to_clear) + # 发送成功后结束对话 + self.should_continue = False + else: + # 发送失败 + logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送告别语失败。") + final_status = "recall" + final_reason = "发送告别语失败" + # 发送失败不能结束对话 + self.should_continue = True + + # 3. 处理重新思考目标动作 elif action == "rethink_goal": - self.state = ConversationState.RETHINKING; - if not self.goal_analyzer: - raise RuntimeError("GoalAnalyzer 未初始化"); await self.goal_analyzer.analyze_goal(conversation_info, observation_info); action_successful = True + self.state = ConversationState.RETHINKING + if not self.goal_analyzer: + raise RuntimeError("GoalAnalyzer 未初始化") + # 调用 GoalAnalyzer 分析并更新目标 + await self.goal_analyzer.analyze_goal(conversation_info, observation_info) + action_successful = True # 标记成功 + + # 4. 处理倾听动作 elif action == "listening": - self.state = ConversationState.LISTENING; - if not self.waiter: raise RuntimeError("Waiter 未初始化"); logger.info(f"[私聊][{self.private_name}] 动作 'listening': 进入倾听状态..."); await self.waiter.wait_listening(conversation_info); action_successful = True - elif action == "end_conversation": logger.info(f"[私聊][{self.private_name}] 动作 'end_conversation': 收到最终结束指令,停止对话..."); action_successful = True; self.should_continue = False - elif action == "block_and_ignore": logger.info(f"[私聊][{self.private_name}] 动作 'block_and_ignore': 不想再理你了..."); ignore_duration_seconds = 10 * 60; self.ignore_until_timestamp = time.time() + ignore_duration_seconds; logger.info(f"[私聊][{self.private_name}] 将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}"); self.state = ConversationState.IGNORED; action_successful = True + self.state = ConversationState.LISTENING + if not self.waiter: + raise RuntimeError("Waiter 未初始化") + logger.info(f"[私聊][{self.private_name}] 动作 'listening': 进入倾听状态...") + # 调用 Waiter 的倾听等待方法,内部会处理超时 + await self.waiter.wait_listening(conversation_info) + action_successful = True # listening 动作本身执行即视为成功,后续由新消息或超时驱动 + + # 5. 处理结束对话动作 + elif action == "end_conversation": + logger.info(f"[私聊][{self.private_name}] 动作 'end_conversation': 收到最终结束指令,停止对话...") + action_successful = True # 标记成功 + self.should_continue = False # 设置标志以退出循环 + + # 6. 处理屏蔽忽略动作 + elif action == "block_and_ignore": + logger.info(f"[私聊][{self.private_name}] 动作 'block_and_ignore': 不想再理你了...") + ignore_duration_seconds = 10 * 60 # 忽略 10 分钟,可配置 + self.ignore_until_timestamp = time.time() + ignore_duration_seconds + logger.info(f"[私聊][{self.private_name}] 将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}") + self.state = ConversationState.IGNORED # 设置忽略状态 + action_successful = True # 标记成功 + + # 7. 处理等待动作 elif action == "wait": - self.state = ConversationState.WAITING; - if not self.waiter: raise RuntimeError("Waiter 未初始化"); logger.info(f"[私聊][{self.private_name}] 动作 'wait': 进入等待状态..."); timeout_occurred = await self.waiter.wait(self.conversation_info); action_successful = True; logger.debug(f"[私聊][{self.private_name}] Wait 动作完成,无需在此清理消息。") - else: logger.warning(f"[私聊][{self.private_name}] 未知的动作类型: {action}"); final_status = "recall"; final_reason = f"未知的动作类型: {action}" + self.state = ConversationState.WAITING + if not self.waiter: + raise RuntimeError("Waiter 未初始化") + logger.info(f"[私聊][{self.private_name}] 动作 'wait': 进入等待状态...") + # 调用 Waiter 的常规等待方法,内部处理超时 + timeout_occurred = await self.waiter.wait(self.conversation_info) + action_successful = True # wait 动作本身执行即视为成功 + # wait 动作完成后不需要清理消息,等待新消息或超时触发重新规划 + logger.debug(f"[私聊][{self.private_name}] Wait 动作完成,无需在此清理消息。") + + # 8. 处理未知的动作类型 + else: + logger.warning(f"[私聊][{self.private_name}] 未知的动作类型: {action}") + final_status = "recall" # 未知动作标记为 recall + final_reason = f"未知的动作类型: {action}" # --- 重置非回复动作的追问状态 --- + # 确保执行完非回复动作后,下一次规划不会错误地进入追问逻辑 if action not in ["direct_reply", "send_new_message", "say_goodbye"]: - conversation_info.last_successful_reply_action = None - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None + conversation_info.last_successful_reply_action = None + # 清理可能残留的拒绝信息 + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None except asyncio.CancelledError: - logger.warning(f"[私聊][{self.private_name}] 处理动作 '{action}' 时被取消。") - final_status = "cancelled"; final_reason = "动作处理被取消" - conversation_info.last_successful_reply_action = None - raise + # 处理任务被取消的异常 + logger.warning(f"[私聊][{self.private_name}] 处理动作 '{action}' 时被取消。") + final_status = "cancelled" + final_reason = "动作处理被取消" + # 取消时也重置追问状态 + conversation_info.last_successful_reply_action = None + raise # 重新抛出 CancelledError,让上层知道任务被取消 except Exception as handle_err: + # 捕获处理动作过程中的其他所有异常 logger.error(f"[私聊][{self.private_name}] 处理动作 '{action}' 时出错: {handle_err}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") - final_status = "error"; final_reason = f"处理动作时出错: {handle_err}" - self.state = ConversationState.ERROR # <--- 使用修正后的 ERROR 状态 + final_status = "error" # 标记为错误状态 + final_reason = f"处理动作时出错: {handle_err}" + self.state = ConversationState.ERROR # 设置对话状态为错误 + # 出错时重置追问状态 conversation_info.last_successful_reply_action = None finally: - # --- 重置临时计数值 --- + # --- 无论成功与否,都执行 --- + + # 1. 重置临时存储的计数值 + # 确保这个值只在当前规划周期内有效 conversation_info.other_new_messages_during_planning_count = 0 - # --- 更新 Action History 状态 (优化) --- + + # 2. 更新动作历史记录的最终状态和原因 + # 优化:如果动作成功但状态仍是默认的 recall,则更新为 done if final_status == "recall" and action_successful: final_status = "done" - if action == "wait": timeout_occurred = any("分钟," in g.get("goal","") for g in conversation_info.goal_list if isinstance(g, dict)) if conversation_info.goal_list else False; final_reason = "等待完成" + (" (超时)" if timeout_occurred else " (收到新消息或中断)") - elif action == "listening": final_reason = "进入倾听状态" - elif action in ["rethink_goal", "end_conversation", "block_and_ignore"]: final_reason = f"成功执行 {action}" - elif action in ["direct_reply", "send_new_message", "say_goodbye"]: final_reason = "成功发送" - else: final_reason = "动作成功完成" - elif final_status == "recall" and not action_successful and not final_reason.startswith("回复检查不通过"): - if not final_reason or final_reason == "动作未成功执行": final_reason = "动作执行失败或被取消" - # 更新历史记录 + # 根据动作类型设置更具体的成功原因 + if action == "wait": + # 检查是否是因为超时结束的(需要 waiter 返回值,或者检查 goal_list) + # 这里简化处理,直接使用通用成功原因 + timeout_occurred = any("分钟," in g.get("goal","") for g in conversation_info.goal_list if isinstance(g, dict)) if conversation_info.goal_list else False + final_reason = "等待完成" + (" (超时)" if timeout_occurred else " (收到新消息或中断)") + elif action == "listening": + final_reason = "进入倾听状态" + elif action in ["rethink_goal", "end_conversation", "block_and_ignore"]: + final_reason = f"成功执行 {action}" + elif action in ["direct_reply", "send_new_message", "say_goodbye"]: + # 如果是因为发送成功,设置原因 + final_reason = "成功发送" + else: + # 其他未知但标记成功的动作 + final_reason = "动作成功完成" + + elif final_status == "recall" and not action_successful: + # 如果最终是 recall 且未成功,且不是因为检查不通过(比如生成失败),确保原因合理 + if not final_reason or final_reason == "动作未成功执行": + # 排除已经被 checker 设置的原因 + if not conversation_info.last_reply_rejection_reason: + final_reason = "动作执行失败或被取消" # 提供一个更通用的失败原因 + + # 更新历史记录字典 if conversation_info.done_action and action_index < len(conversation_info.done_action): - conversation_info.done_action[action_index].update({"status": final_status, "time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "final_reason": final_reason, "duration_ms": int((time.time() - action_start_time) * 1000)}) - logger.debug(f"[私聊][{self.private_name}] 动作 '{action}' 最终状态: {final_status}, 原因: {final_reason}") - else: logger.error(f"[私聊][{self.private_name}] 无法更新动作历史记录,索引 {action_index} 无效或列表为空。") + # 使用 update 方法更新字典,更安全 + conversation_info.done_action[action_index].update( + { + "status": final_status, # 最终状态 + "time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 完成时间 + "final_reason": final_reason, # 最终原因 + "duration_ms": int((time.time() - action_start_time) * 1000) # 记录耗时(毫秒) + } + ) + logger.debug(f"[私聊][{self.private_name}] 动作 '{action}' 最终状态: {final_status}, 原因: {final_reason}") + else: + # 如果索引无效或列表为空,记录错误 + logger.error(f"[私聊][{self.private_name}] 无法更新动作历史记录,索引 {action_index} 无效或列表为空。") async def _send_reply(self) -> bool: - """发送生成的回复 (保持不变)""" - # (代码同 v6_debug 版本) - if not self.generated_reply: logger.warning(f"[私聊][{self.private_name}] 没有生成回复内容,无法发送。"); return False - if not self.direct_sender: logger.error(f"[私聊][{self.private_name}] DirectMessageSender 未初始化,无法发送。"); return False - if not self.chat_stream: logger.error(f"[私聊][{self.private_name}] ChatStream 未初始化,无法发送。"); return False - try: reply_content = self.generated_reply; await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content, reply_to_message=None); self.state = ConversationState.ANALYZING; return True - except Exception as e: logger.error(f"[私聊][{self.private_name}] 发送消息时失败: {str(e)}\n{traceback.format_exc()}"); self.state = ConversationState.ERROR; return False # 使用修正后的 ERROR 状态 + """发送 `self.generated_reply` 中的内容到聊天流""" + # 检查是否有内容可发送 + if not self.generated_reply: + logger.warning(f"[私聊][{self.private_name}] 没有生成回复内容,无法发送。") + return False + # 检查发送器和聊天流是否已初始化 + if not self.direct_sender: + logger.error(f"[私聊][{self.private_name}] DirectMessageSender 未初始化,无法发送。") + return False + if not self.chat_stream: + logger.error(f"[私聊][{self.private_name}] ChatStream 未初始化,无法发送。") + return False + + try: + reply_content = self.generated_reply + # 调用发送器发送消息,不指定回复对象 + await self.direct_sender.send_message( + chat_stream=self.chat_stream, + content=reply_content, + reply_to_message=None # 私聊通常不需要引用回复 + ) + # 发送成功后,将状态设置回分析,准备下一轮规划 + self.state = ConversationState.ANALYZING + return True # 返回成功 + except Exception as e: + # 捕获发送过程中的异常 + logger.error(f"[私聊][{self.private_name}] 发送消息时失败: {str(e)}") + logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") + self.state = ConversationState.ERROR # 发送失败标记错误状态 + return False # 返回失败 async def _send_timeout_message(self): - """发送超时结束消息""" - # (代码同 v6_debug 版本) - if not self.direct_sender or not self.chat_stream: logger.warning(f"[私聊][{self.private_name}] 发送器或聊天流未初始化,无法发送超时消息。"); return - try: timeout_content = "我们好像很久没说话了,先这样吧~"; await self.direct_sender.send_message(chat_stream=self.chat_stream, content=timeout_content, reply_to_message=None); logger.info(f"[私聊][{self.private_name}] 已发送超时结束消息。"); await self.stop() - except Exception as e: logger.error(f"[私聊][{self.private_name}] 发送超时消息失败: {str(e)}") - + """在等待超时后发送一条结束消息""" + # 检查发送器和聊天流 + if not self.direct_sender or not self.chat_stream: + logger.warning(f"[私聊][{self.private_name}] 发送器或聊天流未初始化,无法发送超时消息。") + return + try: + # 定义超时消息内容,可以考虑配置化或由 LLM 生成 + timeout_content = "我们好像很久没说话了,先这样吧~" + # 发送超时消息 + await self.direct_sender.send_message( + chat_stream=self.chat_stream, + content=timeout_content, + reply_to_message=None + ) + logger.info(f"[私聊][{self.private_name}] 已发送超时结束消息。") + # 发送超时消息后,通常意味着对话结束,调用 stop + await self.stop() + except Exception as e: + # 捕获发送超时消息的异常 + logger.error(f"[私聊][{self.private_name}] 发送超时消息失败: {str(e)}") \ No newline at end of file diff --git a/src/plugins/PFC/idle_conversation_starter.py b/src/plugins/PFC/idle_conversation_starter.py index 7e001d2f..13f52c03 100644 --- a/src/plugins/PFC/idle_conversation_starter.py +++ b/src/plugins/PFC/idle_conversation_starter.py @@ -1,351 +1,193 @@ -from typing import List, Tuple, TYPE_CHECKING, Optional, Union import asyncio import time import random +import traceback +from typing import List, TYPE_CHECKING, Optional, Dict, Any + from src.common.logger import get_module_logger from ..models.utils_model import LLMRequest from ...config.config import global_config from .chat_observer import ChatObserver from .message_sender import DirectMessageSender from ..chat.chat_stream import ChatStream -from maim_message import UserInfo from src.individuality.individuality import Individuality from src.plugins.utils.chat_message_builder import build_readable_messages if TYPE_CHECKING: from ..chat.message import Message + # 仍然需要类型提示 from .conversation import Conversation + from .pfc_manager import PFCManager # 仅用于类型提示 -logger = get_module_logger("pfc_idle") +logger = get_module_logger("idle_conversation") class IdleConversationStarter: - """长时间无对话主动发起对话的组件 - - 该组件会在一段时间没有对话后,自动生成一条消息发送给用户,以保持对话的活跃度。 - 时间阈值会在配置的最小和最大值之间随机选择,每次发送消息后都会重置。 - """ - + """长时间无对话主动发起对话的组件""" + def __init__(self, stream_id: str, private_name: str): - """初始化空闲对话启动器 - - Args: - stream_id: 聊天流ID - private_name: 私聊用户名称 - """ + """初始化空闲对话启动器""" self.stream_id: str = stream_id self.private_name: str = private_name self.chat_observer = ChatObserver.get_instance(stream_id, private_name) self.message_sender = DirectMessageSender(private_name) - - # 添加异步锁,保护对共享变量的访问 self._lock: asyncio.Lock = asyncio.Lock() - - # LLM请求对象,用于生成主动对话内容 self.llm = LLMRequest( model=global_config.llm_normal, temperature=0.8, max_tokens=500, request_type="idle_conversation_starter" ) - - # 个性化信息 self.personality_info: str = Individuality.get_instance().get_prompt(x_person=2, level=3) self.name: str = global_config.BOT_NICKNAME - self.nick_name: List[str] = global_config.BOT_ALIAS_NAMES - - # 从配置文件读取配置参数,或使用默认值 - self.enabled: bool = getattr(global_config, 'idle_conversation', {}).get('enable_idle_conversation', True) - self.idle_check_interval: int = getattr(global_config, 'idle_conversation', {}).get('idle_check_interval', 10) - self.min_idle_time: int = getattr(global_config, 'idle_conversation', {}).get('min_idle_time', 60) - self.max_idle_time: int = getattr(global_config, 'idle_conversation', {}).get('max_idle_time', 120) - - # 计算实际触发阈值(在min和max之间随机) - self.actual_idle_threshold: int = random.randint(self.min_idle_time, self.max_idle_time) - - # 工作状态 + idle_config = getattr(global_config, 'idle_conversation', {}) + self.enabled: bool = idle_config.get('enable_idle_conversation', True) + self.idle_check_interval: int = idle_config.get('idle_check_interval', 10) + self.min_idle_time: int = idle_config.get('min_idle_time', 60) + self.max_idle_time: int = idle_config.get('max_idle_time', 120) self.last_message_time: float = time.time() + self.actual_idle_threshold: int = self._get_new_threshold() self._running: bool = False self._task: Optional[asyncio.Task] = None - + + def _get_new_threshold(self) -> int: + """计算一个新的随机空闲阈值""" + try: + min_t = max(10, self.min_idle_time); max_t = max(min_t, self.max_idle_time) + return random.randint(min_t, max_t) + except ValueError: + logger.warning(f"[私聊][{self.private_name}] idle_time 配置无效 ({self.min_idle_time}, {self.max_idle_time}),使用默认值 60-120。") + return random.randint(60, 120) + def start(self) -> None: - """启动空闲对话检测 - - 如果功能被禁用或已经在运行,则不会启动。 - """ - # 如果功能被禁用,则不启动 - if not self.enabled: - logger.info(f"[私聊][{self.private_name}]主动发起对话功能已禁用") - return - - if self._running: - logger.debug(f"[私聊][{self.private_name}]主动发起对话功能已在运行中") - return - + """启动空闲对话检测""" + if not self.enabled: logger.info(f"[私聊][{self.private_name}] 主动发起对话功能已禁用"); return + if self._running: logger.debug(f"[私聊][{self.private_name}] 主动发起对话功能已在运行中"); return self._running = True self._task = asyncio.create_task(self._check_idle_loop()) - logger.info(f"[私聊][{self.private_name}]启动空闲对话检测,阈值设置为{self.actual_idle_threshold}秒") - + logger.info(f"[私聊][{self.private_name}] 启动空闲对话检测,当前阈值: {self.actual_idle_threshold}秒") + def stop(self) -> None: - """停止空闲对话检测 - - 取消当前运行的任务并重置状态。 - """ - if not self._running: - return - + """停止空闲对话检测""" + if not self._running: return self._running = False - if self._task: - self._task.cancel() - self._task = None - logger.info(f"[私聊][{self.private_name}]停止空闲对话检测") - + if self._task: self._task.cancel(); self._task = None + logger.info(f"[私聊][{self.private_name}] 停止空闲对话检测") + async def update_last_message_time(self, message_time: Optional[float] = None) -> None: - """更新最后一条消息的时间 - - Args: - message_time: 消息时间戳,如果为None则使用当前时间 - """ + """更新最后一条消息的时间,并重置阈值""" async with self._lock: - self.last_message_time = message_time or time.time() - # 重新随机化下一次触发的时间阈值 - self.actual_idle_threshold = random.randint(self.min_idle_time, self.max_idle_time) - logger.debug(f"[私聊][{self.private_name}]更新最后消息时间: {self.last_message_time},新阈值: {self.actual_idle_threshold}秒") - + new_time = message_time or time.time() + if new_time > self.last_message_time: + self.last_message_time = new_time + self.actual_idle_threshold = self._get_new_threshold() + logger.debug(f"[私聊][{self.private_name}] 更新最后消息时间: {self.last_message_time:.2f},新阈值: {self.actual_idle_threshold}秒") + def reload_config(self) -> None: - """重新加载配置 - - 从配置文件重新读取所有参数,以便动态调整空闲对话检测的行为。 - """ + """重新加载配置""" try: - # 从配置文件重新读取参数 - self.enabled = getattr(global_config, 'idle_conversation', {}).get('enable_idle_conversation', True) - self.idle_check_interval = getattr(global_config, 'idle_conversation', {}).get('idle_check_interval', 10) - self.min_idle_time = getattr(global_config, 'idle_conversation', {}).get('min_idle_time', 7200) - self.max_idle_time = getattr(global_config, 'idle_conversation', {}).get('max_idle_time', 18000) - - logger.debug(f"[私聊][{self.private_name}]重新加载主动对话配置: 启用={self.enabled}, 检查间隔={self.idle_check_interval}秒, 最短间隔={self.min_idle_time}秒, 最长间隔={self.max_idle_time}秒") - - # 重新计算实际阈值 - async def update_threshold(): + idle_config = getattr(global_config, 'idle_conversation', {}) + self.enabled = idle_config.get('enable_idle_conversation', True) + self.idle_check_interval = idle_config.get('idle_check_interval', 10) + self.min_idle_time = idle_config.get('min_idle_time', 60) + self.max_idle_time = idle_config.get('max_idle_time', 120) + logger.debug(f"[私聊][{self.private_name}] 重新加载主动对话配置: 启用={self.enabled}, 检查间隔={self.idle_check_interval}秒, 阈值范围=[{self.min_idle_time}, {self.max_idle_time}]秒") + async def update_threshold_async(): async with self._lock: - self.actual_idle_threshold = random.randint(self.min_idle_time, self.max_idle_time) - logger.debug(f"[私聊][{self.private_name}]更新空闲检测阈值为: {self.actual_idle_threshold}秒") - - # 创建一个任务来异步更新阈值 - asyncio.create_task(update_threshold()) - - except Exception as e: - logger.error(f"[私聊][{self.private_name}]重新加载配置时出错: {str(e)}") - + self.actual_idle_threshold = self._get_new_threshold() + logger.debug(f"[私聊][{self.private_name}] 配置重载后更新空闲检测阈值为: {self.actual_idle_threshold}秒") + asyncio.create_task(update_threshold_async()) + if self.enabled and not self._running: self.start() + elif not self.enabled and self._running: self.stop() + except Exception as e: logger.error(f"[私聊][{self.private_name}] 重新加载配置时出错: {str(e)}") + async def _check_idle_loop(self) -> None: - """检查空闲状态的循环 - - 定期检查是否长时间无对话,如果达到阈值则尝试主动发起对话。 - """ + """检查空闲状态的循环""" try: - config_reload_counter = 0 - config_reload_interval = 100 # 每100次检查重新加载一次配置 - + config_reload_counter = 0; config_reload_interval = 60 while self._running: - # 定期重新加载配置 - config_reload_counter += 1 - if config_reload_counter >= config_reload_interval: - self.reload_config() - config_reload_counter = 0 - - # 检查是否启用了主动对话功能 - if not self.enabled: - # 如果禁用了功能,就等待一段时间后再次检查配置 - await asyncio.sleep(self.idle_check_interval) - continue - - # 使用锁保护对共享变量的读取 - current_time = time.time() - async with self._lock: - idle_time = current_time - self.last_message_time - threshold = self.actual_idle_threshold - - if idle_time >= threshold: - logger.info(f"[私聊][{self.private_name}]检测到长时间({idle_time:.0f}秒)无对话,尝试主动发起聊天") - await self._initiate_conversation() - # 更新时间,避免连续触发 - await self.update_last_message_time() - - # 等待下一次检查 await asyncio.sleep(self.idle_check_interval) - - except asyncio.CancelledError: - logger.debug(f"[私聊][{self.private_name}]空闲对话检测任务被取消") - except Exception as e: - logger.error(f"[私聊][{self.private_name}]空闲对话检测出错: {str(e)}") - # 尝试重新启动检测循环 - if self._running: - logger.info(f"[私聊][{self.private_name}]尝试重新启动空闲对话检测") - self._task = asyncio.create_task(self._check_idle_loop()) - - async def _initiate_conversation(self) -> None: - """生成并发送主动对话内容 - - 获取聊天历史记录,使用LLM生成合适的开场白,然后发送消息。 - """ - try: - # 获取聊天历史记录,用于生成更合适的开场白 - messages = self.chat_observer.get_cached_messages(limit=12) # 获取最近12条消息 - chat_history_text = await build_readable_messages( - messages, - replace_bot_name=True, - merge_messages=False, - timestamp_mode="relative", - read_mark=0.0, - ) - - # 构建提示词 - prompt = f"""{self.personality_info}。你的名字是{self.name}。 - 你正在与用户{self.private_name}进行QQ私聊, - 但已经有一段时间没有对话了。 - 你想要主动发起一个友好的对话,可以说说自己在做的事情或者询问对方在做什么。 - 请基于以下之前的对话历史,生成一条自然、友好、符合你个性的主动对话消息。 - 这条消息应该能够引起用户的兴趣,重新开始对话。 - 最近的对话历史(可能已经过去了很久): - {chat_history_text} - 请直接输出一条消息,不要有任何额外的解释或引导文字。消息要简短自然,就像是在日常聊天中的开场白。 - 消息内容尽量简短,不要超过20个字,不要添加任何表情符号。 - """ - - # 尝试生成回复,添加超时处理 - try: - content, _ = await asyncio.wait_for( - self.llm.generate_response_async(prompt), - timeout=30 # 30秒超时 - ) - except asyncio.TimeoutError: - logger.error(f"[私聊][{self.private_name}]生成主动对话内容超时") - return - except Exception as llm_err: - logger.error(f"[私聊][{self.private_name}]生成主动对话内容失败: {str(llm_err)}") - return - - # 清理结果 - content = content.strip() - content = content.strip('"\'') - - if not content: - logger.error(f"[私聊][{self.private_name}]生成的主动对话内容为空") - return - - # 统一错误处理,从这里开始所有操作都在同一个try-except块中 - logger.debug(f"[私聊][{self.private_name}]成功生成主动对话内容: {content},准备发送") - - from .pfc_manager import PFCManager - from src.plugins.chat.chat_stream import chat_manager - - # 获取当前实例 - pfc_manager = PFCManager.get_instance() - - # 结束当前对话实例(如果存在) - current_conversation = await pfc_manager.get_conversation(self.stream_id) - if current_conversation: - logger.info(f"[私聊][{self.private_name}]结束当前对话实例,准备创建新实例") - try: - await current_conversation.stop() - await pfc_manager.remove_conversation(self.stream_id) - except Exception as e: - logger.warning(f"[私聊][{self.private_name}]结束当前对话实例时出错: {str(e)},继续创建新实例") - - # 创建新的对话实例 - logger.info(f"[私聊][{self.private_name}]创建新的对话实例以发送主动消息") - new_conversation = None - try: - new_conversation = await pfc_manager.get_or_create_conversation(self.stream_id, self.private_name) - except Exception as e: - logger.error(f"[私聊][{self.private_name}]创建新对话实例失败: {str(e)}") - return - - # 确保新对话实例已初始化完成 - chat_stream = await self._get_chat_stream(new_conversation) - if not chat_stream: - logger.error(f"[私聊][{self.private_name}]无法获取有效的聊天流,取消发送主动消息") - return - - # 发送消息 - try: - await self.message_sender.send_message( - chat_stream=chat_stream, - content=content, - reply_to_message=None - ) - - # 更新空闲会话启动器的最后消息时间 - await self.update_last_message_time() - - # 如果新对话实例有一个聊天观察者,请触发更新 - if new_conversation and hasattr(new_conversation, 'chat_observer'): - logger.info(f"[私聊][{self.private_name}]触发聊天观察者更新") + config_reload_counter = (config_reload_counter + 1) % config_reload_interval + if config_reload_counter == 0: self.reload_config() + if not self.enabled: continue + current_time = time.time() + async with self._lock: last_msg_time = self.last_message_time; threshold = self.actual_idle_threshold + idle_time = current_time - last_msg_time + if idle_time >= threshold: + logger.info(f"[私聊][{self.private_name}] 检测到长时间({idle_time:.0f}秒 >= {threshold}秒)无对话,尝试主动发起聊天") try: - new_conversation.chat_observer.trigger_update() - except Exception as e: - logger.warning(f"[私聊][{self.private_name}]触发聊天观察者更新失败: {str(e)}") - - logger.success(f"[私聊][{self.private_name}]成功主动发起对话: {content}") - except Exception as e: - logger.error(f"[私聊][{self.private_name}]发送主动对话消息失败: {str(e)}") - - except Exception as e: - # 顶级异常处理,确保任何未捕获的异常都不会导致整个进程崩溃 - logger.error(f"[私聊][{self.private_name}]主动发起对话过程中发生未预期的错误: {str(e)}") - - async def _get_chat_stream(self, conversation: Optional['Conversation'] = None) -> Optional[ChatStream]: - """获取可用的聊天流 - - 尝试多种方式获取聊天流: - 1. 从传入的对话实例中获取 - 2. 从全局聊天管理器中获取 - 3. 创建一个新的聊天流 - - Args: - conversation: 对话实例,可以为None - - Returns: - Optional[ChatStream]: 如果成功获取则返回聊天流,否则返回None - """ - chat_stream = None - - # 1. 尝试从对话实例获取 - if conversation and hasattr(conversation, 'should_continue'): - # 等待一小段时间,确保初始化完成 - retry_count = 0 - max_retries = 10 - while not conversation.should_continue and retry_count < max_retries: - await asyncio.sleep(0.5) - retry_count += 1 - logger.debug(f"[私聊][{self.private_name}]等待新对话实例初始化完成: 尝试 {retry_count}/{max_retries}") - - if not conversation.should_continue: - logger.warning(f"[私聊][{self.private_name}]新对话实例初始化可能未完成,但仍将尝试获取聊天流") - - # 尝试使用对话实例的聊天流 - if hasattr(conversation, 'chat_stream') and conversation.chat_stream: - logger.info(f"[私聊][{self.private_name}]使用新对话实例的聊天流") - return conversation.chat_stream - - # 2. 尝试从聊天管理器获取 - from src.plugins.chat.chat_stream import chat_manager + await self._initiate_conversation() + await self.update_last_message_time() # 主动发起后立即更新时间 + except Exception as initiate_err: + logger.error(f"[私聊][{self.private_name}] 主动发起对话过程中出错: {initiate_err}\n{traceback.format_exc()}") + await self.update_last_message_time() # 即使出错也更新时间 + except asyncio.CancelledError: logger.debug(f"[私聊][{self.private_name}] 空闲对话检测任务被取消") + except Exception as e: logger.error(f"[私聊][{self.private_name}] 空闲对话检测循环发生未预期错误: {str(e)}\n{traceback.format_exc()}") + + async def _initiate_conversation(self) -> None: + """生成并发送主动对话内容 (修复循环导入)""" + # --- 将 PFCManager 的导入移到这里 --- + from .pfc_manager import PFCManager + # 仅在需要时导入类型提示,避免运行时错误 + if TYPE_CHECKING: + from .conversation import Conversation + + pfc_manager: 'PFCManager' = PFCManager.get_instance() # 获取管理器实例 + conversation: Optional['Conversation'] = None + chat_stream: Optional[ChatStream] = None + try: - logger.info(f"[私聊][{self.private_name}]尝试从chat_manager获取聊天流") - chat_stream = chat_manager.get_stream(self.stream_id) - if chat_stream: - return chat_stream + # --- 1. 获取或创建 Conversation 实例 --- + logger.debug(f"[私聊][{self.private_name}] 尝试获取或创建 Conversation 实例...") + conversation = await pfc_manager.get_or_create_conversation(self.stream_id, self.private_name) + if not conversation or not conversation._initialized: + logger.error(f"[私聊][{self.private_name}] 无法获取或创建有效的 Conversation 实例,取消主动发起对话。") + return + chat_stream = conversation.chat_stream + if not chat_stream: + logger.error(f"[私聊][{self.private_name}] 无法从 Conversation 实例获取 ChatStream,取消主动发起对话。") + return + + # --- 2. 获取聊天历史用于生成 --- + messages: List[Dict[str, Any]] = [] + chat_history_text = "最近没有聊天记录。" + if conversation.observation_info: + messages = conversation.observation_info.chat_history[-12:] + if messages: + chat_history_text = await build_readable_messages(messages, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0) + else: logger.warning(f"[私聊][{self.private_name}] Conversation 实例缺少 ObservationInfo,无法获取准确聊天记录。") + + # --- 3. 构建 Prompt 并生成内容 --- + prompt = f"""{self.personality_info}。你的名字是{self.name}。 +你正在与用户 {self.private_name} 进行QQ私聊, 但已经有一段时间没有对话了。 +你想要主动发起一个友好的对话,可以说说自己在做的事情或者询问对方在做什么。 +请基于以下之前的对话历史,生成一条自然、友好、符合你个性的主动对话消息。 +这条消息应该能够引起用户的兴趣,重新开始对话。 + +最近的对话历史(可能已经过去了很久): +{chat_history_text} + +请直接输出一条消息,不要有任何额外的解释或引导文字。消息要简短自然,就像是在日常聊天中的开场白。 +消息内容尽量简短,不要超过30个字。可以适当添加符合你人设的语气词或表情符号(如果合适)。 +""" + # logger.debug(f"[私聊][{self.private_name}] 发送到 LLM 的主动对话 Prompt: \n{prompt}") # 日志可能过长 + content = "" + try: + llm_response, _ = await asyncio.wait_for(self.llm.generate_response_async(prompt), timeout=30) + content = llm_response.strip().strip('"\'') + logger.debug(f"[私聊][{self.private_name}] LLM 生成的主动对话内容: {content}") + except asyncio.TimeoutError: logger.error(f"[私聊][{self.private_name}] 生成主动对话内容超时"); return + except Exception as llm_err: logger.error(f"[私聊][{self.private_name}] 生成主动对话内容失败: {str(llm_err)}"); return + if not content: logger.error(f"[私聊][{self.private_name}] 生成的主动对话内容为空"); return + + # --- 4. 发送消息 --- + logger.info(f"[私聊][{self.private_name}] 准备发送主动对话消息: {content}") + await self.message_sender.send_message(chat_stream=chat_stream, content=content, reply_to_message=None) + + # --- 5. 发送成功后的处理 --- + logger.success(f"[私聊][{self.private_name}] 成功主动发起对话: {content}") + # 更新时间戳的操作移至外层 _check_idle_loop + # 触发 ChatObserver 更新 + if conversation.chat_observer: + logger.debug(f"[私聊][{self.private_name}] 触发 ChatObserver 更新...") + conversation.chat_observer.trigger_update() + else: logger.warning(f"[私聊][{self.private_name}] Conversation 实例缺少 ChatObserver,无法触发更新。") + except Exception as e: - logger.warning(f"[私聊][{self.private_name}]从chat_manager获取聊天流失败: {str(e)}") - - # 3. 创建新的聊天流 - try: - logger.warning(f"[私聊][{self.private_name}]无法获取现有聊天流,创建新的聊天流") - # 创建用户信息对象 - user_info = UserInfo( - user_id=global_config.BOT_QQ, - user_nickname=global_config.BOT_NICKNAME, - platform="qq" - ) - # 创建聊天流 - return ChatStream(self.stream_id, "qq", user_info) - except Exception as e: - logger.error(f"[私聊][{self.private_name}]创建新聊天流失败: {str(e)}") - return None \ No newline at end of file + logger.error(f"[私聊][{self.private_name}] 主动发起对话的整体流程出错: {str(e)}\n{traceback.format_exc()}") \ No newline at end of file diff --git a/src/plugins/PFC/pfc_manager.py b/src/plugins/PFC/pfc_manager.py index 718f621f..d008bab3 100644 --- a/src/plugins/PFC/pfc_manager.py +++ b/src/plugins/PFC/pfc_manager.py @@ -36,11 +36,11 @@ class PFCManager: await asyncio.sleep(0.5) # 短暂等待,让初始化有机会完成 # 再次检查实例是否存在 if stream_id in self._instances and self._instances[stream_id]._initialized: - logger.debug(f"[私聊][{private_name}] 初始化已完成,返回现有实例: {stream_id}") - return self._instances[stream_id] + logger.debug(f"[私聊][{private_name}] 初始化已完成,返回现有实例: {stream_id}") + return self._instances[stream_id] else: - logger.warning(f"[私聊][{private_name}] 等待后实例仍未初始化完成或不存在。") - return None # 避免返回未完成的实例 + logger.warning(f"[私聊][{private_name}] 等待后实例仍未初始化完成或不存在。") + return None # 避免返回未完成的实例 # 检查是否已有活动实例 if stream_id in self._instances: @@ -61,8 +61,10 @@ class PFCManager: logger.warning(f"[私聊][{private_name}] 发现无效或已停止的旧实例,清理并重新创建: {stream_id}") await self._cleanup_conversation(instance) # 从字典中移除,确保下面能创建新的 - if stream_id in self._instances: del self._instances[stream_id] - if stream_id in self._initializing: del self._initializing[stream_id] + if stream_id in self._instances: + del self._instances[stream_id] + if stream_id in self._initializing: + del self._initializing[stream_id] # --- 创建并初始化新实例 --- @@ -88,7 +90,8 @@ class PFCManager: logger.error(f"[私聊][{private_name}] 初始化未成功完成,无法启动实例 {stream_id}。") # 清理可能部分创建的实例 await self._cleanup_conversation(conversation_instance) - if stream_id in self._instances: del self._instances[stream_id] + if stream_id in self._instances: + del self._instances[stream_id] conversation_instance = None # 返回 None 表示失败 except Exception as e: @@ -97,13 +100,14 @@ class PFCManager: # 确保清理 if conversation_instance: await self._cleanup_conversation(conversation_instance) - if stream_id in self._instances: del self._instances[stream_id] + if stream_id in self._instances: + del self._instances[stream_id] conversation_instance = None # 返回 None finally: # 确保初始化标记被清除 - if stream_id in self._initializing: - self._initializing[stream_id] = False + if stream_id in self._initializing: + self._initializing[stream_id] = False return conversation_instance @@ -116,9 +120,9 @@ class PFCManager: await conversation._initialize() # 调用实例自身的初始化方法 # 注意:初始化成功与否由 conversation._initialized 和 conversation.should_continue 标志决定 if conversation._initialized: - logger.info(f"[私聊][{private_name}] conversation._initialize() 调用完成,实例标记为已初始化: {stream_id}") + logger.info(f"[私聊][{private_name}] conversation._initialize() 调用完成,实例标记为已初始化: {stream_id}") else: - logger.warning(f"[私聊][{private_name}] conversation._initialize() 调用完成,但实例未成功标记为已初始化: {stream_id}") + logger.warning(f"[私聊][{private_name}] conversation._initialize() 调用完成,但实例未成功标记为已初始化: {stream_id}") except Exception as e: # _initialize 内部应该处理自己的异常,但这里也捕获以防万一 @@ -131,7 +135,8 @@ class PFCManager: async def _cleanup_conversation(self, conversation: Conversation): """清理会话实例的资源""" - if not conversation: return + if not conversation: + return stream_id = conversation.stream_id private_name = conversation.private_name logger.info(f"[私聊][{private_name}] 开始清理会话实例资源: {stream_id}") @@ -140,13 +145,13 @@ class PFCManager: if hasattr(conversation, 'stop') and callable(conversation.stop): await conversation.stop() # stop 方法应处理内部组件的停止 else: - logger.warning(f"[私聊][{private_name}] Conversation 对象缺少 stop 方法,可能无法完全清理资源。") - # 尝试手动停止已知组件 (作为后备) - if hasattr(conversation, 'idle_conversation_starter') and conversation.idle_conversation_starter: - conversation.idle_conversation_starter.stop() - if hasattr(conversation, 'observation_info') and conversation.observation_info: - conversation.observation_info.unbind_from_chat_observer() - # ChatObserver 是单例,不在此处停止 + logger.warning(f"[私聊][{private_name}] Conversation 对象缺少 stop 方法,可能无法完全清理资源。") + # 尝试手动停止已知组件 (作为后备) + if hasattr(conversation, 'idle_conversation_starter') and conversation.idle_conversation_starter: + conversation.idle_conversation_starter.stop() + if hasattr(conversation, 'observation_info') and conversation.observation_info: + conversation.observation_info.unbind_from_chat_observer() + # ChatObserver 是单例,不在此处停止 logger.info(f"[私聊][{private_name}] 会话实例 {stream_id} 资源已清理") except Exception as e: @@ -157,12 +162,12 @@ class PFCManager: """获取已存在的会话实例 (只读)""" instance = self._instances.get(stream_id) if instance and instance._initialized and instance.should_continue: - # 检查忽略状态 - if (hasattr(instance, "ignore_until_timestamp") and - instance.ignore_until_timestamp and - time.time() < instance.ignore_until_timestamp): - return None # 忽略期间不返回 - return instance + # 检查忽略状态 + if (hasattr(instance, "ignore_until_timestamp") and + instance.ignore_until_timestamp and + time.time() < instance.ignore_until_timestamp): + return None # 忽略期间不返回 + return instance return None # 不存在或无效则返回 None async def remove_conversation(self, stream_id: str): @@ -173,7 +178,8 @@ class PFCManager: try: # 先从字典中移除引用,防止新的请求获取到正在清理的实例 del self._instances[stream_id] - if stream_id in self._initializing: del self._initializing[stream_id] + if stream_id in self._initializing: + del self._initializing[stream_id] # 清理资源 await self._cleanup_conversation(instance_to_remove) logger.info(f"[管理器] 会话实例 {stream_id} 已成功移除并清理")