diff --git a/src/plugins/PFC/action_planner.py b/src/plugins/PFC/action_planner.py index faf13bd0..854d734c 100644 --- a/src/plugins/PFC/action_planner.py +++ b/src/plugins/PFC/action_planner.py @@ -1,24 +1,25 @@ +# -*- coding: utf-8 -*- +# File: action_planner.py import time -from typing import Tuple, Optional -from .pfc_utils import retrieve_contextual_info +import traceback +from typing import Tuple, Optional, Dict, Any, List -# import jieba # 如果需要旧版知识库的回退,可能需要 -# import re # 如果需要旧版知识库的回退,可能需要 from src.common.logger_manager import get_logger +from src.individuality.individuality import Individuality +from src.plugins.utils.chat_message_builder import build_readable_messages from ..models.utils_model import LLMRequest from ...config.config import global_config + +# 确保导入路径正确 +from .pfc_utils import get_items_from_json, retrieve_contextual_info from .chat_observer import ChatObserver -from .pfc_utils import get_items_from_json -from src.individuality.individuality import Individuality from .observation_info import ObservationInfo from .conversation_info import ConversationInfo -from src.plugins.utils.chat_message_builder import build_readable_messages - logger = get_logger("pfc_action_planner") -# --- 定义 Prompt 模板 --- +# --- 定义 Prompt 模板 --- # Prompt(1): 首次回复或非连续回复时的决策 Prompt PROMPT_INITIAL_REPLY = """{persona_text}。现在你在参与一场QQ私聊,请根据以下【所有信息】审慎且灵活的决策下一步行动,可以回复,可以倾听,可以调取知识,甚至可以屏蔽对方: @@ -41,7 +42,7 @@ PROMPT_INITIAL_REPLY = """{persona_text}。现在你在参与一场QQ私聊, ------ 可选行动类型以及解释: listening: 倾听对方发言,当你认为对方话才说到一半,发言明显未结束时选择 -direct_reply: 直接回复对方 +direct_reply: 直接回复对方 (当有新消息需要处理时,通常应选择此项) rethink_goal: 思考一个对话目标,当你觉得目前对话需要目标,或当前目标不再适用,或话题卡住时选择。注意私聊的环境是灵活的,有可能需要经常选择 end_conversation: 结束对话,对方长时间没回复或者当你觉得对话告一段落时可以选择 block_and_ignore: 更加极端的结束对话方式,直接结束对话并在一段时间内无视对方所有发言(屏蔽),当对话让你感到十分不适,或你遭到各类骚扰时选择 @@ -73,7 +74,7 @@ PROMPT_FOLLOW_UP = """{persona_text}。现在你在参与一场QQ私聊,刚刚 {retrieved_memory_str} ------ 可选行动类型以及解释: -wait: 暂时不说话,留给对方交互空间,等待对方回复(尤其是在你刚发言后、或上次发言因重复、发言过多被拒时、或不确定做什么时,这是不错的选择) +wait: 暂时不说话,留给对方交互空间,等待对方回复(尤其是在你刚发言后、或上次发言因重复、发言过多被拒时、或不确定做什么时,这是不错的选择)。**重要:仅当没有未读消息时才能选择此项。** listening: 倾听对方发言(虽然你刚发过言,但如果对方立刻回复且明显话没说完,可以选择这个) send_new_message: 发送一条新消息继续对话,允许适当的追问、补充、深入话题,或开启相关新话题。**但是避免在因重复被拒后立即使用,也不要在对方没有回复的情况下过多的“消息轰炸”或重复发言** rethink_goal: 思考一个对话目标,当你觉得目前对话需要目标,或当前目标不再适用,或话题卡住时选择。注意私聊的环境是灵活的,有可能需要经常选择 @@ -107,274 +108,114 @@ PROMPT_END_DECISION = """{persona_text}。刚刚你决定结束一场 QQ 私聊 注意:请严格按照 JSON 格式输出,不要包含任何其他内容。""" -# ActionPlanner 类定义,顶格 class ActionPlanner: """行动规划器""" def __init__(self, stream_id: str, private_name: str): - self.llm = LLMRequest( - model=global_config.llm_PFC_action_planner, - temperature=global_config.llm_PFC_action_planner["temp"], - max_tokens=1500, - request_type="action_planning", - ) + """初始化行动规划器""" + self.stream_id = stream_id + self.private_name = private_name + # 初始化 LLM 请求对象 + try: + llm_config = global_config.llm_PFC_action_planner + if not isinstance(llm_config, dict): + raise TypeError(f"LLM config 'llm_PFC_action_planner' is not a dictionary: {llm_config}") + + self.llm = LLMRequest( + model=llm_config, + temperature=llm_config.get("temp", 0.7), + max_tokens=1500, + request_type="action_planning", + ) + except TypeError as e: + logger.error(f"[私聊][{self.private_name}] 初始化 LLMRequest 时配置错误: {e}") + raise + except Exception as e: + logger.error(f"[私聊][{self.private_name}] 初始化 LLMRequest 时发生未知错误: {e}") + raise + + # 获取个性化信息和机器人名称 self.personality_info = Individuality.get_instance().get_prompt(x_person=2, level=3) self.name = global_config.BOT_NICKNAME - self.private_name = private_name + # 获取 ChatObserver 实例 (单例模式) self.chat_observer = ChatObserver.get_instance(stream_id, private_name) - # 修改 plan 方法签名,增加 last_successful_reply_action 参数 + async def plan( self, observation_info: ObservationInfo, conversation_info: ConversationInfo, last_successful_reply_action: Optional[str], ) -> Tuple[str, str]: - """规划下一步行动 + """ + 规划下一步行动。 Args: - observation_info: 决策信息 - conversation_info: 对话信息 - last_successful_reply_action: 上一次成功的回复动作类型 ('direct_reply' 或 'send_new_message' 或 None) + observation_info: 观察信息,包含聊天记录、未读消息等。 + conversation_info: 对话信息,包含目标、历史动作等。 + last_successful_reply_action: 上一次成功的回复动作类型 ('direct_reply' 或 'send_new_message' 或 None)。 Returns: - Tuple[str, str]: (行动类型, 行动原因) + Tuple[str, str]: (规划的行动类型, 行动原因)。 """ - # --- 获取 Bot 上次发言时间信息 --- - # (这部分逻辑不变) - time_since_last_bot_message_info = "" + logger.info(f"[私聊][{self.private_name}] 开始规划行动...") + plan_start_time = time.time() + + # --- 1. 准备 Prompt 输入信息 --- try: - bot_id = str(global_config.BOT_QQ) - if hasattr(observation_info, "chat_history") and observation_info.chat_history: - for i in range(len(observation_info.chat_history) - 1, -1, -1): - msg = observation_info.chat_history[i] - if not isinstance(msg, dict): - continue - sender_info = msg.get("user_info", {}) - sender_id = str(sender_info.get("user_id")) if isinstance(sender_info, dict) else None - msg_time = msg.get("time") - if sender_id == bot_id and msg_time: - time_diff = time.time() - msg_time - if time_diff < 60.0: - time_since_last_bot_message_info = ( - f"提示:你上一条成功发送的消息是在 {time_diff:.1f} 秒前。\n" - ) - break - else: - logger.debug( - f"[私聊][{self.private_name}]Observation info chat history is empty or not available for bot time check." - ) - except AttributeError: - logger.warning( - f"[私聊][{self.private_name}]ObservationInfo object might not have chat_history attribute yet for bot time check." + time_since_last_bot_message_info = self._get_bot_last_speak_time_info(observation_info) + timeout_context = self._get_timeout_context(conversation_info) + goals_str = self._build_goals_string(conversation_info) + chat_history_text = await self._build_chat_history_text(observation_info) + persona_text = f"你的名字是{self.name},{self.personality_info}。" + action_history_summary, last_action_context = self._build_action_history_context(conversation_info) + retrieved_memory_str, retrieved_knowledge_str = await retrieve_contextual_info( + chat_history_text, self.private_name ) - except Exception as e: - logger.warning(f"[私聊][{self.private_name}]获取 Bot 上次发言时间时出错: {e}") - - # --- 获取超时提示信息 --- - # (这部分逻辑不变) - timeout_context = "" - try: - if hasattr(conversation_info, "goal_list") and conversation_info.goal_list: - last_goal_dict = conversation_info.goal_list[-1] - if isinstance(last_goal_dict, dict) and "goal" in last_goal_dict: - last_goal_text = last_goal_dict["goal"] - if isinstance(last_goal_text, str) and "分钟,思考接下来要做什么" in last_goal_text: - try: - timeout_minutes_text = last_goal_text.split(",")[0].replace("你等待了", "") - timeout_context = f"重要提示:对方已经长时间({timeout_minutes_text})没有回复你的消息了(这可能代表对方繁忙/不想回复/没注意到你的消息等情况,或在对方看来本次聊天已告一段落),请基于此情况规划下一步。\n" - except Exception: - timeout_context = "重要提示:对方已经长时间没有回复你的消息了(这可能代表对方繁忙/不想回复/没注意到你的消息等情况,或在对方看来本次聊天已告一段落),请基于此情况规划下一步。\n" - else: - logger.debug( - f"[私聊][{self.private_name}]Conversation info goal_list is empty or not available for timeout check." - ) - except AttributeError: - logger.warning( - f"[私聊][{self.private_name}]ConversationInfo object might not have goal_list attribute yet for timeout check." + logger.info( + f"[私聊][{self.private_name}] (ActionPlanner) 检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str else '无'} / 知识: {'有' if retrieved_knowledge_str and '无相关知识' not in retrieved_knowledge_str and '出错' not in retrieved_knowledge_str else '无'}" ) - except Exception as e: - logger.warning(f"[私聊][{self.private_name}]检查超时目标时出错: {e}") + except Exception as prep_err: + logger.error(f"[私聊][{self.private_name}] 准备 Prompt 输入时出错: {prep_err}") + logger.error(traceback.format_exc()) + return "wait", f"准备行动规划输入时出错: {prep_err}" - # --- 构建通用 Prompt 参数 --- - logger.debug( - f"[私聊][{self.private_name}]开始规划行动:当前目标: {getattr(conversation_info, 'goal_list', '不可用')}" - ) - - # 构建对话目标 (goals_str) - goals_str = "" + # --- 2. 选择并格式化 Prompt --- try: - if hasattr(conversation_info, "goal_list") and conversation_info.goal_list: - for goal_reason in conversation_info.goal_list: - if isinstance(goal_reason, dict): - goal = goal_reason.get("goal", "目标内容缺失") - reasoning = goal_reason.get("reasoning", "没有明确原因") - else: - goal = str(goal_reason) - reasoning = "没有明确原因" - - goal = str(goal) if goal is not None else "目标内容缺失" - reasoning = str(reasoning) if reasoning is not None else "没有明确原因" - goals_str += f"- 目标:{goal}\n 原因:{reasoning}\n" - - if not goals_str: - goals_str = "- 目前没有明确对话目标,请考虑设定一个。\n" + if last_successful_reply_action in ["direct_reply", "send_new_message"]: + prompt_template = PROMPT_FOLLOW_UP + log_msg = "使用 PROMPT_FOLLOW_UP (追问决策)" else: - goals_str = "- 目前没有明确对话目标,请考虑设定一个。\n" - except AttributeError: - logger.warning( - f"[私聊][{self.private_name}]ConversationInfo object might not have goal_list attribute yet." + prompt_template = PROMPT_INITIAL_REPLY + log_msg = "使用 PROMPT_INITIAL_REPLY (首次/非连续回复决策)" + logger.debug(f"[私聊][{self.private_name}] {log_msg}") + + prompt = prompt_template.format( + persona_text=persona_text, + goals_str=goals_str if goals_str.strip() else "- 目前没有明确对话目标,请考虑设定一个。", + action_history_summary=action_history_summary, + last_action_context=last_action_context, + time_since_last_bot_message_info=time_since_last_bot_message_info, + timeout_context=timeout_context, + chat_history_text=chat_history_text if chat_history_text.strip() else "还没有聊天记录。", + retrieved_memory_str=retrieved_memory_str if retrieved_memory_str else "无相关记忆。", + retrieved_knowledge_str=retrieved_knowledge_str if retrieved_knowledge_str else "无相关知识。", ) - goals_str = "- 获取对话目标时出错。\n" - except Exception as e: - logger.error(f"[私聊][{self.private_name}]构建对话目标字符串时出错: {e}") - goals_str = "- 构建对话目标时出错。\n" + logger.debug(f"[私聊][{self.private_name}] 发送到LLM的最终提示词:\n------\n{prompt}\n------") + except KeyError as fmt_key_err: + logger.error(f"[私聊][{self.private_name}] 格式化 Prompt 时缺少键: {fmt_key_err}") + return "wait", f"格式化 Prompt 时出错 (缺少键: {fmt_key_err})" + except Exception as fmt_err: + logger.error(f"[私聊][{self.private_name}] 格式化 Prompt 时发生未知错误: {fmt_err}") + return "wait", f"格式化 Prompt 时出错: {fmt_err}" - # 获取聊天历史记录 (chat_history_text) - try: - if hasattr(observation_info, "chat_history") and observation_info.chat_history: - chat_history_text = observation_info.chat_history_str - if not chat_history_text: - chat_history_text = "还没有聊天记录。\n" - else: - chat_history_text = "还没有聊天记录。\n" - - if hasattr(observation_info, "new_messages_count") and observation_info.new_messages_count > 0: - if hasattr(observation_info, "unprocessed_messages") and observation_info.unprocessed_messages: - new_messages_list = observation_info.unprocessed_messages - new_messages_str = await build_readable_messages( - new_messages_list, - replace_bot_name=True, - merge_messages=False, - timestamp_mode="relative", - read_mark=0.0, - ) - chat_history_text += ( - f"\n--- 以下是 {observation_info.new_messages_count} 条新消息 ---\n{new_messages_str}" - ) - else: - logger.warning( - f"[私聊][{self.private_name}]ObservationInfo has new_messages_count > 0 but unprocessed_messages is empty or missing." - ) - except AttributeError: - logger.warning( - f"[私聊][{self.private_name}]ObservationInfo object might be missing expected attributes for chat history." - ) - chat_history_text = "获取聊天记录时出错。\n" - except Exception as e: - logger.error(f"[私聊][{self.private_name}]处理聊天记录时发生未知错误: {e}") - chat_history_text = "处理聊天记录时出错。\n" - - # 构建 Persona 文本 (persona_text) - persona_text = f"你的名字是{self.name},{self.personality_info}。" - - # 构建行动历史和上一次行动结果 (action_history_summary, last_action_context) - # (这部分逻辑不变) - action_history_summary = "你最近执行的行动历史:\n" - last_action_context = "关于你【上一次尝试】的行动:\n" - action_history_list = [] - try: - if hasattr(conversation_info, "done_action") and conversation_info.done_action: - action_history_list = conversation_info.done_action[-5:] - else: - logger.debug(f"[私聊][{self.private_name}]Conversation info done_action is empty or not available.") - except AttributeError: - logger.warning( - f"[私聊][{self.private_name}]ConversationInfo object might not have done_action attribute yet." - ) - except Exception as e: - logger.error(f"[私聊][{self.private_name}]访问行动历史时出错: {e}") - - if not action_history_list: - action_history_summary += "- 还没有执行过行动。\n" - last_action_context += "- 这是你规划的第一个行动。\n" - else: - for i, action_data in enumerate(action_history_list): - action_type = "未知" - plan_reason = "未知" - status = "未知" - final_reason = "" - action_time = "" - - if isinstance(action_data, dict): - action_type = action_data.get("action", "未知") - plan_reason = action_data.get("plan_reason", "未知规划原因") - status = action_data.get("status", "未知") - final_reason = action_data.get("final_reason", "") - action_time = action_data.get("time", "") - elif isinstance(action_data, tuple): - # 假设旧格式兼容 - if len(action_data) > 0: - action_type = action_data[0] - if len(action_data) > 1: - plan_reason = action_data[1] # 可能是规划原因或最终原因 - if len(action_data) > 2: - status = action_data[2] - if status == "recall" and len(action_data) > 3: - final_reason = action_data[3] - elif status == "done" and action_type in ["direct_reply", "send_new_message"]: - plan_reason = "成功发送" # 简化显示 - - reason_text = f", 失败/取消原因: {final_reason}" if final_reason else "" - summary_line = f"- 时间:{action_time}, 尝试行动:'{action_type}', 状态:{status}{reason_text}" - action_history_summary += summary_line + "\n" - - if i == len(action_history_list) - 1: - last_action_context += f"- 上次【规划】的行动是: '{action_type}'\n" - last_action_context += f"- 当时规划的【原因】是: {plan_reason}\n" - if status == "done": - last_action_context += "- 该行动已【成功执行】。\n" - # 记录这次成功的行动类型,供下次决策 - # self.last_successful_action_type = action_type # 不在这里记录,由 conversation 控制 - elif status == "recall": - last_action_context += "- 但该行动最终【未能执行/被取消】。\n" - if final_reason: - last_action_context += f"- 【重要】失败/取消的具体原因是: “{final_reason}”\n" - else: - last_action_context += "- 【重要】失败/取消原因未明确记录。\n" - # self.last_successful_action_type = None # 行动失败,清除记录 - else: - last_action_context += f"- 该行动当前状态: {status}\n" - # self.last_successful_action_type = None # 非完成状态,清除记录 - - retrieved_memory_str_planner, retrieved_knowledge_str_planner = await retrieve_contextual_info( - chat_history_text, self.private_name - ) - # Optional: 可以加一行日志确认结果,方便调试 - logger.info( - f"[私聊][{self.private_name}] (ActionPlanner) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str_planner else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str_planner and '无相关知识' not in retrieved_knowledge_str_planner else '无'}" - ) - - # --- 选择 Prompt --- - if last_successful_reply_action in ["direct_reply", "send_new_message"]: - prompt_template = PROMPT_FOLLOW_UP - logger.debug(f"[私聊][{self.private_name}]使用 PROMPT_FOLLOW_UP (追问决策)") - else: - prompt_template = PROMPT_INITIAL_REPLY - logger.debug(f"[私聊][{self.private_name}]使用 PROMPT_INITIAL_REPLY (首次/非连续回复决策)") - - # --- 格式化最终的 Prompt --- - prompt = prompt_template.format( - persona_text=persona_text, - goals_str=goals_str if goals_str.strip() else "- 目前没有明确对话目标,请考虑设定一个。", - action_history_summary=action_history_summary, - last_action_context=last_action_context, - time_since_last_bot_message_info=time_since_last_bot_message_info, - timeout_context=timeout_context, - chat_history_text=chat_history_text if chat_history_text.strip() else "还没有聊天记录。", - # knowledge_info_str=knowledge_info_str, # 移除了旧知识展示方式 - retrieved_memory_str=retrieved_memory_str_planner if retrieved_memory_str_planner else "无相关记忆。", - retrieved_knowledge_str=retrieved_knowledge_str_planner - if retrieved_knowledge_str_planner - else "无相关知识。", - ) - - logger.debug(f"[私聊][{self.private_name}]发送到LLM的最终提示词:\n------\n{prompt}\n------") + # --- 3. 调用 LLM 进行初步规划 --- try: + llm_start_time = time.time() content, _ = await self.llm.generate_response_async(prompt) - logger.debug(f"[私聊][{self.private_name}]LLM (行动规划) 原始返回内容: {content}") + llm_duration = time.time() - llm_start_time + logger.debug(f"[私聊][{self.private_name}] LLM (行动规划) 耗时: {llm_duration:.3f} 秒, 原始返回: {content}") - # --- 初始行动规划解析 --- success, initial_result = get_items_from_json( content, self.private_name, @@ -385,87 +226,190 @@ class ActionPlanner: initial_action = initial_result.get("action", "wait") initial_reason = initial_result.get("reason", "LLM未提供原因,默认等待") + logger.info(f"[私聊][{self.private_name}] LLM 初步规划行动: {initial_action}, 原因: {initial_reason}") + except Exception as llm_err: + logger.error(f"[私聊][{self.private_name}] 调用 LLM 或解析初步规划结果时出错: {llm_err}") + logger.error(traceback.format_exc()) + return "wait", f"行动规划 LLM 调用或解析出错: {llm_err}" - # 检查是否需要进行结束对话决策 --- - if initial_action == "end_conversation": - logger.info(f"[私聊][{self.private_name}]初步规划结束对话,进入告别决策...") + # --- 4. 处理特殊动作 (end_conversation) --- + final_action = initial_action + final_reason = initial_reason - # 使用新的 PROMPT_END_DECISION - end_decision_prompt = PROMPT_END_DECISION.format( - persona_text=persona_text, # 复用之前的 persona_text - chat_history_text=chat_history_text, # 复用之前的 chat_history_text + if initial_action == "end_conversation": + try: + final_action, final_reason = await self._handle_end_conversation_decision( + persona_text, chat_history_text, initial_reason ) + except Exception as end_dec_err: + logger.error(f"[私聊][{self.private_name}] 处理结束对话决策时出错: {end_dec_err}") + logger.warning(f"[私聊][{self.private_name}] 结束决策出错,将按原计划执行 end_conversation") + final_action = "end_conversation" # 保持原计划 + final_reason = initial_reason - logger.debug( - f"[私聊][{self.private_name}]发送到LLM的结束决策提示词:\n------\n{end_decision_prompt}\n------" - ) - try: - end_content, _ = await self.llm.generate_response_async(end_decision_prompt) # 再次调用LLM - logger.debug(f"[私聊][{self.private_name}]LLM (结束决策) 原始返回内容: {end_content}") + # --- [移除] 不再需要在这里检查 wait 动作的约束 --- + # elif initial_action == "wait": + # # ... (移除之前的检查逻辑) ... + # final_action = "wait" + # final_reason = initial_reason - # 解析结束决策的JSON - end_success, end_result = get_items_from_json( - end_content, - self.private_name, - "say_bye", - "reason", - default_values={"say_bye": "no", "reason": "结束决策LLM返回格式错误,默认不告别"}, - required_types={"say_bye": str, "reason": str}, # 明确类型 - ) + # --- 5. 验证最终行动类型 --- + valid_actions = [ + "direct_reply", "send_new_message", "wait", "listening", + "rethink_goal", "end_conversation", "block_and_ignore", "say_goodbye" + ] + if final_action not in valid_actions: + logger.warning(f"[私聊][{self.private_name}] LLM 返回了未知的行动类型: '{final_action}',强制改为 wait") + final_reason = f"(原始行动'{final_action}'无效,已强制改为wait) {final_reason}" + final_action = "wait" # 遇到无效动作,默认等待 - say_bye_decision = end_result.get("say_bye", "no").lower() # 转小写方便比较 - end_decision_reason = end_result.get("reason", "未提供原因") + plan_duration = time.time() - plan_start_time + logger.success(f"[私聊][{self.private_name}] 最终规划行动: {final_action} (总耗时: {plan_duration:.3f} 秒)") + logger.info(f"[私聊][{self.private_name}] 行动原因: {final_reason}") + return final_action, final_reason - if end_success and say_bye_decision == "yes": - # 决定要告别,返回新的 'say_goodbye' 动作 - logger.info( - f"[私聊][{self.private_name}]结束决策: yes, 准备生成告别语. 原因: {end_decision_reason}" - ) - # 注意:这里的 reason 可以考虑拼接初始原因和结束决策原因,或者只用结束决策原因 - final_action = "say_goodbye" - final_reason = f"决定发送告别语。决策原因: {end_decision_reason} (原结束理由: {initial_reason})" - return final_action, final_reason - else: - # 决定不告别 (包括解析失败或明确说no) - logger.info( - f"[私聊][{self.private_name}]结束决策: no, 直接结束对话. 原因: {end_decision_reason}" - ) - # 返回原始的 'end_conversation' 动作 - final_action = "end_conversation" - final_reason = initial_reason # 保持原始的结束理由 - return final_action, final_reason - except Exception as end_e: - logger.error(f"[私聊][{self.private_name}]调用结束决策LLM或处理结果时出错: {str(end_e)}") - # 出错时,默认执行原始的结束对话 - logger.warning(f"[私聊][{self.private_name}]结束决策出错,将按原计划执行 end_conversation") - return "end_conversation", initial_reason # 返回原始动作和原因 + # --- Helper methods for preparing prompt inputs --- - else: - action = initial_action - reason = initial_reason + def _get_bot_last_speak_time_info(self, observation_info: ObservationInfo) -> str: + """获取机器人上次发言时间提示""" + + time_info = "" + try: + if not observation_info or not observation_info.bot_id: return "" + bot_id_str = str(observation_info.bot_id) + if hasattr(observation_info, "chat_history") and observation_info.chat_history: + for msg in reversed(observation_info.chat_history): + if not isinstance(msg, dict): continue + sender_info = msg.get("user_info", {}) + sender_id = str(sender_info.get("user_id")) if isinstance(sender_info, dict) else None + msg_time = msg.get("time") + if sender_id == bot_id_str and msg_time: + time_diff = time.time() - msg_time + if time_diff < 60.0: + time_info = f"提示:你上一条成功发送的消息是在 {time_diff:.1f} 秒前。\n" + break + except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时属性错误: {e}") + except Exception as e: logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时出错: {e}") + return time_info - # 验证action类型 (保持不变) - valid_actions = [ - "direct_reply", - "send_new_message", - "wait", - "listening", - "rethink_goal", - "end_conversation", # 仍然需要验证,因为可能从上面决策后返回 - "block_and_ignore", - "say_goodbye", # 也要验证这个新动作 - ] - if action not in valid_actions: - logger.warning(f"[私聊][{self.private_name}]LLM返回了未知的行动类型: '{action}',强制改为 wait") - reason = f"(原始行动'{action}'无效,已强制改为wait) {reason}" - action = "wait" + def _get_timeout_context(self, conversation_info: ConversationInfo) -> str: + """获取超时提示信息""" + + timeout_context = "" + try: + if hasattr(conversation_info, "goal_list") and conversation_info.goal_list: + last_goal_item = conversation_info.goal_list[-1] + last_goal_text = "" + if isinstance(last_goal_item, dict): last_goal_text = last_goal_item.get("goal", "") + elif isinstance(last_goal_item, str): last_goal_text = last_goal_item + if isinstance(last_goal_text, str) and "分钟," in last_goal_text and "思考接下来要做什么" in last_goal_text: + wait_time_str = last_goal_text.split("分钟,")[0].replace("你等待了","").strip() + timeout_context = f"重要提示:对方已经长时间(约 {wait_time_str} 分钟)没有回复你的消息了,请基于此情况规划下一步。\n" + logger.debug(f"[私聊][{self.private_name}] 检测到超时目标: {last_goal_text}") + except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 检查超时目标时属性错误: {e}") + except Exception as e: logger.warning(f"[私聊][{self.private_name}] 检查超时目标时出错: {e}") + return timeout_context - logger.info(f"[私聊][{self.private_name}]规划的行动: {action}") - logger.info(f"[私聊][{self.private_name}]行动原因: {reason}") - return action, reason + def _build_goals_string(self, conversation_info: ConversationInfo) -> str: + """构建对话目标字符串""" + + goals_str = "" + try: + if hasattr(conversation_info, "goal_list") and conversation_info.goal_list: + recent_goals = conversation_info.goal_list[-3:] + for goal_item in recent_goals: + goal = "目标内容缺失"; reasoning = "没有明确原因" + if isinstance(goal_item, dict): + goal = goal_item.get("goal", goal); reasoning = goal_item.get("reasoning", reasoning) + elif isinstance(goal_item, str): goal = goal_item + goal = str(goal) if goal is not None else "目标内容缺失" + reasoning = str(reasoning) if reasoning is not None else "没有明确原因" + goals_str += f"- 目标:{goal}\n 原因:{reasoning}\n" + if not goals_str: goals_str = "- 目前没有明确对话目标,请考虑设定一个。\n" + else: goals_str = "- 目前没有明确对话目标,请考虑设定一个。\n" + except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 构建对话目标字符串时属性错误: {e}"); goals_str = "- 获取对话目标时出错。\n" + except Exception as e: logger.error(f"[私聊][{self.private_name}] 构建对话目标字符串时出错: {e}"); goals_str = "- 构建对话目标时出错。\n" + return goals_str - except Exception as e: - # 外层异常处理保持不变 - logger.error(f"[私聊][{self.private_name}]规划行动时调用 LLM 或处理结果出错: {str(e)}") - return "wait", f"行动规划处理中发生错误,暂时等待: {str(e)}" + async def _build_chat_history_text(self, observation_info: ObservationInfo) -> str: + """构建聊天历史记录文本 (包含未处理消息)""" + + chat_history_text = "" + try: + if hasattr(observation_info, "chat_history_str") and observation_info.chat_history_str: chat_history_text = observation_info.chat_history_str + elif hasattr(observation_info, "chat_history") and observation_info.chat_history: + history_slice = observation_info.chat_history[-20:] + chat_history_text = await build_readable_messages(history_slice, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0) + else: chat_history_text = "还没有聊天记录。\n" + unread_count = getattr(observation_info, 'new_messages_count', 0) + unread_messages = getattr(observation_info, 'unprocessed_messages', []) + if unread_count > 0 and unread_messages: + from ...config.config import global_config + bot_qq_str = str(global_config.BOT_QQ) + other_unread_messages = [msg for msg in unread_messages if msg.get("user_info", {}).get("user_id") != bot_qq_str] + other_unread_count = len(other_unread_messages) + if other_unread_count > 0: + new_messages_str = await build_readable_messages(other_unread_messages, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0) + chat_history_text += f"\n--- 以下是 {other_unread_count} 条你需要处理的新消息 ---\n{new_messages_str}\n------\n" + logger.debug(f"[私聊][{self.private_name}] 向 LLM 追加了 {other_unread_count} 条未读消息。") + except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 构建聊天记录文本时属性错误: {e}"); chat_history_text = "[获取聊天记录时出错]\n" + except Exception as e: logger.error(f"[私聊][{self.private_name}] 处理聊天记录时发生未知错误: {e}"); chat_history_text = "[处理聊天记录时出错]\n" + return chat_history_text + + + def _build_action_history_context(self, conversation_info: ConversationInfo) -> Tuple[str, str]: + """构建行动历史概要和上一次行动详细情况""" + + action_history_summary = "你最近执行的行动历史:\n"; last_action_context = "关于你【上一次尝试】的行动:\n" + action_history_list: List[Dict[str, Any]] = [] + try: + if hasattr(conversation_info, "done_action") and conversation_info.done_action: action_history_list = conversation_info.done_action[-5:] + except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 获取行动历史时属性错误: {e}") + except Exception as e: logger.error(f"[私聊][{self.private_name}] 访问行动历史时出错: {e}") + if not action_history_list: + action_history_summary += "- 还没有执行过行动。\n"; last_action_context += "- 这是你规划的第一个行动。\n" + else: + for i, action_data in enumerate(action_history_list): + if not isinstance(action_data, dict): logger.warning(f"[私聊][{self.private_name}] 行动历史记录格式错误,跳过: {action_data}"); continue + action_type = action_data.get("action", "未知动作"); plan_reason = action_data.get("plan_reason", "未知规划原因") + status = action_data.get("status", "未知状态"); final_reason = action_data.get("final_reason", "") + action_time = action_data.get("time", "未知时间") + reason_text = f", 最终原因: “{final_reason}”" if final_reason else "" + summary_line = f"- 时间:{action_time}, 尝试:'{action_type}', 状态:{status}{reason_text}" + action_history_summary += summary_line + "\n" + if i == len(action_history_list) - 1: + last_action_context += f"- 上次【规划】的行动是: '{action_type}'\n" + last_action_context += f"- 当时规划的【原因】是: {plan_reason}\n" + if status == "done": last_action_context += "- 该行动已【成功执行】。\n" + elif status == "recall" or status == "error" or status.startswith("cancelled"): + last_action_context += "- 但该行动最终【未能成功执行/被取消/出错】。\n" + if final_reason: last_action_context += f"- 【重要】失败/取消/错误原因是: “{final_reason}”\n" + else: last_action_context += "- 【重要】失败/取消/错误原因未明确记录。\n" + elif status == "start": last_action_context += "- 该行动【正在执行中】或【未完成】。\n" + else: last_action_context += f"- 该行动当前状态未知: {status}\n" + return action_history_summary, last_action_context + + # --- Helper method for handling end_conversation decision --- + + async def _handle_end_conversation_decision( + self, persona_text: str, chat_history_text: str, initial_reason: str + ) -> Tuple[str, str]: + """处理结束对话前的告别决策""" + logger.info(f"[私聊][{self.private_name}] 初步规划结束对话,进入告别决策...") + end_decision_prompt = PROMPT_END_DECISION.format(persona_text=persona_text, chat_history_text=chat_history_text) + logger.debug(f"[私聊][{self.private_name}] 发送到LLM的结束决策提示词:\n------\n{end_decision_prompt}\n------") + llm_start_time = time.time() + end_content, _ = await self.llm.generate_response_async(end_decision_prompt) + llm_duration = time.time() - llm_start_time + logger.debug(f"[私聊][{self.private_name}] LLM (结束决策) 耗时: {llm_duration:.3f} 秒, 原始返回: {end_content}") + end_success, end_result = get_items_from_json(end_content, self.private_name, "say_bye", "reason", default_values={"say_bye": "no", "reason": "结束决策LLM返回格式错误,默认不告别"}, required_types={"say_bye": str, "reason": str}) + say_bye_decision = end_result.get("say_bye", "no").lower() + end_decision_reason = end_result.get("reason", "未提供原因") + if end_success and say_bye_decision == "yes": + logger.info(f"[私聊][{self.private_name}] 结束决策: yes, 准备生成告别语. 原因: {end_decision_reason}") + final_action = "say_goodbye"; final_reason = f"决定发送告别语 (原因: {end_decision_reason})。原结束理由: {initial_reason}" + return final_action, final_reason + else: + logger.info(f"[私聊][{self.private_name}] 结束决策: no, 直接结束对话. 原因: {end_decision_reason}") + return "end_conversation", initial_reason diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index e643c0f1..8296c716 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -1,34 +1,32 @@ -# -*- coding: utf-8 -*- -# File: conversation.py import time import asyncio import datetime +import traceback +from typing import Dict, Any, Optional, Set, List -from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat -from typing import Dict, Any, Optional, Set # <-- 添加 Set 类型提示 -from ..chat.message import Message -from .pfc_types import ConversationState -from .pfc import ChatObserver, GoalAnalyzer -from .message_sender import DirectMessageSender from src.common.logger_manager import get_logger +from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat +from maim_message import UserInfo +from src.plugins.chat.chat_stream import chat_manager, ChatStream +from ..chat.message import Message # 假设 Message 类在这里 +from ...config.config import global_config # 导入全局配置 + +from .pfc_types import ConversationState +from .pfc import GoalAnalyzer # 假设 GoalAnalyzer 在 pfc.py +from .chat_observer import ChatObserver +from .message_sender import DirectMessageSender from .action_planner import ActionPlanner from .observation_info import ObservationInfo -from .conversation_info import ConversationInfo +from .conversation_info import ConversationInfo # 导入修改后的 ConversationInfo from .reply_generator import ReplyGenerator -from ..chat.chat_stream import ChatStream from .idle_conversation_starter import IdleConversationStarter -from maim_message import UserInfo -from src.plugins.chat.chat_stream import chat_manager -from .pfc_KnowledgeFetcher import KnowledgeFetcher +from .pfc_KnowledgeFetcher import KnowledgeFetcher # 假设 KnowledgeFetcher 在这里 from .waiter import Waiter -import traceback from rich.traceback import install - install(extra_lines=3) -logger = get_logger("pfc") - +logger = get_logger("pfc_conversation") class Conversation: """对话类,负责管理单个对话的状态和行为""" @@ -41,9 +39,36 @@ class Conversation: self.should_continue = False self.ignore_until_timestamp: Optional[float] = None self.generated_reply = "" + self.chat_stream: Optional[ChatStream] = None + + # 初始化组件为 None + self.action_planner: Optional[ActionPlanner] = None + self.goal_analyzer: Optional[GoalAnalyzer] = None + self.reply_generator: Optional[ReplyGenerator] = None + self.knowledge_fetcher: Optional[KnowledgeFetcher] = None + self.waiter: Optional[Waiter] = None + self.direct_sender: Optional[DirectMessageSender] = None + self.idle_conversation_starter: Optional[IdleConversationStarter] = None + self.chat_observer: Optional[ChatObserver] = None + self.observation_info: Optional[ObservationInfo] = None + self.conversation_info: Optional[ConversationInfo] = None # 使用 ConversationInfo + + self._initializing = False + self._initialized = False + # 在初始化时获取机器人QQ号字符串,避免重复转换 + self.bot_qq_str = str(global_config.BOT_QQ) if global_config.BOT_QQ else None + if not self.bot_qq_str: + logger.error(f"[私聊][{self.private_name}] 严重错误:未能从配置中获取 BOT_QQ ID!PFC 可能无法正常工作。") + async def _initialize(self): - """初始化实例,注册所有组件 (保持不变)""" + """异步初始化对话实例及其所有组件""" + + if self._initialized or self._initializing: + logger.warning(f"[私聊][{self.private_name}] 尝试重复初始化或正在初始化中。") + return + self._initializing = True + logger.info(f"[私聊][{self.private_name}] 开始初始化对话实例: {self.stream_id}") try: self.action_planner = ActionPlanner(self.stream_id, self.private_name) self.goal_analyzer = GoalAnalyzer(self.stream_id, self.private_name) @@ -52,201 +77,199 @@ class Conversation: self.waiter = Waiter(self.stream_id, self.private_name) self.direct_sender = DirectMessageSender(self.private_name) self.chat_stream = chat_manager.get_stream(self.stream_id) + if not self.chat_stream: + raise ValueError(f"无法获取 stream_id {self.stream_id} 的 ChatStream") self.idle_conversation_starter = IdleConversationStarter(self.stream_id, self.private_name) - self.stop_action_planner = False - except Exception as e: - logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册运行组件失败: {e}") - logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - raise - - try: self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name) - self.chat_observer.start() self.observation_info = ObservationInfo(self.private_name) - self.observation_info.bind_to_chat_observer(self.chat_observer) + if not self.observation_info.bot_id: + logger.warning(f"[私聊][{self.private_name}] ObservationInfo 未能自动获取 bot_id,尝试手动设置。") + self.observation_info.bot_id = self.bot_qq_str self.conversation_info = ConversationInfo() + self.observation_info.bind_to_chat_observer(self.chat_observer) + await self._load_initial_history() + self.chat_observer.start() + if self.idle_conversation_starter: + self.idle_conversation_starter.start() + logger.info(f"[私聊][{self.private_name}] 空闲对话检测器已启动") + self._initialized = True + self.should_continue = True + self.state = ConversationState.ANALYZING + logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 初始化完成。") except Exception as e: - logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册信息组件失败: {e}") - logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") + logger.error(f"[私聊][{self.private_name}] 初始化对话实例失败: {e}") + logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") + self.should_continue = False + await self.stop() raise + finally: + self._initializing = False + + + async def _load_initial_history(self): + """加载初始聊天记录""" + + if not self.observation_info: return try: - logger.info(f"[私聊][{self.private_name}]为 {self.stream_id} 加载初始聊天记录...") + logger.info(f"[私聊][{self.private_name}] 为 {self.stream_id} 加载初始聊天记录...") initial_messages = get_raw_msg_before_timestamp_with_chat( - chat_id=self.stream_id, - timestamp=time.time(), - limit=30, - ) - chat_talking_prompt = await build_readable_messages( - initial_messages, - replace_bot_name=True, - merge_messages=False, - timestamp_mode="relative", - read_mark=0.0, + chat_id=self.stream_id, timestamp=time.time(), limit=30, ) if initial_messages: self.observation_info.chat_history = initial_messages - self.observation_info.chat_history_str = chat_talking_prompt + "\n" self.observation_info.chat_history_count = len(initial_messages) last_msg = initial_messages[-1] self.observation_info.last_message_time = last_msg.get("time") - last_user_info = UserInfo.from_dict(last_msg.get("user_info", {})) - self.observation_info.last_message_sender = last_user_info.user_id + self.observation_info.last_message_id = last_msg.get("message_id") + last_user_info_dict = last_msg.get("user_info", {}) + if isinstance(last_user_info_dict, dict): + try: + last_user_info = UserInfo.from_dict(last_user_info_dict) + self.observation_info.last_message_sender = str(last_user_info.user_id) if last_user_info else None + except Exception as e: + logger.warning(f"[私聊][{self.private_name}] 解析最后一条消息的用户信息时出错: {e}") + self.observation_info.last_message_sender = None + else: self.observation_info.last_message_sender = None self.observation_info.last_message_content = last_msg.get("processed_plain_text", "") - logger.info( - f"[私聊][{self.private_name}]成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}" + history_slice_for_str = initial_messages[-20:] + self.observation_info.chat_history_str = await build_readable_messages( + history_slice_for_str, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0 ) - # --- 注意: 下面两行保持不变,但其健壮性依赖于 ChatObserver 的实现 --- - self.chat_observer.last_message_time = self.observation_info.last_message_time - self.chat_observer.last_message_read = last_msg - await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time) + if self.chat_observer: self.chat_observer.last_message_time = self.observation_info.last_message_time + if self.idle_conversation_starter and self.observation_info.last_message_time: + await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time) + logger.info(f"[私聊][{self.private_name}] 成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}") else: - logger.info(f"[私聊][{self.private_name}]没有找到初始聊天记录。") - + logger.info(f"[私聊][{self.private_name}] 没有找到初始聊天记录。") + self.observation_info.chat_history_str = "还没有聊天记录。" except Exception as load_err: - logger.error(f"[私聊][{self.private_name}]加载初始聊天记录时出错: {load_err}") + logger.error(f"[私聊][{self.private_name}] 加载初始聊天记录时出错: {load_err}") + if self.observation_info: self.observation_info.chat_history_str = "[加载聊天记录出错]" + - self.should_continue = True - asyncio.create_task(self.start()) - # 启动空闲对话检测器 - self.idle_conversation_starter.start() - logger.info(f"[私聊][{self.private_name}]空闲对话检测器已启动") - asyncio.create_task(self.start()) async def start(self): - """开始对话流程 (保持不变)""" - try: - logger.info(f"[私聊][{self.private_name}]对话系统启动中...") - asyncio.create_task(self._plan_and_action_loop()) - except Exception as e: - logger.error(f"[私聊][{self.private_name}]启动对话系统失败: {e}") - raise + """开始对话流程""" + + if not self._initialized: + logger.error(f"[私聊][{self.private_name}] 对话实例未初始化,无法启动。") + try: + await self._initialize() + if not self._initialized: return + except Exception: return + if not self.should_continue: + logger.warning(f"[私聊][{self.private_name}] 对话实例已被标记为不应继续,无法启动。") + return + logger.info(f"[私聊][{self.private_name}] 对话系统启动,开始规划循环...") + asyncio.create_task(self._plan_and_action_loop()) + + + async def stop(self): + """停止对话实例并清理资源""" + + logger.info(f"[私聊][{self.private_name}] 正在停止对话实例: {self.stream_id}") + self.should_continue = False + if self.idle_conversation_starter: self.idle_conversation_starter.stop() + if self.observation_info and self.chat_observer: self.observation_info.unbind_from_chat_observer() + self._initialized = False + logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 已停止。") + async def _plan_and_action_loop(self): - """思考步,PFC核心循环模块""" + """思考步,PFC核心循环模块 - 实现精细化中断逻辑""" + + if not self._initialized: + logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下运行规划循环。") + return while self.should_continue: - # 忽略逻辑 (保持不变) - if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp: - # 暂停空闲对话检测器,避免在忽略期间触发 - if hasattr(self, 'idle_conversation_starter') and self.idle_conversation_starter._running: - self.idle_conversation_starter.stop() - logger.debug(f"[私聊][{self.private_name}]对话被暂时忽略,暂停空闲对话检测") - await asyncio.sleep(30) - continue - elif self.ignore_until_timestamp and time.time() >= self.ignore_until_timestamp: - logger.info(f"[私聊][{self.private_name}]忽略时间已到 {self.stream_id},准备结束对话。") - self.ignore_until_timestamp = None - self.should_continue = False + current_loop_start_time = time.time() + # --- 忽略逻辑 --- + if self.ignore_until_timestamp and current_loop_start_time < self.ignore_until_timestamp: + if self.idle_conversation_starter and self.idle_conversation_starter._running: + self.idle_conversation_starter.stop(); logger.debug(f"[私聊][{self.private_name}] 对话被暂时忽略,暂停空闲对话检测") + sleep_duration = min(30, self.ignore_until_timestamp - current_loop_start_time) + await asyncio.sleep(sleep_duration) continue + elif self.ignore_until_timestamp and current_loop_start_time >= self.ignore_until_timestamp: + logger.info(f"[私聊][{self.private_name}] 忽略时间已到 {self.stream_id},准备结束对话。") + self.ignore_until_timestamp = None; await self.stop(); continue else: - # 确保空闲对话检测器在正常对话时是启动的 - if hasattr(self, 'idle_conversation_starter') and not self.idle_conversation_starter._running: - self.idle_conversation_starter.start() - logger.debug(f"[私聊][{self.private_name}]恢复空闲对话检测") - + if self.idle_conversation_starter and not self.idle_conversation_starter._running: + self.idle_conversation_starter.start(); logger.debug(f"[私聊][{self.private_name}] 恢复空闲对话检测") + # --- 核心规划与行动逻辑 --- try: - # --- [修改点 1] 在规划前记录未处理消息的 ID 集合 --- - message_ids_before_planning = set() - initial_unprocessed_message_count = 0 - if hasattr(self.observation_info, "unprocessed_messages"): - message_ids_before_planning = {msg.get("message_id") for msg in self.observation_info.unprocessed_messages if msg.get("message_id")} - initial_unprocessed_message_count = len(self.observation_info.unprocessed_messages) - logger.debug(f"[私聊][{self.private_name}]规划开始,当前未处理消息数: {initial_unprocessed_message_count}, IDs: {message_ids_before_planning}") - else: - logger.warning(f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' before planning.") - - # --- 调用 Action Planner (保持不变) --- - action, reason = await self.action_planner.plan( - self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action - ) - - # --- [修改点 2] 规划后检查是否有 *过多* 新消息到达 --- - current_unprocessed_messages = [] - current_unprocessed_message_count = 0 - if hasattr(self.observation_info, "unprocessed_messages"): - current_unprocessed_messages = self.observation_info.unprocessed_messages - current_unprocessed_message_count = len(current_unprocessed_messages) - else: - logger.warning(f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' after planning.") - - # 计算规划期间实际新增的消息数量 - new_messages_during_planning_count = 0 + if not all([self.action_planner, self.observation_info, self.conversation_info]): + logger.error(f"[私聊][{self.private_name}] 核心组件未初始化,无法继续规划循环。"); await asyncio.sleep(5); continue + # --- 1. 记录规划开始时间 --- + planning_start_time = time.time() + logger.debug(f"[私聊][{self.private_name}] --- 开始新一轮规划 ({planning_start_time:.2f}) ---") + self.conversation_info.other_new_messages_during_planning_count = 0 + # --- 2. 调用 Action Planner --- + action, reason = await self.action_planner.plan(self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action) + planning_duration = time.time() - planning_start_time + logger.debug(f"[私聊][{self.private_name}] 规划耗时: {planning_duration:.3f} 秒,初步规划动作: {action}") + # --- 3. 检查规划期间的新消息 --- + current_unprocessed_messages = getattr(self.observation_info, 'unprocessed_messages', []) + new_messages_during_planning: List[Dict[str, Any]] = [] + other_new_messages_during_planning: List[Dict[str, Any]] = [] for msg in current_unprocessed_messages: - msg_id = msg.get("message_id") - if msg_id and msg_id not in message_ids_before_planning: - new_messages_during_planning_count += 1 - - logger.debug(f"[私聊][{self.private_name}]规划结束,当前未处理消息数: {current_unprocessed_message_count}, 规划期间新增: {new_messages_during_planning_count}") - - # **核心逻辑:判断是否中断** (保持不变) - if new_messages_during_planning_count > 2: - logger.info( - f"[私聊][{self.private_name}]规划期间新增消息数 ({new_messages_during_planning_count}) 超过阈值(2),取消本次行动 '{action}',重新规划" - ) - self.conversation_info.last_successful_reply_action = None - await asyncio.sleep(0.1) - continue # 跳过本轮后续处理,直接进入下一轮循环重新规划 - - # --- 执行动作 (移除 message_ids_before_planning 参数传递) --- + msg_time = msg.get('time') + sender_id = msg.get("user_info", {}).get("user_id") + if msg_time and msg_time >= planning_start_time: + new_messages_during_planning.append(msg) + if sender_id != self.bot_qq_str: other_new_messages_during_planning.append(msg) + new_msg_count = len(new_messages_during_planning); other_new_msg_count = len(other_new_messages_during_planning) + logger.debug(f"[私聊][{self.private_name}] 规划期间收到新消息总数: {new_msg_count}, 来自他人: {other_new_msg_count}") + # --- 4. 执行中断检查 --- + should_interrupt = False; interrupt_reason = "" + if action in ["wait", "listening"]: + if new_msg_count > 0: should_interrupt = True; interrupt_reason = f"规划 {action} 期间收到 {new_msg_count} 条新消息"; logger.info(f"[私聊][{self.private_name}] 中断 '{action}',原因: {interrupt_reason}。") + else: + interrupt_threshold = 2 + if other_new_msg_count > interrupt_threshold: should_interrupt = True; interrupt_reason = f"规划 {action} 期间收到 {other_new_msg_count} 条来自他人的新消息 (阈值 >{interrupt_threshold})"; logger.info(f"[私聊][{self.private_name}] 中断 '{action}',原因: {interrupt_reason}。") + if should_interrupt: + logger.info(f"[私聊][{self.private_name}] 执行中断,重新规划...") + cancel_record = {"action": action, "plan_reason": reason, "status": "cancelled_due_to_new_messages", "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "final_reason": interrupt_reason} + if not hasattr(self.conversation_info, "done_action"): self.conversation_info.done_action = [] + self.conversation_info.done_action.append(cancel_record) + self.conversation_info.last_successful_reply_action = None; self.state = ConversationState.ANALYZING; await asyncio.sleep(0.1); continue + # --- 5. 如果未中断,存储状态并执行动作 --- + logger.debug(f"[私聊][{self.private_name}] 未中断,继续执行动作 '{action}'") + self.conversation_info.other_new_messages_during_planning_count = other_new_msg_count await self._handle_action(action, reason, self.observation_info, self.conversation_info) - - # --- 检查是否需要结束对话 (逻辑保持不变) --- + # --- 6. 检查是否需要结束对话 --- goal_ended = False if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list: - for goal_item in self.conversation_info.goal_list: - current_goal = None - if isinstance(goal_item, dict): - current_goal = goal_item.get("goal") - elif isinstance(goal_item, str): - current_goal = goal_item - if isinstance(current_goal, str) and current_goal == "结束对话": - goal_ended = True - break - if goal_ended: - self.should_continue = False - logger.info(f"[私聊][{self.private_name}]检测到'结束对话'目标,停止循环。") - - except Exception as loop_err: - logger.error(f"[私聊][{self.private_name}]PFC主循环出错: {loop_err}") - logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - await asyncio.sleep(1) - - if self.should_continue: - await asyncio.sleep(0.1) - - logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}") + last_goal_item = self.conversation_info.goal_list[-1]; current_goal = None + if isinstance(last_goal_item, dict): current_goal = last_goal_item.get("goal") + elif isinstance(last_goal_item, str): current_goal = last_goal_item + if isinstance(current_goal, str) and current_goal == "结束对话": goal_ended = True + last_action_record = self.conversation_info.done_action[-1] if self.conversation_info.done_action else {} + action_ended = last_action_record.get("action") in ["end_conversation", "say_goodbye"] and last_action_record.get("status") == "done" + if goal_ended or action_ended: + logger.info(f"[私聊][{self.private_name}] 检测到结束条件 (目标结束: {goal_ended}, 动作结束: {action_ended}),停止循环。"); await self.stop(); continue + except asyncio.CancelledError: logger.info(f"[私聊][{self.private_name}] PFC 主循环被取消。"); await self.stop(); break + except Exception as loop_err: logger.error(f"[私聊][{self.private_name}] PFC 主循环出错: {loop_err}\n{traceback.format_exc()}"); self.state = ConversationState.ERROR; await asyncio.sleep(5) + # 控制循环频率 + loop_duration = time.time() - current_loop_start_time; min_loop_interval = 0.1 + if loop_duration < min_loop_interval: await asyncio.sleep(min_loop_interval - loop_duration) + logger.info(f"[私聊][{self.private_name}] PFC 循环结束 for stream_id: {self.stream_id}") - # --- 移除 _check_interrupt_before_sending 方法 --- - # def _check_interrupt_before_sending(self, message_ids_before_planning: set) -> bool: - # ... (旧代码移除) + def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Optional[Message]: + """将消息字典转换为Message对象""" - def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message: - """将消息字典转换为Message对象 (保持不变)""" try: - chat_info = msg_dict.get("chat_info") - if chat_info and isinstance(chat_info, dict): - chat_stream = ChatStream.from_dict(chat_info) - elif self.chat_stream: - chat_stream = self.chat_stream - else: - chat_stream = chat_manager.get_stream(self.stream_id) - if not chat_stream: - raise ValueError(f"无法确定 ChatStream for stream_id {self.stream_id}") + chat_stream_to_use = self.chat_stream or chat_manager.get_stream(self.stream_id) + if not chat_stream_to_use: logger.error(f"[私聊][{self.private_name}] 无法确定 ChatStream for stream_id {self.stream_id},无法转换消息。"); return None + user_info_dict = msg_dict.get("user_info", {}); user_info: Optional[UserInfo] = None + if isinstance(user_info_dict, dict): + try: user_info = UserInfo.from_dict(user_info_dict) + except Exception as e: logger.warning(f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}") + if not user_info: logger.warning(f"[私聊][{self.private_name}] 消息缺少有效的 UserInfo,无法转换。 msg_id: {msg_dict.get('message_id')}"); return None + return Message(message_id=msg_dict.get("message_id", f"gen_{time.time()}"), chat_stream=chat_stream_to_use, time=msg_dict.get("time", time.time()), user_info=user_info, processed_plain_text=msg_dict.get("processed_plain_text", ""), detailed_plain_text=msg_dict.get("detailed_plain_text", "")) + except Exception as e: logger.error(f"[私聊][{self.private_name}] 转换消息时出错: {e}\n{traceback.format_exc()}"); return None - user_info = UserInfo.from_dict(msg_dict.get("user_info", {})) - return Message( - message_id=msg_dict.get("message_id", f"gen_{time.time()}"), - chat_stream=chat_stream, - time=msg_dict.get("time", time.time()), - user_info=user_info, - processed_plain_text=msg_dict.get("processed_plain_text", ""), - detailed_plain_text=msg_dict.get("detailed_plain_text", ""), - ) - except Exception as e: - logger.warning(f"[私聊][{self.private_name}]转换消息时出错: {e}") - raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e - - # --- [修改点 3] 修改 _handle_action 签名并调整内部逻辑 (移除 message_ids_before_planning 参数) --- async def _handle_action( self, action: str, @@ -254,373 +277,196 @@ class Conversation: observation_info: ObservationInfo, conversation_info: ConversationInfo ): - """处理规划的行动""" - logger.debug(f"[私聊][{self.private_name}]执行行动: {action}, 原因: {reason}") + """处理规划的行动 - 实现精细化后续状态设置""" + if not self._initialized: + logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下处理动作 '{action}'。") + return - # 记录action历史 (逻辑不变) + logger.info(f"[私聊][{self.private_name}] 开始处理动作: {action}, 原因: {reason}") + action_start_time = time.time() + + # 记录action历史 current_action_record = { - "action": action, - "plan_reason": reason, - "status": "start", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - "final_reason": None, + "action": action, "plan_reason": reason, "status": "start", + "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "final_reason": None, } - if not hasattr(conversation_info, "done_action"): - conversation_info.done_action = [] + if not hasattr(conversation_info, "done_action"): conversation_info.done_action = [] conversation_info.done_action.append(current_action_record) action_index = len(conversation_info.done_action) - 1 - action_successful = False - reply_sent = False + action_successful = False # 初始化动作成功状态 + final_status = "recall" # 默认失败状态 + final_reason = "动作未成功执行" # 默认失败原因 - # --- 根据不同的 action 执行 --- - if action == "direct_reply" or action == "send_new_message": - # 合并 reply 和 follow-up 的生成/检查逻辑 (保持不变) - max_reply_attempts = 3 - reply_attempt_count = 0 - is_suitable = False - need_replan = False - check_reason = "未进行尝试" - final_reply_to_send = "" - - while reply_attempt_count < max_reply_attempts and not is_suitable: - reply_attempt_count += 1 - log_prefix = f"[私聊][{self.private_name}]尝试生成 '{action}' 回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." - logger.info(log_prefix) + try: + # --- 根据不同的 action 执行 --- + if action in ["direct_reply", "send_new_message", "say_goodbye"]: + # --- 生成回复逻辑 --- self.state = ConversationState.GENERATING + if not self.reply_generator: raise RuntimeError("ReplyGenerator 未初始化") + generated_content = await self.reply_generator.generate(observation_info, conversation_info, action_type=action) + logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'") - self.generated_reply = await self.reply_generator.generate( - observation_info, conversation_info, action_type=action - ) - logger.info(f"{log_prefix} 生成内容: {self.generated_reply}") - - self.state = ConversationState.CHECKING - try: - current_goal_str = "" - if hasattr(conversation_info, 'goal_list') and conversation_info.goal_list: - goal_item = conversation_info.goal_list[0] - if isinstance(goal_item, dict): - current_goal_str = goal_item.get('goal', '') - elif isinstance(goal_item, str): - current_goal_str = goal_item - - chat_history_for_check = getattr(observation_info, 'chat_history', []) - chat_history_str_for_check = getattr(observation_info, 'chat_history_str', '') - - is_suitable, check_reason, need_replan = await self.reply_generator.check_reply( - reply=self.generated_reply, - goal=current_goal_str, - chat_history=chat_history_for_check, - chat_history_str=chat_history_str_for_check, - retry_count=reply_attempt_count - 1, - ) - logger.info( - f"{log_prefix} 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" - ) - - if not is_suitable or need_replan: - conversation_info.last_reply_rejection_reason = check_reason - conversation_info.last_rejected_reply_content = self.generated_reply - else: - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None - - if is_suitable: - final_reply_to_send = self.generated_reply - break - elif need_replan: - logger.warning( - f"{log_prefix} 检查建议重新规划,停止尝试。原因: {check_reason}" - ) - break - except Exception as check_err: - logger.error( - f"{log_prefix} 调用 ReplyChecker 时出错: {check_err}" - ) - check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}" - conversation_info.last_rejected_reply_content = self.generated_reply - break - - # --- 处理生成和检查的结果 --- - if is_suitable: - # --- [修改点 4] 记录发送前时间戳 --- - timestamp_before_sending = time.time() - logger.debug(f"[私聊][{self.private_name}]准备发送回复,记录发送前时间戳: {timestamp_before_sending}") - - # 确认发送 - self.generated_reply = final_reply_to_send - send_success = await self._send_reply() # 调用发送函数 - await self.idle_conversation_starter.update_last_message_time() - - if send_success: - action_successful = True - reply_sent = True - logger.info(f"[私聊][{self.private_name}]成功发送 '{action}' 回复.") - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None - - # --- [修改点 5] 基于时间戳处理消息和决定下一轮 prompt 类型 --- - current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) - message_ids_to_clear: Set[str] = set() # 使用 Set 类型 - new_messages_during_sending_count = 0 - - for msg in current_unprocessed_messages: - msg_time = msg.get('time') - msg_id = msg.get('message_id') - if msg_id and msg_time: # 确保时间和 ID 存在 - if msg_time < timestamp_before_sending: - message_ids_to_clear.add(msg_id) - else: - # 时间戳大于等于发送前时间戳,视为新消息 - new_messages_during_sending_count += 1 - - logger.debug(f"[私聊][{self.private_name}]回复发送后,检测到 {len(message_ids_to_clear)} 条发送前消息待清理,{new_messages_during_sending_count} 条发送期间/之后的新消息。") - - # 清理发送前到达的消息 - if message_ids_to_clear: - await observation_info.clear_processed_messages(message_ids_to_clear) - else: - logger.debug(f"[私聊][{self.private_name}]没有需要清理的发送前消息。") - - # 根据发送期间是否有新消息,决定下次规划用哪个 prompt - if new_messages_during_sending_count > 0: - logger.info(f"[私聊][{self.private_name}]检测到 {new_messages_during_sending_count} 条在发送期间/之后到达的新消息,下一轮将使用首次回复逻辑处理。") - self.conversation_info.last_successful_reply_action = None # 强制下一轮用 PROMPT_INITIAL_REPLY - else: - logger.info(f"[私聊][{self.private_name}]发送期间/之后无新消息,下一轮将根据 '{action}' 使用追问逻辑。") - self.conversation_info.last_successful_reply_action = action # 保持状态,下一轮可能用 PROMPT_FOLLOW_UP - - else: # 发送失败 - logger.error(f"[私聊][{self.private_name}]发送 '{action}' 回复失败。") - action_successful = False - self.conversation_info.last_successful_reply_action = None - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": "发送回复时失败"} - ) - - elif need_replan: - # 检查后打回动作决策 (保持不变) - logger.warning( - f"[私聊][{self.private_name}]'{action}' 回复检查后决定打回动作决策 (尝试 {reply_attempt_count} 次)。打回原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后打回: {check_reason}"} - ) - self.conversation_info.last_successful_reply_action = None - - else: # 多次尝试后仍然不合适 (保持不变) - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的 '{action}' 回复。最终原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后失败: {check_reason}"} - ) - self.conversation_info.last_successful_reply_action = None - - if action == "send_new_message": - logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...") - self.state = ConversationState.WAITING - await self.waiter.wait(self.conversation_info) - wait_action_record = { - "action": "wait", - "plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待", - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - "final_reason": None, - } - conversation_info.done_action.append(wait_action_record) - action_successful = True - self.conversation_info.last_successful_reply_action = None - - # --- 处理其他动作 (保持不变,确保状态重置) --- - elif action == "rethink_goal": - self.state = ConversationState.RETHINKING - try: - if not hasattr(self, "goal_analyzer"): - raise AttributeError("GoalAnalyzer not initialized") - await self.goal_analyzer.analyze_goal(conversation_info, observation_info) - action_successful = True - except Exception as rethink_err: - logger.error(f"[私聊][{self.private_name}]重新思考目标时出错: {rethink_err}") - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"} - ) - self.conversation_info.last_successful_reply_action = None - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None - - - elif action == "listening": - self.state = ConversationState.LISTENING - logger.info(f"[私聊][{self.private_name}]倾听对方发言...") - try: - if not hasattr(self, "waiter"): - raise AttributeError("Waiter not initialized") - await self.waiter.wait_listening(conversation_info) - action_successful = True - except Exception as listen_err: - logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}") - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"倾听失败: {listen_err}"} - ) - self.conversation_info.last_successful_reply_action = None - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None - - elif action == "say_goodbye": - self.state = ConversationState.GENERATING - logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...") - try: - self.generated_reply = await self.reply_generator.generate( - observation_info, conversation_info, action_type="say_goodbye" - ) - logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}") - - if self.generated_reply: - # --- [修改点 6] 告别语发送前记录时间戳 --- - timestamp_before_sending_goodbye = time.time() - send_success = await self._send_reply() - if send_success: - action_successful = True - reply_sent = True - logger.info(f"[私聊][{self.private_name}]告别语已发送。") - - # --- [修改点 7] 告别语发送后也处理未读消息 --- - # (虽然通常之后就结束了,但以防万一) - current_unprocessed_messages_goodbye = getattr(observation_info, 'unprocessed_messages', []) - message_ids_to_clear_goodbye: Set[str] = set() - for msg in current_unprocessed_messages_goodbye: - msg_time = msg.get('time') - msg_id = msg.get('message_id') - if msg_id and msg_time and msg_time < timestamp_before_sending_goodbye: - message_ids_to_clear_goodbye.add(msg_id) - if message_ids_to_clear_goodbye: - await observation_info.clear_processed_messages(message_ids_to_clear_goodbye) - - self.should_continue = False # 正常结束 - logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。") - else: - logger.warning(f"[私聊][{self.private_name}]发送告别语失败。") - action_successful = False - self.should_continue = True # 发送失败不能结束 - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": "发送告别语失败"} - ) - self.conversation_info.last_successful_reply_action = None - + if not generated_content or generated_content.startswith("抱歉"): + logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,取消发送。") + final_reason = "生成内容无效" + if action == "say_goodbye": final_status = "done"; self.should_continue = False; logger.info(f"[私聊][{self.private_name}] 告别语生成失败,仍按计划结束对话。") + else: final_status = "recall"; conversation_info.last_successful_reply_action = None else: - logger.warning(f"[私聊][{self.private_name}]未能生成告别语内容,无法发送。") - action_successful = False - self.should_continue = True - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": "未能生成告别语内容"} - ) - self.conversation_info.last_successful_reply_action = None + # --- 发送回复逻辑 --- + self.generated_reply = generated_content + timestamp_before_sending = time.time() + logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}") + self.state = ConversationState.SENDING + send_success = await self._send_reply() + send_end_time = time.time() - except Exception as goodbye_err: - logger.error(f"[私聊][{self.private_name}]生成或发送告别语时出错: {goodbye_err}") - logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - action_successful = False - self.should_continue = True - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"} - ) - self.conversation_info.last_successful_reply_action = None + if send_success: + action_successful = True # <--- 标记动作成功 + # final_status 和 final_reason 在 finally 块中根据 action_successful 设置 + logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 成功发送回复.") + if self.idle_conversation_starter: await self.idle_conversation_starter.update_last_message_time(send_end_time) - elif action == "end_conversation": - self.should_continue = False - logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...") - action_successful = True - self.conversation_info.last_successful_reply_action = None - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None + # --- 清理已处理消息 --- + current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) + message_ids_to_clear: Set[str] = set() + for msg in current_unprocessed_messages: + msg_time = msg.get('time'); msg_id = msg.get('message_id'); sender_id = msg.get("user_info", {}).get("user_id") + if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: message_ids_to_clear.add(msg_id) + if message_ids_to_clear: logger.debug(f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}"); await observation_info.clear_processed_messages(message_ids_to_clear) + else: logger.debug(f"[私聊][{self.private_name}] 没有需要清理的发送前(他人)消息。") + # --- 决定下一轮规划类型 --- + other_new_msg_count_during_planning = getattr(conversation_info, 'other_new_messages_during_planning_count', 0) + if other_new_msg_count_during_planning > 0: + logger.info(f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。") + conversation_info.last_successful_reply_action = None + else: + logger.info(f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。") + conversation_info.last_successful_reply_action = action - elif action == "block_and_ignore": - logger.info(f"[私聊][{self.private_name}]不想再理你了...") - ignore_duration_seconds = 10 * 60 - self.ignore_until_timestamp = time.time() + ignore_duration_seconds - logger.info( - f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}" - ) - self.state = ConversationState.IGNORED - action_successful = True - self.conversation_info.last_successful_reply_action = None - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None + # 清除上次拒绝信息 + conversation_info.last_reply_rejection_reason = None; conversation_info.last_rejected_reply_content = None + if action == "say_goodbye": self.should_continue = False; logger.info(f"[私聊][{self.private_name}] 成功发送告别语,即将停止对话实例。") + else: + # 发送失败 + logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送回复失败。") + final_status = "recall"; final_reason = "发送回复时失败" # 发送失败直接设置状态 + conversation_info.last_successful_reply_action = None + if action == "say_goodbye": self.should_continue = True + # --- 其他动作处理 --- + elif action == "rethink_goal": + self.state = ConversationState.RETHINKING + if not self.goal_analyzer: raise RuntimeError("GoalAnalyzer 未初始化") + await self.goal_analyzer.analyze_goal(conversation_info, observation_info) + action_successful = True # <--- 标记动作成功 + elif action == "listening": + self.state = ConversationState.LISTENING + if not self.waiter: raise RuntimeError("Waiter 未初始化") + logger.info(f"[私聊][{self.private_name}] 动作 'listening': 进入倾听状态...") + await self.waiter.wait_listening(conversation_info) + action_successful = True # <--- 标记动作成功 + elif action == "end_conversation": + logger.info(f"[私聊][{self.private_name}] 动作 'end_conversation': 收到最终结束指令,停止对话...") + action_successful = True # <--- 标记动作成功 + self.should_continue = False + elif action == "block_and_ignore": + logger.info(f"[私聊][{self.private_name}] 动作 'block_and_ignore': 不想再理你了...") + ignore_duration_seconds = 10 * 60 + self.ignore_until_timestamp = time.time() + ignore_duration_seconds + logger.info(f"[私聊][{self.private_name}] 将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}") + self.state = ConversationState.IGNORED + action_successful = True # <--- 标记动作成功 + elif action == "wait": + self.state = ConversationState.WAITING + if not self.waiter: raise RuntimeError("Waiter 未初始化") + logger.info(f"[私聊][{self.private_name}] 动作 'wait': 进入等待状态...") + timeout_occurred = await self.waiter.wait(self.conversation_info) + action_successful = True # <--- 标记动作成功 + # wait 的 reason 在 finally 中设置 + logger.debug(f"[私聊][{self.private_name}] Wait 动作完成,无需在此清理消息。") + else: + logger.warning(f"[私聊][{self.private_name}] 未知的动作类型: {action}") + final_status = "recall"; final_reason = f"未知的动作类型: {action}" # 未知动作直接失败 - else: # 对应 'wait' 动作 - self.state = ConversationState.WAITING - logger.info(f"[私聊][{self.private_name}]等待更多信息...") - try: - if not hasattr(self, "waiter"): - raise AttributeError("Waiter not initialized") - _timeout_occurred = await self.waiter.wait(self.conversation_info) - action_successful = True - except Exception as wait_err: - logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}") - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"等待失败: {wait_err}"} - ) - self.conversation_info.last_successful_reply_action = None - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None + # --- 重置非回复动作的追问状态 --- + if action not in ["direct_reply", "send_new_message", "say_goodbye"]: + conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + + except asyncio.CancelledError: + logger.warning(f"[私聊][{self.private_name}] 处理动作 '{action}' 时被取消。") + final_status = "cancelled"; final_reason = "动作处理被取消" + conversation_info.last_successful_reply_action = None + raise + except Exception as handle_err: + logger.error(f"[私聊][{self.private_name}] 处理动作 '{action}' 时出错: {handle_err}") + logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") + final_status = "error"; final_reason = f"处理动作时出错: {handle_err}" + self.state = ConversationState.ERROR + conversation_info.last_successful_reply_action = None + + finally: + # --- 重置临时计数值 --- + conversation_info.other_new_messages_during_planning_count = 0 + + # --- 更新 Action History 状态 (优化) --- + # 如果状态仍然是默认的 recall,但 action_successful 为 True,则更新为 done + if final_status == "recall" and action_successful: + final_status = "done" + # 设置成功的 reason (可以根据动作类型细化) + if action == "wait": + # 检查是否是因为超时结束的(需要 waiter 返回值,或者检查 goal_list) + timeout_occurred = any("分钟," in g.get("goal","") for g in conversation_info.goal_list if isinstance(g, dict)) if conversation_info.goal_list else False + final_reason = "等待完成" + (" (超时)" if timeout_occurred else " (收到新消息或中断)") + elif action == "listening": + final_reason = "进入倾听状态" + elif action in ["rethink_goal", "end_conversation", "block_and_ignore"]: + final_reason = f"成功执行 {action}" # 通用成功原因 + else: # 默认为发送成功 + final_reason = "成功发送" + + # 更新历史记录 + if conversation_info.done_action and action_index < len(conversation_info.done_action): + conversation_info.done_action[action_index].update( + { + "status": final_status, + "time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "final_reason": final_reason, + "duration_ms": int((time.time() - action_start_time) * 1000) + } + ) + logger.debug(f"[私聊][{self.private_name}] 动作 '{action}' 最终状态: {final_status}, 原因: {final_reason}") + else: + logger.error(f"[私聊][{self.private_name}] 无法更新动作历史记录,索引 {action_index} 无效或列表为空。") - # --- 更新 Action History 状态 (保持不变) --- - if action_successful: - conversation_info.done_action[action_index].update( - { - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - } - ) - logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'done'") - else: - logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'recall' 或失败") async def _send_reply(self) -> bool: - """发送回复,并返回是否发送成功 (保持不变)""" - if not self.generated_reply: - logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。") - return False + """发送生成的回复""" + if not self.generated_reply: logger.warning(f"[私聊][{self.private_name}] 没有生成回复内容,无法发送。"); return False + if not self.direct_sender: logger.error(f"[私聊][{self.private_name}] DirectMessageSender 未初始化,无法发送。"); return False + if not self.chat_stream: logger.error(f"[私聊][{self.private_name}] ChatStream 未初始化,无法发送。"); return False try: reply_content = self.generated_reply - if not hasattr(self, "direct_sender") or not self.direct_sender: - logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。") - return False - if not self.chat_stream: - logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。") - return False - - await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content) + await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content, reply_to_message=None) self.state = ConversationState.ANALYZING return True - except Exception as e: - logger.error(f"[私聊][{self.private_name}]发送消息时失败: {str(e)}") - logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - self.state = ConversationState.ANALYZING + logger.error(f"[私聊][{self.private_name}] 发送消息时失败: {str(e)}\n{traceback.format_exc()}") + self.state = ConversationState.ERROR return False - + # _send_timeout_message 方法可以保持不变 async def _send_timeout_message(self): - """发送超时结束消息 (保持不变)""" - try: - messages = self.chat_observer.get_cached_messages(limit=1) - if not messages: - return - latest_message = self._convert_to_message(messages[0]) - await self.direct_sender.send_message( - chat_stream=self.chat_stream, content="TODO:超时消息", reply_to_message=latest_message - ) - # 停止空闲对话检测器 - if hasattr(self, 'idle_conversation_starter'): - self.idle_conversation_starter.stop() + """发送超时结束消息""" - if hasattr(self, 'chat_observer'): - self.chat_observer.stop() - except Exception as e: - logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}") \ No newline at end of file + if not self.direct_sender or not self.chat_stream: logger.warning(f"[私聊][{self.private_name}] 发送器或聊天流未初始化,无法发送超时消息。"); return + try: + timeout_content = "我们好像很久没说话了,先这样吧~" + await self.direct_sender.send_message(chat_stream=self.chat_stream, content=timeout_content, reply_to_message=None) + logger.info(f"[私聊][{self.private_name}] 已发送超时结束消息。") + await self.stop() + except Exception as e: logger.error(f"[私聊][{self.private_name}] 发送超时消息失败: {str(e)}") diff --git a/src/plugins/PFC/observation_info.py b/src/plugins/PFC/observation_info.py index 35c63741..520a5306 100644 --- a/src/plugins/PFC/observation_info.py +++ b/src/plugins/PFC/observation_info.py @@ -1,13 +1,14 @@ -# -*- coding: utf-8 -*- -# File: observation_info.py -from typing import List, Optional, Dict, Any, Set -from maim_message import UserInfo import time +import traceback +from typing import List, Optional, Dict, Any, Set + +from maim_message import UserInfo from src.common.logger import get_module_logger +from src.plugins.utils.chat_message_builder import build_readable_messages + +# 确保导入路径正确 from .chat_observer import ChatObserver from .chat_states import NotificationHandler, NotificationType, Notification -from src.plugins.utils.chat_message_builder import build_readable_messages -import traceback # 导入 traceback 用于调试 logger = get_module_logger("observation_info") @@ -16,324 +17,282 @@ class ObservationInfoHandler(NotificationHandler): """ObservationInfo的通知处理器""" def __init__(self, observation_info: "ObservationInfo", private_name: str): - """初始化处理器 - - Args: - observation_info: 要更新的ObservationInfo实例 - private_name: 私聊对象的名称,用于日志记录 - """ + """初始化处理器""" self.observation_info = observation_info - # 将 private_name 存储在 handler 实例中 self.private_name = private_name - async def handle_notification(self, notification: Notification): # 添加类型提示 - # 获取通知类型和数据 + async def handle_notification(self, notification: Notification): + """处理来自 ChatObserver 的通知""" notification_type = notification.type data = notification.data + timestamp = notification.timestamp # 获取通知时间戳 - try: # 添加错误处理块 + try: if notification_type == NotificationType.NEW_MESSAGE: # 处理新消息通知 - # logger.debug(f"[私聊][{self.private_name}]收到新消息通知data: {data}") # 可以在需要时取消注释 - message_id = data.get("message_id") - processed_plain_text = data.get("processed_plain_text") - detailed_plain_text = data.get("detailed_plain_text") - user_info_dict = data.get("user_info") # 先获取字典 - time_value = data.get("time") + message_dict = data # data 本身就是消息字典 + if not isinstance(message_dict, dict): + logger.warning(f"[私聊][{self.private_name}] 收到的 NEW_MESSAGE 数据不是字典: {data}") + return - # 确保 user_info 是字典类型再创建 UserInfo 对象 - user_info = None + # 解析 UserInfo + user_info_dict = message_dict.get("user_info") + user_info: Optional[UserInfo] = None if isinstance(user_info_dict, dict): try: user_info = UserInfo.from_dict(user_info_dict) except Exception as e: - logger.error( - f"[私聊][{self.private_name}]从字典创建 UserInfo 时出错: {e}, 字典内容: {user_info_dict}" - ) - # 可以选择在这里返回或记录错误,避免后续代码出错 - return + logger.error(f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}") elif user_info_dict is not None: - logger.warning( - f"[私聊][{self.private_name}]收到的 user_info 不是预期的字典类型: {type(user_info_dict)}" - ) - # 根据需要处理非字典情况,这里暂时返回 - return + logger.warning(f"[私聊][{self.private_name}] 收到的 user_info 不是预期的字典类型: {type(user_info_dict)}") - message = { - "message_id": message_id, - "processed_plain_text": processed_plain_text, - "detailed_plain_text": detailed_plain_text, - "user_info": user_info_dict, # 存储原始字典或 UserInfo 对象,取决于你的 update_from_message 如何处理 - "time": time_value, - } - # 传递 UserInfo 对象(如果成功创建)或原始字典 - await self.observation_info.update_from_message(message, user_info) # 修改:传递 user_info 对象 + # 更新 ObservationInfo + await self.observation_info.update_from_message(message_dict, user_info) elif notification_type == NotificationType.COLD_CHAT: # 处理冷场通知 is_cold = data.get("is_cold", False) - await self.observation_info.update_cold_chat_status(is_cold, time.time()) # 修改:改为 await 调用 - - elif notification_type == NotificationType.ACTIVE_CHAT: - # 处理活跃通知 (通常由 COLD_CHAT 的反向状态处理) - is_active = data.get("is_active", False) - self.observation_info.is_cold = not is_active - - elif notification_type == NotificationType.BOT_SPEAKING: - # 处理机器人说话通知 (按需实现) - self.observation_info.is_typing = False - self.observation_info.last_bot_speak_time = time.time() - - elif notification_type == NotificationType.USER_SPEAKING: - # 处理用户说话通知 - self.observation_info.is_typing = False - self.observation_info.last_user_speak_time = time.time() + await self.observation_info.update_cold_chat_status(is_cold, timestamp) # 使用通知时间戳 elif notification_type == NotificationType.MESSAGE_DELETED: # 处理消息删除通知 - message_id = data.get("message_id") - # 从 unprocessed_messages 中移除被删除的消息 - original_count = len(self.observation_info.unprocessed_messages) - self.observation_info.unprocessed_messages = [ - msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id - ] - # --- [修改点 11] 更新 new_messages_count --- - self.observation_info.new_messages_count = len(self.observation_info.unprocessed_messages) - if self.observation_info.new_messages_count < original_count: - logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id}), 当前未处理数: {self.observation_info.new_messages_count}") + message_id_to_delete = data.get("message_id") + if message_id_to_delete: + await self.observation_info.remove_unprocessed_message(message_id_to_delete) + else: + logger.warning(f"[私聊][{self.private_name}] 收到无效的消息删除通知,缺少 message_id: {data}") + # --- 可以根据需要处理其他通知类型 --- + elif notification_type == NotificationType.ACTIVE_CHAT: + is_active = data.get("is_active", False) + # 通常由 COLD_CHAT 的反向状态处理,但也可以在这里显式处理 + await self.observation_info.update_cold_chat_status(not is_active, timestamp) + + elif notification_type == NotificationType.BOT_SPEAKING: + # 机器人开始说话 (例如,如果需要显示"正在输入...") + # self.observation_info.is_typing = True + pass # 暂时不处理 + + elif notification_type == NotificationType.USER_SPEAKING: + # 用户开始说话 + # self.observation_info.is_typing = True + pass # 暂时不处理 elif notification_type == NotificationType.USER_JOINED: - # 处理用户加入通知 (如果适用私聊场景) user_id = data.get("user_id") if user_id: - self.observation_info.active_users.add(str(user_id)) # 确保是字符串 + self.observation_info.active_users.add(str(user_id)) + self.observation_info.update_changed() elif notification_type == NotificationType.USER_LEFT: - # 处理用户离开通知 (如果适用私聊场景) user_id = data.get("user_id") if user_id: - self.observation_info.active_users.discard(str(user_id)) # 确保是字符串 + self.observation_info.active_users.discard(str(user_id)) + self.observation_info.update_changed() elif notification_type == NotificationType.ERROR: - # 处理错误通知 error_msg = data.get("error", "未提供错误信息") - logger.error(f"[私聊][{self.private_name}]收到错误通知: {error_msg}") + logger.error(f"[私聊][{self.private_name}] 收到错误通知: {error_msg}") + # 可以在这里触发一些错误处理逻辑 except Exception as e: - logger.error(f"[私聊][{self.private_name}]处理通知时发生错误: {e}") - logger.error(traceback.format_exc()) # 打印详细堆栈信息 + logger.error(f"[私聊][{self.private_name}] 处理通知时发生错误 (类型: {notification_type.name}): {e}") + logger.error(traceback.format_exc()) -# @dataclass <-- 这个,不需要了(递黄瓜) class ObservationInfo: - """决策信息类,用于收集和管理来自chat_observer的通知信息 (手动实现 __init__)""" - - # 类型提示保留,可用于文档和静态分析 - private_name: str - chat_history: List[Dict[str, Any]] - chat_history_str: str - unprocessed_messages: List[Dict[str, Any]] - active_users: Set[str] - last_bot_speak_time: Optional[float] - last_user_speak_time: Optional[float] - last_message_time: Optional[float] - last_message_id: Optional[str] - last_message_content: str - last_message_sender: Optional[str] - bot_id: Optional[str] - chat_history_count: int - new_messages_count: int - cold_chat_start_time: Optional[float] - cold_chat_duration: float - is_typing: bool - is_cold_chat: bool - changed: bool - chat_observer: Optional[ChatObserver] - handler: Optional[ObservationInfoHandler] + """决策信息类,用于收集和管理来自chat_observer的通知信息""" def __init__(self, private_name: str): - """ - 手动初始化 ObservationInfo 的所有实例变量。 - """ - - # 接收的参数 + """初始化 ObservationInfo""" self.private_name: str = private_name - # data_list - self.chat_history: List[Dict[str, Any]] = [] - self.chat_history_str: str = "" - self.unprocessed_messages: List[Dict[str, Any]] = [] - self.active_users: Set[str] = set() + # 聊天记录相关 + self.chat_history: List[Dict[str, Any]] = [] # 存储已处理的消息历史 + self.chat_history_str: str = "还没有聊天记录。" # 用于生成 Prompt 的历史记录字符串 + self.chat_history_count: int = 0 - # data + # 未处理消息相关 (核心修改点) + self.unprocessed_messages: List[Dict[str, Any]] = [] # 存储尚未被机器人回复的消息 + self.new_messages_count: int = 0 # unprocessed_messages 的数量 + + # 状态信息 + self.active_users: Set[str] = set() # 当前活跃用户 (私聊场景可能只有对方) self.last_bot_speak_time: Optional[float] = None - self.last_user_speak_time: Optional[float] = None - self.last_message_time: Optional[float] = None + self.last_user_speak_time: Optional[float] = None # 指对方用户的发言时间 + self.last_message_time: Optional[float] = None # 指所有消息(包括自己)的最新时间 self.last_message_id: Optional[str] = None self.last_message_content: str = "" - self.last_message_sender: Optional[str] = None - self.bot_id: Optional[str] = None # 需要在某个地方设置 bot_id,例如从 global_config 获取 - self.chat_history_count: int = 0 - self.new_messages_count: int = 0 + self.last_message_sender: Optional[str] = None # user_id of the last message sender + self.bot_id: Optional[str] = None # 机器人自己的 ID + + # 冷场状态 self.cold_chat_start_time: Optional[float] = None self.cold_chat_duration: float = 0.0 + self.is_cold_chat: bool = False # 当前是否处于冷场状态 - # state - self.is_typing: bool = False - self.is_cold_chat: bool = False - self.changed: bool = False + # 其他状态 + self.is_typing: bool = False # 是否正在输入 (未来可能用到) + self.changed: bool = False # 状态是否有变化 (用于优化) # 关联对象 self.chat_observer: Optional[ChatObserver] = None + self.handler: Optional[ObservationInfoHandler] = ObservationInfoHandler(self, self.private_name) - self.handler: ObservationInfoHandler = ObservationInfoHandler(self, self.private_name) + # 初始化 bot_id + try: + from ...config.config import global_config + self.bot_id = str(global_config.BOT_QQ) if global_config.BOT_QQ else None + if not self.bot_id: + logger.error(f"[私聊][{self.private_name}] 未能从配置中获取 BOT_QQ ID!") + except ImportError: + logger.error(f"[私聊][{self.private_name}] 无法导入 global_config 获取 BOT_QQ ID!") + except Exception as e: + logger.error(f"[私聊][{self.private_name}] 获取 BOT_QQ ID 时出错: {e}") - # --- 初始化 bot_id --- - from ...config.config import global_config # 移动到 __init__ 内部以避免循环导入问题 - self.bot_id = str(global_config.BOT_QQ) if global_config.BOT_QQ else None def bind_to_chat_observer(self, chat_observer: ChatObserver): - """绑定到指定的chat_observer (保持不变)""" + """绑定到指定的 ChatObserver 并注册通知处理器""" if self.chat_observer: - logger.warning(f"[私聊][{self.private_name}]尝试重复绑定 ChatObserver") + logger.warning(f"[私聊][{self.private_name}] 尝试重复绑定 ChatObserver") return + if not self.handler: + logger.error(f"[私聊][{self.private_name}] ObservationInfoHandler 未初始化,无法绑定!") + return self.chat_observer = chat_observer try: - if not self.handler: - logger.error(f"[私聊][{self.private_name}] 尝试绑定时 handler 未初始化!") - self.chat_observer = None - return + # 注册需要处理的通知类型 + notification_manager = self.chat_observer.notification_manager + notification_manager.register_handler("observation_info", NotificationType.NEW_MESSAGE, self.handler) + notification_manager.register_handler("observation_info", NotificationType.COLD_CHAT, self.handler) + notification_manager.register_handler("observation_info", NotificationType.MESSAGE_DELETED, self.handler) + # 根据需要注册更多类型... + # notification_manager.register_handler("observation_info", NotificationType.ACTIVE_CHAT, self.handler) + # notification_manager.register_handler("observation_info", NotificationType.USER_JOINED, self.handler) + # notification_manager.register_handler("observation_info", NotificationType.USER_LEFT, self.handler) + # notification_manager.register_handler("observation_info", NotificationType.ERROR, self.handler) - self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler - ) - self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler - ) - # --- [修改点 12] 注册 MESSAGE_DELETED --- - self.chat_observer.notification_manager.register_handler( - target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler - ) - logger.info(f"[私聊][{self.private_name}]成功绑定到 ChatObserver") + logger.info(f"[私聊][{self.private_name}] ObservationInfo 成功绑定到 ChatObserver") + except AttributeError: + logger.error(f"[私聊][{self.private_name}] 绑定的 ChatObserver 对象缺少 notification_manager 属性!") + self.chat_observer = None # 绑定失败 except Exception as e: - logger.error(f"[私聊][{self.private_name}]绑定到 ChatObserver 时出错: {e}") - self.chat_observer = None + logger.error(f"[私聊][{self.private_name}] 绑定到 ChatObserver 时出错: {e}") + self.chat_observer = None # 绑定失败 + def unbind_from_chat_observer(self): - """解除与chat_observer的绑定 (保持不变)""" - if ( - self.chat_observer and hasattr(self.chat_observer, "notification_manager") and self.handler - ): + """解除与 ChatObserver 的绑定""" + if self.chat_observer and hasattr(self.chat_observer, "notification_manager") and self.handler: try: - self.chat_observer.notification_manager.unregister_handler( - target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler - ) - self.chat_observer.notification_manager.unregister_handler( - target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler - ) - # --- [修改点 13] 注销 MESSAGE_DELETED --- - self.chat_observer.notification_manager.unregister_handler( - target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler - ) - logger.info(f"[私聊][{self.private_name}]成功从 ChatObserver 解绑") + notification_manager = self.chat_observer.notification_manager + notification_manager.unregister_handler("observation_info", NotificationType.NEW_MESSAGE, self.handler) + notification_manager.unregister_handler("observation_info", NotificationType.COLD_CHAT, self.handler) + notification_manager.unregister_handler("observation_info", NotificationType.MESSAGE_DELETED, self.handler) + # ... 注销其他已注册的类型 ... + + logger.info(f"[私聊][{self.private_name}] ObservationInfo 成功从 ChatObserver 解绑") except Exception as e: - logger.error(f"[私聊][{self.private_name}]从 ChatObserver 解绑时出错: {e}") + logger.error(f"[私聊][{self.private_name}] 从 ChatObserver 解绑时出错: {e}") finally: - self.chat_observer = None + self.chat_observer = None # 无论成功与否都清除引用 else: - logger.warning(f"[私聊][{self.private_name}]尝试解绑时 ChatObserver 不存在、无效或 handler 未设置") + logger.warning(f"[私聊][{self.private_name}] 尝试解绑时 ChatObserver 无效或 handler 未设置") + async def update_from_message(self, message: Dict[str, Any], user_info: Optional[UserInfo]): - """从消息更新信息 (保持不变)""" + """根据收到的新消息更新 ObservationInfo 的状态""" message_time = message.get("time") message_id = message.get("message_id") processed_text = message.get("processed_plain_text", "") + sender_id_str: Optional[str] = str(user_info.user_id) if user_info else None - if message_time and message_time > (self.last_message_time or 0): + if not message_time or not message_id: + logger.warning(f"[私聊][{self.private_name}] 收到的消息缺少 time 或 message_id: {message}") + return + + # 更新最后消息时间(所有消息) + if message_time > (self.last_message_time or 0): self.last_message_time = message_time self.last_message_id = message_id self.last_message_content = processed_text - self.is_cold_chat = False - self.cold_chat_start_time = None - self.cold_chat_duration = 0.0 + self.last_message_sender = sender_id_str - if user_info: - sender_id = str(user_info.user_id) - self.last_message_sender = sender_id - if sender_id == self.bot_id: - self.last_bot_speak_time = message_time - else: - self.last_user_speak_time = message_time - self.active_users.add(sender_id) + # 更新说话者特定时间 + if sender_id_str: + if sender_id_str == self.bot_id: + self.last_bot_speak_time = message_time else: - logger.warning( - f"[私聊][{self.private_name}]处理消息更新时缺少有效的 UserInfo 对象, message_id: {message_id}" - ) - self.last_message_sender = None - - # --- [修改点 14] 添加到未处理列表,并更新计数 --- - # 检查消息是否已存在于未处理列表中,避免重复添加 - if not any(msg.get("message_id") == message_id for msg in self.unprocessed_messages): - self.unprocessed_messages.append(message) - self.new_messages_count = len(self.unprocessed_messages) - logger.debug(f"[私聊][{self.private_name}]添加新未处理消息 ID: {message_id}, 当前未处理数: {self.new_messages_count}") - self.update_changed() - else: - logger.warning(f"[私聊][{self.private_name}]尝试重复添加未处理消息 ID: {message_id}") - + self.last_user_speak_time = message_time + self.active_users.add(sender_id_str) # 添加到活跃用户 else: - pass + logger.warning(f"[私聊][{self.private_name}] 处理消息更新时缺少有效的 UserInfo, message_id: {message_id}") + + # 更新冷场状态 + self.is_cold_chat = False + self.cold_chat_start_time = None + self.cold_chat_duration = 0.0 + + # --- [核心修改] 将新消息添加到未处理列表 --- + # 检查消息是否已存在于未处理列表中,避免重复添加 + if not any(msg.get("message_id") == message_id for msg in self.unprocessed_messages): + # 创建消息的副本以避免修改原始数据(如果需要) + self.unprocessed_messages.append(message.copy()) + self.new_messages_count = len(self.unprocessed_messages) + logger.debug(f"[私聊][{self.private_name}] 添加新未处理消息 ID: {message_id}, 发送者: {sender_id_str}, 当前未处理数: {self.new_messages_count}") + self.update_changed() + else: + logger.warning(f"[私聊][{self.private_name}] 尝试重复添加未处理消息 ID: {message_id}") - def update_changed(self): - """标记状态已改变,并重置标记 (保持不变)""" - self.changed = True + async def remove_unprocessed_message(self, message_id_to_delete: str): + """从 unprocessed_messages 列表中移除指定 ID 的消息""" + original_count = len(self.unprocessed_messages) + self.unprocessed_messages = [ + msg for msg in self.unprocessed_messages if msg.get("message_id") != message_id_to_delete + ] + new_count = len(self.unprocessed_messages) + + if new_count < original_count: + self.new_messages_count = new_count + logger.info(f"[私聊][{self.private_name}] 移除了未处理的消息 (ID: {message_id_to_delete}), 当前未处理数: {self.new_messages_count}") + self.update_changed() + else: + logger.warning(f"[私聊][{self.private_name}] 尝试移除不存在的未处理消息 ID: {message_id_to_delete}") + async def update_cold_chat_status(self, is_cold: bool, current_time: float): - """更新冷场状态 (保持不变)""" + """更新冷场状态""" if is_cold != self.is_cold_chat: self.is_cold_chat = is_cold if is_cold: - self.cold_chat_start_time = ( - self.last_message_time or current_time - ) - logger.info(f"[私聊][{self.private_name}]进入冷场状态,开始时间: {self.cold_chat_start_time}") + # 冷场开始时间应基于最后一条消息的时间 + self.cold_chat_start_time = self.last_message_time or current_time + logger.info(f"[私聊][{self.private_name}] 进入冷场状态,开始时间: {self.cold_chat_start_time:.2f}") else: if self.cold_chat_start_time: self.cold_chat_duration = current_time - self.cold_chat_start_time - logger.info(f"[私聊][{self.private_name}]结束冷场状态,持续时间: {self.cold_chat_duration:.2f} 秒") - self.cold_chat_start_time = None + logger.info(f"[私聊][{self.private_name}] 结束冷场状态,持续时间: {self.cold_chat_duration:.2f} 秒") + self.cold_chat_start_time = None # 结束冷场,重置开始时间 self.update_changed() + # 持续更新冷场时长 if self.is_cold_chat and self.cold_chat_start_time: self.cold_chat_duration = current_time - self.cold_chat_start_time - def get_active_duration(self) -> float: - """获取当前活跃时长 (保持不变)""" - if not self.last_message_time: - return 0.0 - return time.time() - self.last_message_time - def get_user_response_time(self) -> Optional[float]: - """获取用户最后响应时间 (保持不变)""" - if not self.last_user_speak_time: - return None - return time.time() - self.last_user_speak_time + def update_changed(self): + """标记状态已改变""" + self.changed = True + # 这个标记通常在处理完改变后由外部逻辑重置为 False - def get_bot_response_time(self) -> Optional[float]: - """获取机器人最后响应时间 (保持不变)""" - if not self.last_bot_speak_time: - return None - return time.time() - self.last_bot_speak_time # --- [修改点 15] 重命名并修改 clear_unprocessed_messages --- - # async def clear_unprocessed_messages(self): <-- 旧方法注释掉或删除 async def clear_processed_messages(self, message_ids_to_clear: Set[str]): - """将指定ID的未处理消息移入历史记录,并更新相关状态""" + """将指定 ID 的未处理消息移入历史记录,并更新相关状态""" if not message_ids_to_clear: - logger.debug(f"[私聊][{self.private_name}]没有需要清理的消息 ID。") + logger.debug(f"[私聊][{self.private_name}] 没有需要清理的消息 ID。") return messages_to_move = [] @@ -342,36 +301,40 @@ class ObservationInfo: # 分离要清理和要保留的消息 for msg in self.unprocessed_messages: - if msg.get("message_id") in message_ids_to_clear: + msg_id = msg.get("message_id") + if msg_id in message_ids_to_clear: messages_to_move.append(msg) cleared_count += 1 else: remaining_messages.append(msg) if not messages_to_move: - logger.debug(f"[私聊][{self.private_name}]未找到与 ID 列表匹配的未处理消息进行清理。") + logger.debug(f"[私聊][{self.private_name}] 未找到与 ID 列表 {message_ids_to_clear} 匹配的未处理消息进行清理。") return - logger.debug(f"[私聊][{self.private_name}]准备清理 {cleared_count} 条已处理消息...") + logger.debug(f"[私聊][{self.private_name}] 准备清理 {cleared_count} 条已处理消息...") - # 将要移动的消息添加到历史记录 - max_history_len = 100 + # 将要移动的消息添加到历史记录 (按时间排序) + messages_to_move.sort(key=lambda m: m.get("time", 0)) self.chat_history.extend(messages_to_move) + + # 限制历史记录长度 (可选) + max_history_len = 100 # 例如保留最近 100 条 if len(self.chat_history) > max_history_len: self.chat_history = self.chat_history[-max_history_len:] - # 更新历史记录字符串 (仅使用最近一部分生成) - history_slice_for_str = self.chat_history[-20:] # 例如最近20条 + # 更新历史记录字符串 (仅使用最近一部分生成,提高效率) + history_slice_for_str = self.chat_history[-20:] # 例如最近 20 条 try: self.chat_history_str = await build_readable_messages( history_slice_for_str, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", - read_mark=0.0, + read_mark=0.0, # read_mark 可能需要调整或移除 ) except Exception as e: - logger.error(f"[私聊][{self.private_name}]构建聊天记录字符串时出错: {e}") + logger.error(f"[私聊][{self.private_name}] 构建聊天记录字符串时出错: {e}") self.chat_history_str = "[构建聊天记录出错]" # 更新未处理消息列表和计数 @@ -379,6 +342,27 @@ class ObservationInfo: self.new_messages_count = len(self.unprocessed_messages) self.chat_history_count = len(self.chat_history) - logger.info(f"[私聊][{self.private_name}]已清理 {cleared_count} 条消息,剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。") + logger.info(f"[私聊][{self.private_name}] 已清理 {cleared_count} 条消息 (IDs: {message_ids_to_clear}),剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。") + + self.update_changed() # 状态改变 + + + # --- Helper methods (可以根据需要添加) --- + def get_active_duration(self) -> float: + """获取当前活跃时长(距离最后一条消息的时间)""" + if not self.last_message_time: + return float('inf') # 或返回 0.0,取决于定义 + return time.time() - self.last_message_time + + def get_user_response_time(self) -> Optional[float]: + """获取对方最后响应时间(距离对方最后一条消息的时间)""" + if not self.last_user_speak_time: + return None + return time.time() - self.last_user_speak_time + + def get_bot_response_time(self) -> Optional[float]: + """获取机器人最后响应时间(距离机器人最后一条消息的时间)""" + if not self.last_bot_speak_time: + return None + return time.time() - self.last_bot_speak_time - self.update_changed() # 状态改变 \ No newline at end of file