From 7737a95e405c21c34cfd23cd78689e60a3ce6d22 Mon Sep 17 00:00:00 2001 From: 114514 <2514624910@qq.com> Date: Sun, 4 May 2025 03:43:59 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E5=86=8D=E6=AC=A1=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E5=8F=AF=E8=83=BD=E7=9A=84=E5=A4=8D=E8=AF=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/PFC/conversation.py | 22 +++++++++++-- src/plugins/PFC/conversation_info.py | 2 ++ src/plugins/PFC/reply_generator.py | 49 ++++++++++++++++++++++------ 3 files changed, 61 insertions(+), 12 deletions(-) diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 9a0dd36b..5e04e992 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -337,6 +337,14 @@ class Conversation: logger.info( f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" ) + if not is_suitable or need_replan: + conversation_info.last_reply_rejection_reason = check_reason + conversation_info.last_rejected_reply_content = self.generated_reply + else: + # 如果检查通过,清空上次的拒绝原因 + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + if is_suitable: final_reply_to_send = self.generated_reply break @@ -350,8 +358,11 @@ class Conversation: f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}" ) check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" + conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}" # 出错也记录原因 + conversation_info.last_rejected_reply_content = self.generated_reply break + # 循环结束,处理最终结果 if is_suitable: # 检查是否有新消息 @@ -366,7 +377,9 @@ class Conversation: self.generated_reply = final_reply_to_send # --- 在这里调用 _send_reply --- await self._send_reply() # <--- 调用恢复后的函数 - + # --- 新增:回复成功,清除拒绝原因 --- + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None # 更新状态: 标记上次成功是 send_new_message self.conversation_info.last_successful_reply_action = "send_new_message" action_successful = True # 标记动作成功 @@ -470,7 +483,9 @@ class Conversation: self.generated_reply = final_reply_to_send # --- 在这里调用 _send_reply --- await self._send_reply() # <--- 调用恢复后的函数 - + # --- 新增:回复成功,清除拒绝原因 --- + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None # <-- 新增清空内容 # 更新状态: 标记上次成功是 direct_reply self.conversation_info.last_successful_reply_action = "direct_reply" action_successful = True # 标记动作成功 @@ -649,6 +664,9 @@ class Conversation: # 重置状态: 对于非回复类动作的成功,清除上次回复状态 if action not in ["direct_reply", "send_new_message"]: self.conversation_info.last_successful_reply_action = None + # --- 新增:非回复动作成功,也清除拒绝原因 --- + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None # <-- 新增清空内容 logger.debug(f"[私聊][{self.private_name}]动作 {action} 成功完成,重置 last_successful_reply_action") # 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action diff --git a/src/plugins/PFC/conversation_info.py b/src/plugins/PFC/conversation_info.py index 04524b69..be754b3c 100644 --- a/src/plugins/PFC/conversation_info.py +++ b/src/plugins/PFC/conversation_info.py @@ -8,3 +8,5 @@ class ConversationInfo: self.knowledge_list = [] self.memory_list = [] self.last_successful_reply_action: Optional[str] = None + self.last_reply_rejection_reason: Optional[str] = None # 用于存储上次回复被拒原因 + self.last_rejected_reply_content: Optional[str] = None # 用于存储上次被拒的回复内容 \ No newline at end of file diff --git a/src/plugins/PFC/reply_generator.py b/src/plugins/PFC/reply_generator.py index b9d2c00e..c98b8d1c 100644 --- a/src/plugins/PFC/reply_generator.py +++ b/src/plugins/PFC/reply_generator.py @@ -38,6 +38,8 @@ PROMPT_DIRECT_REPLY = """{persona_text}。现在你在参与一场QQ私聊,请 {retrieved_memory_str} +{last_rejection_info} + 请根据上述信息,结合聊天记录,回复对方。该回复应该: 1. 符合对话目标,以"你"的角度发言(不要自己与自己对话!) @@ -67,6 +69,8 @@ PROMPT_SEND_NEW_MESSAGE = """{persona_text}。现在你在参与一场QQ私聊 {retrieved_memory_str} +{last_rejection_info} + 请根据上述信息,结合聊天记录,继续发一条新消息(例如对之前消息的补充,深入话题,或追问等等)。该消息应该: 1. 符合对话目标,以"你"的角度发言(不要自己与自己对话!) 2. 符合你的性格特征和身份细节 @@ -242,6 +246,25 @@ class ReplyGenerator: retrieved_memory_str = "无聊天记录,无法自动检索记忆。\n" retrieved_knowledge_str = "无聊天记录,无法自动检索知识。\n" + # --- 修改:构建上次回复失败原因和内容提示 --- + last_rejection_info_str = "" + # 检查 conversation_info 是否有上次拒绝的原因和内容,并且它们都不是 None + last_reason = getattr(conversation_info, 'last_reply_rejection_reason', None) + last_content = getattr(conversation_info, 'last_rejected_reply_content', None) + + if last_reason and last_content: + last_rejection_info_str = ( + f"\n------\n" + f"【重要提示:你上一次尝试回复时失败了,以下是详细信息】\n" + f"上次试图发送的消息内容: “{last_content}”\n" # <-- 显示上次内容 + f"失败原因: “{last_reason}”\n" + f"请根据【消息内容】和【失败原因】调整你的新回复,避免重复之前的错误。\n" + f"------\n" + ) + logger.info(f"[私聊][{self.private_name}]检测到上次回复失败信息,将加入 Prompt:\n" + f" 内容: {last_content}\n" + f" 原因: {last_reason}") + # --- 选择 Prompt --- if action_type == "send_new_message": prompt_template = PROMPT_SEND_NEW_MESSAGE @@ -254,16 +277,22 @@ class ReplyGenerator: logger.info(f"[私聊][{self.private_name}]使用 PROMPT_DIRECT_REPLY (首次/非连续回复生成)") # --- 格式化最终的 Prompt --- - prompt = prompt_template.format( - persona_text=persona_text, - goals_str=goals_str, - chat_history_text=chat_history_text, - # knowledge_info_str=knowledge_info_str, # 移除了这个旧的知识展示方式 - retrieved_memory_str=retrieved_memory_str if retrieved_memory_str else "无相关记忆。", # 如果为空则提示无 - retrieved_knowledge_str=retrieved_knowledge_str - if retrieved_knowledge_str - else "无相关知识。", # 如果为空则提示无 - ) + try: # <--- 增加 try-except 块处理可能的 format 错误 + prompt = prompt_template.format( + persona_text=persona_text, + goals_str=goals_str, + chat_history_text=chat_history_text, + retrieved_memory_str=retrieved_memory_str if retrieved_memory_str else "无相关记忆。", + retrieved_knowledge_str=retrieved_knowledge_str if retrieved_knowledge_str else "无相关知识。", + last_rejection_info=last_rejection_info_str # <--- 新增传递上次拒绝原因 + ) + except KeyError as e: + logger.error(f"[私聊][{self.private_name}]格式化 Prompt 时出错,缺少键: {e}。请检查 Prompt 模板和传递的参数。") + # 返回错误信息或默认回复 + return "抱歉,准备回复时出了点问题,请检查一下我的代码..." + except Exception as fmt_err: + logger.error(f"[私聊][{self.private_name}]格式化 Prompt 时发生未知错误: {fmt_err}") + return "抱歉,准备回复时出了点内部错误,请检查一下我的代码..." # --- 调用 LLM 生成 --- logger.debug(f"[私聊][{self.private_name}]发送到LLM的生成提示词:\n------\n{prompt}\n------") From 741fa815c040206011bdc4381fae1cbe93d920e2 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sat, 3 May 2025 19:44:13 +0000 Subject: [PATCH 2/5] =?UTF-8?q?=F0=9F=A4=96=20=E8=87=AA=E5=8A=A8=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81=20[skip=20ci]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/PFC/conversation.py | 9 ++++----- src/plugins/PFC/conversation_info.py | 4 ++-- src/plugins/PFC/reply_generator.py | 30 ++++++++++++++++------------ 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 5e04e992..66dbbf7b 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -341,7 +341,7 @@ class Conversation: conversation_info.last_reply_rejection_reason = check_reason conversation_info.last_rejected_reply_content = self.generated_reply else: - # 如果检查通过,清空上次的拒绝原因 + # 如果检查通过,清空上次的拒绝原因 conversation_info.last_reply_rejection_reason = None conversation_info.last_rejected_reply_content = None @@ -358,11 +358,10 @@ class Conversation: f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}" ) check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}" # 出错也记录原因 + conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}" # 出错也记录原因 conversation_info.last_rejected_reply_content = self.generated_reply break - # 循环结束,处理最终结果 if is_suitable: # 检查是否有新消息 @@ -485,7 +484,7 @@ class Conversation: await self._send_reply() # <--- 调用恢复后的函数 # --- 新增:回复成功,清除拒绝原因 --- conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None # <-- 新增清空内容 + conversation_info.last_rejected_reply_content = None # <-- 新增清空内容 # 更新状态: 标记上次成功是 direct_reply self.conversation_info.last_successful_reply_action = "direct_reply" action_successful = True # 标记动作成功 @@ -666,7 +665,7 @@ class Conversation: self.conversation_info.last_successful_reply_action = None # --- 新增:非回复动作成功,也清除拒绝原因 --- conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None # <-- 新增清空内容 + conversation_info.last_rejected_reply_content = None # <-- 新增清空内容 logger.debug(f"[私聊][{self.private_name}]动作 {action} 成功完成,重置 last_successful_reply_action") # 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action diff --git a/src/plugins/PFC/conversation_info.py b/src/plugins/PFC/conversation_info.py index be754b3c..062a4641 100644 --- a/src/plugins/PFC/conversation_info.py +++ b/src/plugins/PFC/conversation_info.py @@ -8,5 +8,5 @@ class ConversationInfo: self.knowledge_list = [] self.memory_list = [] self.last_successful_reply_action: Optional[str] = None - self.last_reply_rejection_reason: Optional[str] = None # 用于存储上次回复被拒原因 - self.last_rejected_reply_content: Optional[str] = None # 用于存储上次被拒的回复内容 \ No newline at end of file + self.last_reply_rejection_reason: Optional[str] = None # 用于存储上次回复被拒原因 + self.last_rejected_reply_content: Optional[str] = None # 用于存储上次被拒的回复内容 diff --git a/src/plugins/PFC/reply_generator.py b/src/plugins/PFC/reply_generator.py index c98b8d1c..5c4eb46c 100644 --- a/src/plugins/PFC/reply_generator.py +++ b/src/plugins/PFC/reply_generator.py @@ -249,21 +249,23 @@ class ReplyGenerator: # --- 修改:构建上次回复失败原因和内容提示 --- last_rejection_info_str = "" # 检查 conversation_info 是否有上次拒绝的原因和内容,并且它们都不是 None - last_reason = getattr(conversation_info, 'last_reply_rejection_reason', None) - last_content = getattr(conversation_info, 'last_rejected_reply_content', None) + last_reason = getattr(conversation_info, "last_reply_rejection_reason", None) + last_content = getattr(conversation_info, "last_rejected_reply_content", None) if last_reason and last_content: last_rejection_info_str = ( f"\n------\n" f"【重要提示:你上一次尝试回复时失败了,以下是详细信息】\n" - f"上次试图发送的消息内容: “{last_content}”\n" # <-- 显示上次内容 + f"上次试图发送的消息内容: “{last_content}”\n" # <-- 显示上次内容 f"失败原因: “{last_reason}”\n" f"请根据【消息内容】和【失败原因】调整你的新回复,避免重复之前的错误。\n" f"------\n" ) - logger.info(f"[私聊][{self.private_name}]检测到上次回复失败信息,将加入 Prompt:\n" - f" 内容: {last_content}\n" - f" 原因: {last_reason}") + logger.info( + f"[私聊][{self.private_name}]检测到上次回复失败信息,将加入 Prompt:\n" + f" 内容: {last_content}\n" + f" 原因: {last_reason}" + ) # --- 选择 Prompt --- if action_type == "send_new_message": @@ -277,22 +279,24 @@ class ReplyGenerator: logger.info(f"[私聊][{self.private_name}]使用 PROMPT_DIRECT_REPLY (首次/非连续回复生成)") # --- 格式化最终的 Prompt --- - try: # <--- 增加 try-except 块处理可能的 format 错误 + try: # <--- 增加 try-except 块处理可能的 format 错误 prompt = prompt_template.format( persona_text=persona_text, goals_str=goals_str, chat_history_text=chat_history_text, retrieved_memory_str=retrieved_memory_str if retrieved_memory_str else "无相关记忆。", retrieved_knowledge_str=retrieved_knowledge_str if retrieved_knowledge_str else "无相关知识。", - last_rejection_info=last_rejection_info_str # <--- 新增传递上次拒绝原因 + last_rejection_info=last_rejection_info_str, # <--- 新增传递上次拒绝原因 ) except KeyError as e: - logger.error(f"[私聊][{self.private_name}]格式化 Prompt 时出错,缺少键: {e}。请检查 Prompt 模板和传递的参数。") - # 返回错误信息或默认回复 - return "抱歉,准备回复时出了点问题,请检查一下我的代码..." + logger.error( + f"[私聊][{self.private_name}]格式化 Prompt 时出错,缺少键: {e}。请检查 Prompt 模板和传递的参数。" + ) + # 返回错误信息或默认回复 + return "抱歉,准备回复时出了点问题,请检查一下我的代码..." except Exception as fmt_err: - logger.error(f"[私聊][{self.private_name}]格式化 Prompt 时发生未知错误: {fmt_err}") - return "抱歉,准备回复时出了点内部错误,请检查一下我的代码..." + logger.error(f"[私聊][{self.private_name}]格式化 Prompt 时发生未知错误: {fmt_err}") + return "抱歉,准备回复时出了点内部错误,请检查一下我的代码..." # --- 调用 LLM 生成 --- logger.debug(f"[私聊][{self.private_name}]发送到LLM的生成提示词:\n------\n{prompt}\n------") From acbabdc24b937b920faadc1ce614048350f3c792 Mon Sep 17 00:00:00 2001 From: 114514 <2514624910@qq.com> Date: Sun, 4 May 2025 17:45:13 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E7=A9=B6=E6=9E=81=E7=9A=84=E5=8F=AF?= =?UTF-8?q?=E7=BB=B4=E6=8A=A4=E6=80=A7=E5=92=8C=E5=8F=AF=E8=AF=BB=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/PFC/action_planner.py | 79 ++------------------------ src/plugins/PFC/pfc_utils.py | 72 +++++++++++++++++++++++- src/plugins/PFC/reply_generator.py | 90 ++++-------------------------- 3 files changed, 85 insertions(+), 156 deletions(-) diff --git a/src/plugins/PFC/action_planner.py b/src/plugins/PFC/action_planner.py index 51d9bff2..5cba1bb1 100644 --- a/src/plugins/PFC/action_planner.py +++ b/src/plugins/PFC/action_planner.py @@ -1,12 +1,6 @@ import time from typing import Tuple, Optional -from src.plugins.memory_system.Hippocampus import HippocampusManager - -# --- NEW IMPORT --- -# 从 heartflow 导入知识检索和数据库查询函数/实例 -from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder - -# --- END NEW IMPORT --- +from .pfc_utils import retrieve_contextual_info # import jieba # 如果需要旧版知识库的回退,可能需要 # import re # 如果需要旧版知识库的回退,可能需要 from src.common.logger_manager import get_logger @@ -128,41 +122,6 @@ class ActionPlanner: self.private_name = private_name self.chat_observer = ChatObserver.get_instance(stream_id, private_name) - # _get_memory_info 保持不变 - async def _get_memory_info(self, text: str) -> str: - """根据文本自动检索相关记忆""" - memory_prompt = "" - related_memory_info = "" - try: - related_memory = await HippocampusManager.get_instance().get_memory_from_text( - text=text, - max_memory_num=2, # 最多获取 2 条记忆 - max_memory_length=2, # 每条记忆长度限制(这个参数含义可能需确认) - max_depth=3, # 搜索深度 - fast_retrieval=False, # 是否快速检索 - ) - if related_memory: - for memory in related_memory: - # memory[0] 是记忆ID, memory[1] 是记忆内容 - related_memory_info += memory[1] + "\n" # 将记忆内容拼接起来 - if related_memory_info: - memory_prompt = f"你回忆起:\n{related_memory_info.strip()}\n(以上是你的回忆,供参考)\n" - logger.debug( - f"[私聊]决策层[{self.private_name}]自动检索到记忆: {related_memory_info.strip()[:100]}..." - ) - else: - logger.debug(f"[私聊]决策层[{self.private_name}]自动检索记忆返回为空。") - else: - logger.debug(f"[私聊]决策层[{self.private_name}]未自动检索到相关记忆。") - except Exception as e: - logger.error(f"[私聊]决策层[{self.private_name}]自动检索记忆时出错: {e}") - # memory_prompt = "检索记忆时出错。\n" # 可以选择是否提示错误 - return memory_prompt - - # --- REMOVED _get_prompt_info_old --- - - # --- REMOVED _get_prompt_info --- - # 修改 plan 方法签名,增加 last_successful_reply_action 参数 async def plan( self, @@ -377,38 +336,10 @@ class ActionPlanner: last_action_context += f"- 该行动当前状态: {status}\n" # self.last_successful_action_type = None # 非完成状态,清除记录 - retrieved_memory_str_planner = "" - retrieved_knowledge_str_planner = "" - retrieval_context = chat_history_text # 使用聊天记录作为检索上下文 - if retrieval_context and retrieval_context != "还没有聊天记录。" and retrieval_context != "[构建聊天记录出错]": - try: - # 调用本地的 _get_memory_info - logger.debug(f"[私聊][{self.private_name}] (ActionPlanner) 开始自动检索记忆...") - retrieved_memory_str_planner = await self._get_memory_info(text=retrieval_context) - logger.info( - f"[私聊][{self.private_name}] (ActionPlanner) 自动检索记忆 {'完成' if retrieved_memory_str_planner else '无结果'}。" - ) - - # --- MODIFIED KNOWLEDGE RETRIEVAL --- - # 调用导入的 prompt_builder.get_prompt_info - logger.debug(f"[私聊][{self.private_name}] (ActionPlanner) 开始自动检索知识 (使用导入函数)...") - # 使用导入的 prompt_builder 实例及其方法 - retrieved_knowledge_str_planner = await prompt_builder.get_prompt_info( - message=retrieval_context, threshold=0.38 - ) - # --- END MODIFIED KNOWLEDGE RETRIEVAL --- - logger.info( - f"[私聊][{self.private_name}] (ActionPlanner) 自动检索知识 {'完成' if retrieved_knowledge_str_planner else '无结果'}。" - ) - - except Exception as retrieval_err: - logger.error(f"[私聊][{self.private_name}] (ActionPlanner) 自动检索时出错: {retrieval_err}") - retrieved_memory_str_planner = "检索记忆时出错。\n" - retrieved_knowledge_str_planner = "检索知识时出错。\n" - else: - logger.debug(f"[私聊][{self.private_name}] (ActionPlanner) 无有效聊天记录,跳过自动检索。") - retrieved_memory_str_planner = "无聊天记录无法检索记忆。\n" - retrieved_knowledge_str_planner = "无聊天记录无法检索知识。\n" + retrieved_memory_str_planner, retrieved_knowledge_str_planner = await retrieve_contextual_info(chat_history_text, self.private_name) + # Optional: 可以加一行日志确认结果,方便调试 + logger.info(f"[私聊][{self.private_name}] (ActionPlanner) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str_planner else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str_planner and '无相关知识' not in retrieved_knowledge_str_planner else '无'}") + # --- 选择 Prompt --- if last_successful_reply_action in ["direct_reply", "send_new_message"]: diff --git a/src/plugins/PFC/pfc_utils.py b/src/plugins/PFC/pfc_utils.py index 2f7bd5e0..a7a412c1 100644 --- a/src/plugins/PFC/pfc_utils.py +++ b/src/plugins/PFC/pfc_utils.py @@ -1,9 +1,77 @@ +import traceback import json import re from typing import Dict, Any, Optional, Tuple, List, Union -from src.common.logger import get_module_logger +from src.common.logger_manager import get_logger # 确认 logger 的导入路径 +from src.plugins.memory_system.Hippocampus import HippocampusManager +from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder # 确认 prompt_builder 的导入路径 + +logger = get_logger("pfc_utils") + + +async def retrieve_contextual_info(text: str, private_name: str) -> Tuple[str, str]: + """ + 根据输入文本检索相关的记忆和知识。 + + Args: + text: 用于检索的上下文文本 (例如聊天记录)。 + private_name: 私聊对象的名称,用于日志记录。 + + Returns: + Tuple[str, str]: (检索到的记忆字符串, 检索到的知识字符串) + """ + retrieved_memory_str = "无相关记忆。" + retrieved_knowledge_str = "无相关知识。" + memory_log_msg = "未自动检索到相关记忆。" + knowledge_log_msg = "未自动检索到相关知识。" + + if not text or text == "还没有聊天记录。" or text == "[构建聊天记录出错]": + logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 无有效上下文,跳过检索。") + return retrieved_memory_str, retrieved_knowledge_str + + # 1. 检索记忆 (逻辑来自原 _get_memory_info) + try: + related_memory = await HippocampusManager.get_instance().get_memory_from_text( + text=text, + max_memory_num=2, + max_memory_length=2, + max_depth=3, + fast_retrieval=False, + ) + if related_memory: + related_memory_info = "" + for memory in related_memory: + related_memory_info += memory[1] + "\n" + if related_memory_info: + # 注意:原版提示信息可以根据需要调整 + retrieved_memory_str = f"你回忆起:\n{related_memory_info.strip()}\n(以上是你的回忆,供参考)\n" + memory_log_msg = f"自动检索到记忆: {related_memory_info.strip()[:100]}..." + else: + memory_log_msg = "自动检索记忆返回为空。" + logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 记忆检索: {memory_log_msg}") + + except Exception as e: + logger.error(f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索记忆时出错: {e}\n{traceback.format_exc()}") + retrieved_memory_str = "检索记忆时出错。\n" + + # 2. 检索知识 (逻辑来自原 action_planner 和 reply_generator) + try: + # 使用导入的 prompt_builder 实例及其方法 + knowledge_result = await prompt_builder.get_prompt_info( + message=text, threshold=0.38 # threshold 可以根据需要调整 + ) + if knowledge_result: + retrieved_knowledge_str = knowledge_result # 直接使用返回结果 + knowledge_log_msg = "自动检索到相关知识。" + logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 知识检索: {knowledge_log_msg}") + + except Exception as e: + logger.error(f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索知识时出错: {e}\n{traceback.format_exc()}") + retrieved_knowledge_str = "检索知识时出错。\n" + + return retrieved_memory_str, retrieved_knowledge_str + -logger = get_module_logger("pfc_utils") def get_items_from_json( diff --git a/src/plugins/PFC/reply_generator.py b/src/plugins/PFC/reply_generator.py index c98b8d1c..33a4bb41 100644 --- a/src/plugins/PFC/reply_generator.py +++ b/src/plugins/PFC/reply_generator.py @@ -1,16 +1,10 @@ -# 用于访问记忆系统 -from src.plugins.memory_system.Hippocampus import HippocampusManager - -# --- NEW IMPORT --- -# 从 heartflow 导入知识检索和数据库查询函数/实例 -from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder - -# --- END NEW IMPORT --- +from .pfc_utils import retrieve_contextual_info # 可能用于旧知识库提取主题 (如果需要回退到旧方法) # import jieba # 如果报错说找不到 jieba,可能需要安装: pip install jieba # import re # 正则表达式库,通常 Python 自带 from typing import Tuple, List, Dict, Any -from src.common.logger import get_module_logger +# from src.common.logger import get_module_logger +from src.common.logger_manager import get_logger from ..models.utils_model import LLMRequest from ...config.config import global_config from .chat_observer import ChatObserver @@ -20,7 +14,7 @@ from .observation_info import ObservationInfo from .conversation_info import ConversationInfo from src.plugins.utils.chat_message_builder import build_readable_messages -logger = get_module_logger("reply_generator") +logger = get_logger("reply_generator") # --- 定义 Prompt 模板 --- @@ -120,39 +114,7 @@ class ReplyGenerator: self.chat_observer = ChatObserver.get_instance(stream_id, private_name) self.reply_checker = ReplyChecker(stream_id, private_name) - # _get_memory_info 保持不变,因为它不是与 heartflow 重复的部分 - async def _get_memory_info(self, text: str) -> str: - """根据文本自动检索相关记忆""" - memory_prompt = "" - related_memory_info = "" - try: - related_memory = await HippocampusManager.get_instance().get_memory_from_text( - text=text, - max_memory_num=2, # 最多获取 2 条记忆 - max_memory_length=2, # 每条记忆长度限制(这个参数含义可能需确认) - max_depth=3, # 搜索深度 - fast_retrieval=False, # 是否快速检索 - ) - if related_memory: - for memory in related_memory: - # memory[0] 是记忆ID, memory[1] 是记忆内容 - related_memory_info += memory[1] + "\n" # 将记忆内容拼接起来 - if related_memory_info: - memory_prompt = f"你回忆起:\n{related_memory_info.strip()}\n(以上是你的回忆,不一定是目前聊天里的人说的,回忆中别人说的事情也不一定是准确的,请记住)\n" - logger.debug(f"[私聊][{self.private_name}]自动检索到记忆: {related_memory_info.strip()[:100]}...") - else: - logger.debug(f"[私聊][{self.private_name}]自动检索记忆返回为空。") - else: - logger.debug(f"[私聊][{self.private_name}]未自动检索到相关记忆。") - except Exception as e: - logger.error(f"[私聊][{self.private_name}]自动检索记忆时出错: {e}") - # memory_prompt = "检索记忆时出错。\n" # 可以选择是否提示错误 - return memory_prompt - - # --- REMOVED _get_prompt_info_old --- - - # --- REMOVED _get_prompt_info --- - + # 修改 generate 方法签名,增加 action_type 参数 async def generate( self, observation_info: ObservationInfo, conversation_info: ConversationInfo, action_type: str @@ -209,43 +171,11 @@ class ReplyGenerator: # 构建 Persona 文本 (persona_text) persona_text = f"你的名字是{self.name},{self.personality_info}。" - retrieved_memory_str = "" - retrieved_knowledge_str = "" - # 使用 chat_history_text 作为检索的上下文,因为它包含了最近的对话和新消息 - retrieval_context = chat_history_text - if retrieval_context and retrieval_context != "还没有聊天记录。" and retrieval_context != "[构建聊天记录出错]": - try: - # 提取记忆 (调用本地的 _get_memory_info) - logger.debug(f"[私聊][{self.private_name}]开始自动检索记忆...") - retrieved_memory_str = await self._get_memory_info(text=retrieval_context) - if retrieved_memory_str: - logger.info(f"[私聊][{self.private_name}]自动检索到记忆片段。") - else: - logger.info(f"[私聊][{self.private_name}]未自动检索到相关记忆。") - - # --- MODIFIED KNOWLEDGE RETRIEVAL --- - # 提取知识 (调用导入的 prompt_builder.get_prompt_info) - logger.debug(f"[私聊][{self.private_name}]开始自动检索知识 (使用导入函数)...") - # 使用导入的 prompt_builder 实例及其方法 - retrieved_knowledge_str = await prompt_builder.get_prompt_info( - message=retrieval_context, threshold=0.38 - ) - # --- END MODIFIED KNOWLEDGE RETRIEVAL --- - - if retrieved_knowledge_str: - logger.info(f"[私聊][{self.private_name}]自动检索到相关知识。") - else: - logger.info(f"[私聊][{self.private_name}]未自动检索到相关知识。") - - except Exception as retrieval_err: - logger.error(f"[私聊][{self.private_name}]在自动检索记忆/知识时发生错误: {retrieval_err}") - retrieved_memory_str = "检索记忆时出错。\n" - retrieved_knowledge_str = "检索知识时出错。\n" - else: - logger.debug(f"[私聊][{self.private_name}]聊天记录为空或无效,跳过自动记忆/知识检索。") - retrieved_memory_str = "无聊天记录,无法自动检索记忆。\n" - retrieved_knowledge_str = "无聊天记录,无法自动检索知识。\n" - + retrieval_context = chat_history_text # 使用前面构建好的 chat_history_text + # 调用共享函数进行检索 + retrieved_memory_str, retrieved_knowledge_str = await retrieve_contextual_info(retrieval_context, self.private_name) + logger.info(f"[私聊][{self.private_name}] (ReplyGenerator) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str and '无相关知识' not in retrieved_knowledge_str else '无'}") + # --- 修改:构建上次回复失败原因和内容提示 --- last_rejection_info_str = "" # 检查 conversation_info 是否有上次拒绝的原因和内容,并且它们都不是 None From bf939cdd39b8be2d7efd37d301f8925f897b48f5 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sun, 4 May 2025 09:45:46 +0000 Subject: [PATCH 4/5] =?UTF-8?q?=F0=9F=A4=96=20=E8=87=AA=E5=8A=A8=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81=20[skip=20ci]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/PFC/action_planner.py | 10 +++++++--- src/plugins/PFC/pfc_utils.py | 23 +++++++++++++---------- src/plugins/PFC/reply_generator.py | 15 ++++++++++----- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/src/plugins/PFC/action_planner.py b/src/plugins/PFC/action_planner.py index 5cba1bb1..faf13bd0 100644 --- a/src/plugins/PFC/action_planner.py +++ b/src/plugins/PFC/action_planner.py @@ -1,6 +1,7 @@ import time from typing import Tuple, Optional from .pfc_utils import retrieve_contextual_info + # import jieba # 如果需要旧版知识库的回退,可能需要 # import re # 如果需要旧版知识库的回退,可能需要 from src.common.logger_manager import get_logger @@ -336,10 +337,13 @@ class ActionPlanner: last_action_context += f"- 该行动当前状态: {status}\n" # self.last_successful_action_type = None # 非完成状态,清除记录 - retrieved_memory_str_planner, retrieved_knowledge_str_planner = await retrieve_contextual_info(chat_history_text, self.private_name) + retrieved_memory_str_planner, retrieved_knowledge_str_planner = await retrieve_contextual_info( + chat_history_text, self.private_name + ) # Optional: 可以加一行日志确认结果,方便调试 - logger.info(f"[私聊][{self.private_name}] (ActionPlanner) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str_planner else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str_planner and '无相关知识' not in retrieved_knowledge_str_planner else '无'}") - + logger.info( + f"[私聊][{self.private_name}] (ActionPlanner) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str_planner else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str_planner and '无相关知识' not in retrieved_knowledge_str_planner else '无'}" + ) # --- 选择 Prompt --- if last_successful_reply_action in ["direct_reply", "send_new_message"]: diff --git a/src/plugins/PFC/pfc_utils.py b/src/plugins/PFC/pfc_utils.py index a7a412c1..b0f3f841 100644 --- a/src/plugins/PFC/pfc_utils.py +++ b/src/plugins/PFC/pfc_utils.py @@ -2,9 +2,9 @@ import traceback import json import re from typing import Dict, Any, Optional, Tuple, List, Union -from src.common.logger_manager import get_logger # 确认 logger 的导入路径 +from src.common.logger_manager import get_logger # 确认 logger 的导入路径 from src.plugins.memory_system.Hippocampus import HippocampusManager -from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder # 确认 prompt_builder 的导入路径 +from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder # 确认 prompt_builder 的导入路径 logger = get_logger("pfc_utils") @@ -47,33 +47,36 @@ async def retrieve_contextual_info(text: str, private_name: str) -> Tuple[str, s retrieved_memory_str = f"你回忆起:\n{related_memory_info.strip()}\n(以上是你的回忆,供参考)\n" memory_log_msg = f"自动检索到记忆: {related_memory_info.strip()[:100]}..." else: - memory_log_msg = "自动检索记忆返回为空。" + memory_log_msg = "自动检索记忆返回为空。" logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 记忆检索: {memory_log_msg}") except Exception as e: - logger.error(f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索记忆时出错: {e}\n{traceback.format_exc()}") + logger.error( + f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索记忆时出错: {e}\n{traceback.format_exc()}" + ) retrieved_memory_str = "检索记忆时出错。\n" # 2. 检索知识 (逻辑来自原 action_planner 和 reply_generator) try: # 使用导入的 prompt_builder 实例及其方法 knowledge_result = await prompt_builder.get_prompt_info( - message=text, threshold=0.38 # threshold 可以根据需要调整 + message=text, + threshold=0.38, # threshold 可以根据需要调整 ) if knowledge_result: - retrieved_knowledge_str = knowledge_result # 直接使用返回结果 - knowledge_log_msg = "自动检索到相关知识。" + retrieved_knowledge_str = knowledge_result # 直接使用返回结果 + knowledge_log_msg = "自动检索到相关知识。" logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 知识检索: {knowledge_log_msg}") except Exception as e: - logger.error(f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索知识时出错: {e}\n{traceback.format_exc()}") + logger.error( + f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索知识时出错: {e}\n{traceback.format_exc()}" + ) retrieved_knowledge_str = "检索知识时出错。\n" return retrieved_memory_str, retrieved_knowledge_str - - def get_items_from_json( content: str, private_name: str, diff --git a/src/plugins/PFC/reply_generator.py b/src/plugins/PFC/reply_generator.py index 1dc564de..892e881b 100644 --- a/src/plugins/PFC/reply_generator.py +++ b/src/plugins/PFC/reply_generator.py @@ -1,8 +1,10 @@ from .pfc_utils import retrieve_contextual_info + # 可能用于旧知识库提取主题 (如果需要回退到旧方法) # import jieba # 如果报错说找不到 jieba,可能需要安装: pip install jieba # import re # 正则表达式库,通常 Python 自带 from typing import Tuple, List, Dict, Any + # from src.common.logger import get_module_logger from src.common.logger_manager import get_logger from ..models.utils_model import LLMRequest @@ -114,7 +116,6 @@ class ReplyGenerator: self.chat_observer = ChatObserver.get_instance(stream_id, private_name) self.reply_checker = ReplyChecker(stream_id, private_name) - # 修改 generate 方法签名,增加 action_type 参数 async def generate( self, observation_info: ObservationInfo, conversation_info: ConversationInfo, action_type: str @@ -171,11 +172,15 @@ class ReplyGenerator: # 构建 Persona 文本 (persona_text) persona_text = f"你的名字是{self.name},{self.personality_info}。" - retrieval_context = chat_history_text # 使用前面构建好的 chat_history_text + retrieval_context = chat_history_text # 使用前面构建好的 chat_history_text # 调用共享函数进行检索 - retrieved_memory_str, retrieved_knowledge_str = await retrieve_contextual_info(retrieval_context, self.private_name) - logger.info(f"[私聊][{self.private_name}] (ReplyGenerator) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str and '无相关知识' not in retrieved_knowledge_str else '无'}") - + retrieved_memory_str, retrieved_knowledge_str = await retrieve_contextual_info( + retrieval_context, self.private_name + ) + logger.info( + f"[私聊][{self.private_name}] (ReplyGenerator) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str and '无相关知识' not in retrieved_knowledge_str else '无'}" + ) + # --- 修改:构建上次回复失败原因和内容提示 --- last_rejection_info_str = "" # 检查 conversation_info 是否有上次拒绝的原因和内容,并且它们都不是 None From 62b7cd6e8f8f134ef223135afd30f6d430a8f7a3 Mon Sep 17 00:00:00 2001 From: Bakadax Date: Mon, 5 May 2025 21:36:56 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=9B=9E=E5=A4=8D?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/PFC/conversation.py | 628 ++++++++++++++-------------- src/plugins/PFC/observation_info.py | 181 ++++---- 2 files changed, 407 insertions(+), 402 deletions(-) diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 66dbbf7b..9f4bcfa0 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# File: conversation.py import time import asyncio import datetime @@ -9,7 +11,7 @@ from src.plugins.utils.chat_message_builder import build_readable_messages, get_ from typing import Dict, Any, Optional from ..chat.message import Message from .pfc_types import ConversationState -from .pfc import ChatObserver, GoalAnalyzer +from .pfc import ChatObserver, GoalAnalyzer # pfc.py 包含了 GoalAnalyzer,无需重复导入 from .message_sender import DirectMessageSender from src.common.logger_manager import get_logger from .action_planner import ActionPlanner @@ -19,7 +21,7 @@ from .reply_generator import ReplyGenerator from ..chat.chat_stream import ChatStream from maim_message import UserInfo from src.plugins.chat.chat_stream import chat_manager -from .pfc_KnowledgeFetcher import KnowledgeFetcher +from .pfc_KnowledgeFetcher import KnowledgeFetcher # 注意:这里是 PFC_KnowledgeFetcher.py from .waiter import Waiter import traceback @@ -114,6 +116,10 @@ class Conversation: ) # 让 ChatObserver 从加载的最后一条消息之后开始同步 + # **** 注意:这里的 last_message_time 设置可能需要 review **** + # 如果数据库消息时间戳可能不完全连续,直接设置 last_message_time 可能导致 observer 错过消息 + # 更稳妥的方式是让 observer 自己管理其内部的 last_message_time 或 last_message_id + # 暂时保留,但标记为潜在问题点。如果 observer 逻辑是可靠的,则此行 OK。 self.chat_observer.last_message_time = self.observation_info.last_message_time self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录 else: @@ -138,7 +144,7 @@ class Conversation: async def _plan_and_action_loop(self): """思考步,PFC核心循环模块""" while self.should_continue: - # 忽略逻辑 + # 忽略逻辑 (保持不变) if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp: await asyncio.sleep(30) continue @@ -147,64 +153,92 @@ class Conversation: self.ignore_until_timestamp = None self.should_continue = False continue + try: - # --- 在规划前记录当前新消息数量 --- - initial_new_message_count = 0 - if hasattr(self.observation_info, "new_messages_count"): - initial_new_message_count = self.observation_info.new_messages_count + 1 # 算上麦麦自己发的那一条 + # --- [修改点 1] 在规划前记录新消息状态 --- + # 记录规划开始时未处理消息的 ID 集合,用于后续判断哪些是新来的 + message_ids_before_planning = set() + initial_unprocessed_message_count = 0 # <-- 新增:记录规划前的未处理消息数 + if hasattr(self.observation_info, "unprocessed_messages"): + message_ids_before_planning = {msg.get("message_id") for msg in self.observation_info.unprocessed_messages if msg.get("message_id")} + initial_unprocessed_message_count = len(self.observation_info.unprocessed_messages) # <-- 获取初始数量 + logger.debug(f"[私聊][{self.private_name}]规划开始,当前未处理消息数: {initial_unprocessed_message_count}, IDs: {message_ids_before_planning}") else: logger.warning( - f"[私聊][{self.private_name}]ObservationInfo missing 'new_messages_count' before planning." + f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' before planning." ) - # --- 调用 Action Planner --- - # 传递 self.conversation_info.last_successful_reply_action + + # --- 调用 Action Planner (保持不变) --- action, reason = await self.action_planner.plan( self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action ) - # --- 规划后检查是否有 *更多* 新消息到达 --- - current_new_message_count = 0 - if hasattr(self.observation_info, "new_messages_count"): - current_new_message_count = self.observation_info.new_messages_count + # --- [修改点 2] 规划后检查是否有 *过多* 新消息到达 --- + current_unprocessed_messages = [] + current_unprocessed_message_count = 0 + if hasattr(self.observation_info, "unprocessed_messages"): + current_unprocessed_messages = self.observation_info.unprocessed_messages + current_unprocessed_message_count = len(current_unprocessed_messages) # <-- 获取当前数量 else: logger.warning( - f"[私聊][{self.private_name}]ObservationInfo missing 'new_messages_count' after planning." + f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' after planning." ) - if current_new_message_count > initial_new_message_count + 2: + # 计算规划期间实际新增的消息数量 + new_messages_during_planning_count = 0 + new_message_ids_during_planning = set() + for msg in current_unprocessed_messages: + msg_id = msg.get("message_id") + if msg_id and msg_id not in message_ids_before_planning: + new_messages_during_planning_count += 1 + new_message_ids_during_planning.add(msg_id) + + logger.debug(f"[私聊][{self.private_name}]规划结束,当前未处理消息数: {current_unprocessed_message_count}, 规划期间新增: {new_messages_during_planning_count}") + + # **核心逻辑:判断是否中断** + # 这里的 +2 是根据你的需求来的,代表允许的缓冲 + # 我们比较的是 *规划期间新增的消息数* 是否超过阈值 + if new_messages_during_planning_count > 2: logger.info( - f"[私聊][{self.private_name}]规划期间发现新增消息 ({initial_new_message_count} -> {current_new_message_count}),跳过本次行动,重新规划" + f"[私聊][{self.private_name}]规划期间新增消息数 ({new_messages_during_planning_count}) 超过阈值(2),取消本次行动 '{action}',重新规划" ) - # 如果规划期间有新消息,也应该重置上次回复状态,因为现在要响应新消息了 + # 中断时,重置上次回复状态,因为需要基于最新消息重新决策 self.conversation_info.last_successful_reply_action = None - await asyncio.sleep(0.1) - continue + # **重要**: 中断时不清空未处理消息,留给下一轮规划处理 + await asyncio.sleep(0.1) # 短暂暂停避免CPU空转 + continue # 跳过本轮后续处理,直接进入下一轮循环重新规划 - # 包含 send_new_message - if initial_new_message_count > 0 and action in ["direct_reply", "send_new_message"]: - if hasattr(self.observation_info, "clear_unprocessed_messages"): - logger.debug( - f"[私聊][{self.private_name}]准备执行 {action},清理 {initial_new_message_count} 条规划时已知的新消息。" - ) - await self.observation_info.clear_unprocessed_messages() - if hasattr(self.observation_info, "new_messages_count"): - self.observation_info.new_messages_count = 0 - else: - logger.error( - f"[私聊][{self.private_name}]无法清理未处理消息: ObservationInfo 缺少 clear_unprocessed_messages 方法!" - ) + # --- [修改点 3] 准备执行动作,处理规划时已知的消息 --- + # 如果决定要回复 (direct_reply 或 send_new_message),并且规划开始时就有未处理消息 + # 这表示 LLM 规划时已经看到了这些消息 + # 我们需要在发送回复 *后* 清理掉这些规划时已知的消息 + # 注意:这里不再立即清理,清理逻辑移到 _handle_action 成功发送后 + messages_known_during_planning = [] + if action in ["direct_reply", "send_new_message"] and initial_unprocessed_message_count > 0: + messages_known_during_planning = [ + msg for msg_id in message_ids_before_planning + if (msg := next((m for m in self.observation_info.unprocessed_messages if m.get("message_id") == msg_id), None)) is not None + ] + logger.debug(f"[私聊][{self.private_name}]规划时已知 {len(messages_known_during_planning)} 条消息,将在回复成功后清理。") - await self._handle_action(action, reason, self.observation_info, self.conversation_info) - # 检查是否需要结束对话 (逻辑不变) + # --- 执行动作 --- + # 将规划时已知需要清理的消息ID集合传递给 _handle_action + await self._handle_action(action, reason, self.observation_info, self.conversation_info, message_ids_before_planning) + + # --- 检查是否需要结束对话 (逻辑保持不变) --- goal_ended = False if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list: for goal_item in self.conversation_info.goal_list: + current_goal = None # 初始化 current_goal if isinstance(goal_item, dict): current_goal = goal_item.get("goal") + elif isinstance(goal_item, str): # 处理直接是字符串的情况 + current_goal = goal_item - if current_goal == "结束对话": + # 确保 current_goal 是字符串再比较 + if isinstance(current_goal, str) and current_goal == "结束对话": goal_ended = True break @@ -215,38 +249,49 @@ class Conversation: except Exception as loop_err: logger.error(f"[私聊][{self.private_name}]PFC主循环出错: {loop_err}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - await asyncio.sleep(1) + await asyncio.sleep(1) # 出错时暂停一下 + # 循环间隔 if self.should_continue: - await asyncio.sleep(0.1) + await asyncio.sleep(0.1) # 非阻塞的短暂停 logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}") - def _check_new_messages_after_planning(self): - """检查在规划后是否有新消息""" - # 检查 ObservationInfo 是否已初始化并且有 new_messages_count 属性 - if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "new_messages_count"): - logger.warning( - f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'new_messages_count' 属性,无法检查新消息。" - ) - return False # 或者根据需要抛出错误 - if self.observation_info.new_messages_count > 2: - logger.info( - f"[私聊][{self.private_name}]生成/执行动作期间收到 {self.observation_info.new_messages_count} 条新消息,取消当前动作并重新规划" + # --- [修改点 4] 修改 _check_new_messages_after_planning --- + # 重命名并修改逻辑,用于在 *发送前* 检查是否有过多新消息(兜底检查) + def _check_interrupt_before_sending(self, message_ids_before_planning: set) -> bool: + """在发送回复前,最后检查一次是否有过多新消息导致需要中断""" + if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "unprocessed_messages"): + logger.warning( + f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'unprocessed_messages' 属性,无法检查新消息。" ) - # 如果有新消息,也应该重置上次回复状态 - if hasattr(self, "conversation_info"): # 确保 conversation_info 已初始化 + return False + + current_unprocessed_messages = self.observation_info.unprocessed_messages + new_messages_count = 0 + for msg in current_unprocessed_messages: + msg_id = msg.get("message_id") + if msg_id and msg_id not in message_ids_before_planning: + new_messages_count += 1 + + # 使用与规划后检查相同的阈值 + if new_messages_count > 2: + logger.info( + f"[私聊][{self.private_name}]准备发送时发现新增消息数 ({new_messages_count}) 超过阈值(2),取消发送并重新规划" + ) + if hasattr(self, "conversation_info"): self.conversation_info.last_successful_reply_action = None else: logger.warning( f"[私聊][{self.private_name}]ConversationInfo 未初始化,无法重置 last_successful_reply_action。" ) - return True - return False + return True # 需要中断 + return False # 不需要中断 + def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message: - """将消息字典转换为Message对象""" + """将消息字典转换为Message对象 (保持不变)""" try: # 尝试从 msg_dict 直接获取 chat_stream,如果失败则从全局 chat_manager 获取 chat_info = msg_dict.get("chat_info") @@ -274,8 +319,14 @@ class Conversation: # 可以选择返回 None 或重新抛出异常,这里选择重新抛出以指示问题 raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e + # --- [修改点 5] 修改 _handle_action 签名并调整内部逻辑 --- async def _handle_action( - self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo + self, + action: str, + reason: str, + observation_info: ObservationInfo, + conversation_info: ConversationInfo, + message_ids_before_planning: set # <-- 接收规划前的消息ID集合 ): """处理规划的行动""" @@ -289,18 +340,17 @@ class Conversation: "time": datetime.datetime.now().strftime("%H:%M:%S"), "final_reason": None, } - # 确保 done_action 列表存在 if not hasattr(conversation_info, "done_action"): conversation_info.done_action = [] conversation_info.done_action.append(current_action_record) action_index = len(conversation_info.done_action) - 1 - action_successful = False # 用于标记动作是否成功完成 + action_successful = False + reply_sent = False # <-- 新增:标记是否成功发送了回复 # --- 根据不同的 action 执行 --- - - # send_new_message 失败后执行 wait - if action == "send_new_message": + if action == "direct_reply" or action == "send_new_message": + # 合并 direct_reply 和 send_new_message 的大部分逻辑 max_reply_attempts = 3 reply_attempt_count = 0 is_suitable = False @@ -310,248 +360,170 @@ class Conversation: while reply_attempt_count < max_reply_attempts and not is_suitable: reply_attempt_count += 1 - logger.info( - f"[私聊][{self.private_name}]尝试生成追问回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." - ) + log_prefix = f"[私聊][{self.private_name}]尝试生成 '{action}' 回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." + logger.info(log_prefix) self.state = ConversationState.GENERATING - # 1. 生成回复 (调用 generate 时传入 action_type) + # 1. 生成回复 (传入 action_type) self.generated_reply = await self.reply_generator.generate( - observation_info, conversation_info, action_type="send_new_message" - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的追问回复: {self.generated_reply}" + observation_info, conversation_info, action_type=action ) + logger.info(f"{log_prefix} 生成内容: {self.generated_reply}") # 2. 检查回复 (逻辑不变) self.state = ConversationState.CHECKING try: - current_goal_str = conversation_info.goal_list[0]["goal"] if conversation_info.goal_list else "" + current_goal_str = "" # 初始化 + if hasattr(conversation_info, 'goal_list') and conversation_info.goal_list: + goal_item = conversation_info.goal_list[0] # 取第一个目标 + if isinstance(goal_item, dict): + current_goal_str = goal_item.get('goal', '') + elif isinstance(goal_item, str): + current_goal_str = goal_item + + # 确保 chat_history 和 chat_history_str 存在 + chat_history_for_check = getattr(observation_info, 'chat_history', []) + chat_history_str_for_check = getattr(observation_info, 'chat_history_str', '') + is_suitable, check_reason, need_replan = await self.reply_generator.check_reply( reply=self.generated_reply, goal=current_goal_str, - chat_history=observation_info.chat_history, - chat_history_str=observation_info.chat_history_str, + chat_history=chat_history_for_check, + chat_history_str=chat_history_str_for_check, retry_count=reply_attempt_count - 1, ) logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" + f"{log_prefix} 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" ) + + # 更新拒绝原因和内容 (仅在不合适或需要重规划时) if not is_suitable or need_replan: conversation_info.last_reply_rejection_reason = check_reason conversation_info.last_rejected_reply_content = self.generated_reply else: - # 如果检查通过,清空上次的拒绝原因 + # 检查通过,清空上次拒绝记录 conversation_info.last_reply_rejection_reason = None conversation_info.last_rejected_reply_content = None if is_suitable: final_reply_to_send = self.generated_reply - break + break # 检查通过,跳出循环 elif need_replan: logger.warning( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查建议重新规划,停止尝试。原因: {check_reason}" + f"{log_prefix} 检查建议重新规划,停止尝试。原因: {check_reason}" ) - break + break # 需要重新规划,跳出循环 except Exception as check_err: logger.error( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}" + f"{log_prefix} 调用 ReplyChecker 时出错: {check_err}" ) check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}" # 出错也记录原因 - conversation_info.last_rejected_reply_content = self.generated_reply - break + conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}" + conversation_info.last_rejected_reply_content = self.generated_reply # 记录出错时尝试的内容 + break # 检查出错,跳出循环 - # 循环结束,处理最终结果 + # --- 处理生成和检查的结果 --- if is_suitable: - # 检查是否有新消息 - if self._check_new_messages_after_planning(): - logger.info(f"[私聊][{self.private_name}]生成追问回复期间收到新消息,取消发送,重新规划行动") + # --- [修改点 6] 发送前最后检查是否需要中断 --- + if self._check_interrupt_before_sending(message_ids_before_planning): + logger.info(f"[私聊][{self.private_name}]生成回复后、发送前发现过多新消息,取消发送,重新规划") conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"有新消息,取消发送追问: {final_reply_to_send}"} + {"status": "recall", "final_reason": f"发送前发现过多新消息,取消发送: {final_reply_to_send}"} ) - return # 直接返回,重新规划 + self.conversation_info.last_successful_reply_action = None # 重置状态 + return # 直接返回,主循环会重新规划 - # 发送合适的回复 + # 确认发送 self.generated_reply = final_reply_to_send - # --- 在这里调用 _send_reply --- - await self._send_reply() # <--- 调用恢复后的函数 - # --- 新增:回复成功,清除拒绝原因 --- - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None - # 更新状态: 标记上次成功是 send_new_message - self.conversation_info.last_successful_reply_action = "send_new_message" - action_successful = True # 标记动作成功 + send_success = await self._send_reply() # 调用发送函数 + + if send_success: + action_successful = True + reply_sent = True # 标记回复已发送 + logger.info(f"[私聊][{self.private_name}]成功发送 '{action}' 回复.") + # 清空上次拒绝记录 (再次确保) + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + + # --- [修改点 7] 发送成功后,处理新消息并决定下一轮 prompt 类型 --- + # 获取发送后的最新未处理消息列表 + final_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) + final_unprocessed_count = len(final_unprocessed_messages) + + # 计算在生成和发送期间新增的消息数 + new_messages_during_generation_count = 0 + for msg in final_unprocessed_messages: + msg_id = msg.get("message_id") + # 如果消息 ID 不在规划前的集合中,说明是新来的 + if msg_id and msg_id not in message_ids_before_planning: + new_messages_during_generation_count += 1 + + logger.debug(f"[私聊][{self.private_name}]回复发送后,当前未处理消息数: {final_unprocessed_count}, 其中生成/发送期间新增: {new_messages_during_generation_count}") + + # 根据生成期间是否有新消息,决定下次规划用哪个 prompt + if new_messages_during_generation_count > 0: + # 有 1 条或更多新消息在生成期间到达 + logger.info(f"[私聊][{self.private_name}]检测到 {new_messages_during_generation_count} 条在生成/发送期间到达的新消息,下一轮将使用首次回复逻辑处理。") + self.conversation_info.last_successful_reply_action = None # 强制下一轮用 PROMPT_INITIAL_REPLY + else: + # 没有新消息在生成期间到达 + logger.info(f"[私聊][{self.private_name}]生成/发送期间无新消息,下一轮将根据 '{action}' 使用追问逻辑。") + self.conversation_info.last_successful_reply_action = action # 保持状态,下一轮可能用 PROMPT_FOLLOW_UP + + # --- [修改点 8] 清理规划时已知的消息 --- + # 只有在回复成功发送后,才清理掉那些在规划时就已经看到的消息 + if message_ids_before_planning: + await observation_info.clear_processed_messages(message_ids_before_planning) + + + else: # 发送失败 + logger.error(f"[私聊][{self.private_name}]发送 '{action}' 回复失败。") + # 发送失败,也认为动作未成功,重置状态 + action_successful = False + self.conversation_info.last_successful_reply_action = None + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送回复时失败"} + ) elif need_replan: - # 打回动作决策 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,追问回复决定打回动作决策。打回原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"} - ) + # 检查后决定打回动作决策 + logger.warning( + f"[私聊][{self.private_name}]'{action}' 回复检查后决定打回动作决策 (尝试 {reply_attempt_count} 次)。打回原因: {check_reason}" + ) + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后打回: {check_reason}"} + ) + self.conversation_info.last_successful_reply_action = None # 重置状态 - else: - # 追问失败 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的追问回复。最终原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后失败: {check_reason}"} - ) - # 重置状态: 追问失败,下次用初始 prompt - self.conversation_info.last_successful_reply_action = None + else: # 多次尝试后仍然不合适 (is_suitable is False and not need_replan) + logger.warning( + f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的 '{action}' 回复。最终原因: {check_reason}" + ) + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后失败: {check_reason}"} + ) + self.conversation_info.last_successful_reply_action = None # 重置状态 - # 执行 Wait 操作 - logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...") - self.state = ConversationState.WAITING - await self.waiter.wait(self.conversation_info) - wait_action_record = { - "action": "wait", - "plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待", - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - "final_reason": None, - } - conversation_info.done_action.append(wait_action_record) - - elif action == "direct_reply": - max_reply_attempts = 3 - reply_attempt_count = 0 - is_suitable = False - need_replan = False - check_reason = "未进行尝试" - final_reply_to_send = "" - - while reply_attempt_count < max_reply_attempts and not is_suitable: - reply_attempt_count += 1 - logger.info( - f"[私聊][{self.private_name}]尝试生成首次回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." - ) - self.state = ConversationState.GENERATING - - # 1. 生成回复 - self.generated_reply = await self.reply_generator.generate( - observation_info, conversation_info, action_type="direct_reply" - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的首次回复: {self.generated_reply}" - ) - - # 2. 检查回复 - self.state = ConversationState.CHECKING - try: - current_goal_str = conversation_info.goal_list[0]["goal"] if conversation_info.goal_list else "" - is_suitable, check_reason, need_replan = await self.reply_generator.check_reply( - reply=self.generated_reply, - goal=current_goal_str, - chat_history=observation_info.chat_history, - chat_history_str=observation_info.chat_history_str, - retry_count=reply_attempt_count - 1, - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" - ) - if is_suitable: - final_reply_to_send = self.generated_reply - break - elif need_replan: - logger.warning( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查建议重新规划,停止尝试。原因: {check_reason}" - ) - break - except Exception as check_err: - logger.error( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (首次回复) 时出错: {check_err}" - ) - check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - break - - # 循环结束,处理最终结果 - if is_suitable: - # 检查是否有新消息 - if self._check_new_messages_after_planning(): - logger.info(f"[私聊][{self.private_name}]生成首次回复期间收到新消息,取消发送,重新规划行动") - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"有新消息,取消发送首次回复: {final_reply_to_send}"} - ) - return # 直接返回,重新规划 - - # 发送合适的回复 - self.generated_reply = final_reply_to_send - # --- 在这里调用 _send_reply --- - await self._send_reply() # <--- 调用恢复后的函数 - # --- 新增:回复成功,清除拒绝原因 --- - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None # <-- 新增清空内容 - # 更新状态: 标记上次成功是 direct_reply - self.conversation_info.last_successful_reply_action = "direct_reply" - action_successful = True # 标记动作成功 - - elif need_replan: - # 打回动作决策 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,首次回复决定打回动作决策。打回原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"} - ) - - else: - # 首次回复失败 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的首次回复。最终原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后失败: {check_reason}"} - ) - # 重置状态: 首次回复失败,下次还是用初始 prompt - self.conversation_info.last_successful_reply_action = None - - # 执行 Wait 操作 (保持原有逻辑) - logger.info(f"[私聊][{self.private_name}]由于无法生成合适首次回复,执行 'wait' 操作...") - self.state = ConversationState.WAITING - await self.waiter.wait(self.conversation_info) - wait_action_record = { - "action": "wait", - "plan_reason": "因 direct_reply 多次尝试失败而执行的后备等待", - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - "final_reason": None, - } - conversation_info.done_action.append(wait_action_record) - - # elif action == "fetch_knowledge": - # self.state = ConversationState.FETCHING - # knowledge_query = reason - # try: - # 检查 knowledge_fetcher 是否存在 - # if not hasattr(self, "knowledge_fetcher"): - # logger.error(f"[私聊][{self.private_name}]KnowledgeFetcher 未初始化,无法获取知识。") - # raise AttributeError("KnowledgeFetcher not initialized") - - # knowledge, source = await self.knowledge_fetcher.fetch(knowledge_query, observation_info.chat_history) - # logger.info(f"[私聊][{self.private_name}]获取到知识: {knowledge[:100]}..., 来源: {source}") - # if knowledge: - # 确保 knowledge_list 存在 - # if not hasattr(conversation_info, "knowledge_list"): - # conversation_info.knowledge_list = [] - # conversation_info.knowledge_list.append( - # {"query": knowledge_query, "knowledge": knowledge, "source": source} - # ) - # action_successful = True - # except Exception as fetch_err: - # logger.error(f"[私聊][{self.private_name}]获取知识时出错: {str(fetch_err)}") - # conversation_info.done_action[action_index].update( - # {"status": "recall", "final_reason": f"获取知识失败: {str(fetch_err)}"} - # ) - # self.conversation_info.last_successful_reply_action = None # 重置状态 + # 如果是 send_new_message 失败,则执行 wait (保持原 fallback 逻辑) + if action == "send_new_message": + logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...") + self.state = ConversationState.WAITING + await self.waiter.wait(self.conversation_info) + wait_action_record = { + "action": "wait", + "plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待", + "status": "done", # wait 本身算完成 + "time": datetime.datetime.now().strftime("%H:%M:%S"), + "final_reason": None, + } + conversation_info.done_action.append(wait_action_record) + action_successful = True # fallback wait 成功 + # 注意: fallback wait 成功后,last_successful_reply_action 仍然是 None + # --- 处理其他动作 (保持大部分不变,主要是确保状态重置) --- elif action == "rethink_goal": self.state = ConversationState.RETHINKING try: - # 检查 goal_analyzer 是否存在 if not hasattr(self, "goal_analyzer"): logger.error(f"[私聊][{self.private_name}]GoalAnalyzer 未初始化,无法重新思考目标。") raise AttributeError("GoalAnalyzer not initialized") @@ -562,67 +534,99 @@ class Conversation: conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"} ) - self.conversation_info.last_successful_reply_action = None # 重置状态 + # 无论成功失败,非回复动作都重置 last_successful_reply_action + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None # 清除拒绝原因 + conversation_info.last_rejected_reply_content = None + elif action == "listening": self.state = ConversationState.LISTENING logger.info(f"[私聊][{self.private_name}]倾听对方发言...") try: - # 检查 waiter 是否存在 if not hasattr(self, "waiter"): logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法倾听。") raise AttributeError("Waiter not initialized") await self.waiter.wait_listening(conversation_info) - action_successful = True # Listening 完成就算成功 + action_successful = True except Exception as listen_err: logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}") conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"倾听失败: {listen_err}"} ) - self.conversation_info.last_successful_reply_action = None # 重置状态 + # 无论成功失败,非回复动作都重置 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None elif action == "say_goodbye": - self.state = ConversationState.GENERATING # 也可以定义一个新的状态,如 ENDING + self.state = ConversationState.GENERATING logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...") try: - # 1. 生成告别语 (使用 'say_goodbye' action_type) + # 1. 生成告别语 self.generated_reply = await self.reply_generator.generate( observation_info, conversation_info, action_type="say_goodbye" ) logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}") - # 2. 直接发送告别语 (不经过检查) - if self.generated_reply: # 确保生成了内容 - await self._send_reply() # 调用发送方法 - # 发送成功后,标记动作成功 - action_successful = True - logger.info(f"[私聊][{self.private_name}]告别语已发送。") + # 2. 发送告别语 + if self.generated_reply: + # --- [修改点 9] 告别前也检查中断 --- + if self._check_interrupt_before_sending(message_ids_before_planning): + logger.info(f"[私聊][{self.private_name}]发送告别语前发现过多新消息,取消发送,重新规划") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送告别语前发现过多新消息"} + ) + self.should_continue = True # 不能结束,需要重规划 + self.conversation_info.last_successful_reply_action = None # 重置状态 + return + + send_success = await self._send_reply() + if send_success: + action_successful = True + reply_sent = True # 标记发送成功 + logger.info(f"[私聊][{self.private_name}]告别语已发送。") + # 发送告别语成功后,通常意味着对话结束 + self.should_continue = False + logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。") + else: + logger.warning(f"[私聊][{self.private_name}]发送告别语失败。") + action_successful = False + # 发送失败不应结束对话,可能需要重试或做其他事 + self.should_continue = True + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送告别语失败"} + ) + self.conversation_info.last_successful_reply_action = None # 重置状态 + else: logger.warning(f"[私聊][{self.private_name}]未能生成告别语内容,无法发送。") - action_successful = False # 标记动作失败 + action_successful = False + self.should_continue = True # 未能生成也不能结束 conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": "未能生成告别语内容"} ) - - # 3. 无论是否发送成功,都准备结束对话 - self.should_continue = False - logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。") + self.conversation_info.last_successful_reply_action = None except Exception as goodbye_err: logger.error(f"[私聊][{self.private_name}]生成或发送告别语时出错: {goodbye_err}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - # 即使出错,也结束对话 - self.should_continue = False - action_successful = False # 标记动作失败 + action_successful = False + self.should_continue = True # 出错也不能结束 conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"} ) + self.conversation_info.last_successful_reply_action = None elif action == "end_conversation": - # 这个分支现在只会在 action_planner 最终决定不告别时被调用 self.should_continue = False logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...") - action_successful = True # 标记这个指令本身是成功的 + action_successful = True + # 结束对话也重置状态 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + elif action == "block_and_ignore": logger.info(f"[私聊][{self.private_name}]不想再理你了...") @@ -632,27 +636,34 @@ class Conversation: f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}" ) self.state = ConversationState.IGNORED - action_successful = True # 标记动作成功 + action_successful = True + # 忽略也重置状态 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + else: # 对应 'wait' 动作 self.state = ConversationState.WAITING logger.info(f"[私聊][{self.private_name}]等待更多信息...") try: - # 检查 waiter 是否存在 if not hasattr(self, "waiter"): logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法等待。") raise AttributeError("Waiter not initialized") _timeout_occurred = await self.waiter.wait(self.conversation_info) - action_successful = True # Wait 完成就算成功 + action_successful = True except Exception as wait_err: logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}") conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"等待失败: {wait_err}"} ) - self.conversation_info.last_successful_reply_action = None # 重置状态 + # 无论成功失败,非回复动作都重置 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + # --- 更新 Action History 状态 --- - # 只有当动作本身成功时,才更新状态为 done if action_successful: conversation_info.done_action[action_index].update( { @@ -660,51 +671,50 @@ class Conversation: "time": datetime.datetime.now().strftime("%H:%M:%S"), } ) - # 重置状态: 对于非回复类动作的成功,清除上次回复状态 - if action not in ["direct_reply", "send_new_message"]: - self.conversation_info.last_successful_reply_action = None - # --- 新增:非回复动作成功,也清除拒绝原因 --- - conversation_info.last_reply_rejection_reason = None - conversation_info.last_rejected_reply_content = None # <-- 新增清空内容 - logger.debug(f"[私聊][{self.private_name}]动作 {action} 成功完成,重置 last_successful_reply_action") - # 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action + # **注意**: last_successful_reply_action 的更新逻辑已经移到各自的动作处理中 + logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'done'") + else: + # 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action 的 final_reason + logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'recall' 或失败") - async def _send_reply(self): - """发送回复""" + # --- [修改点 10] _send_reply 返回布尔值表示成功与否 --- + async def _send_reply(self) -> bool: + """发送回复,并返回是否发送成功""" if not self.generated_reply: logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。") - return + return False # 发送失败 try: - _current_time = time.time() reply_content = self.generated_reply - # 发送消息 (确保 direct_sender 和 chat_stream 有效) + # 检查依赖项 if not hasattr(self, "direct_sender") or not self.direct_sender: logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。") - return + return False # 发送失败 if not self.chat_stream: logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。") - return + return False # 发送失败 + # 发送消息 await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content) - # 发送成功后,手动触发 observer 更新可能导致重复处理自己发送的消息 - # 更好的做法是依赖 observer 的自动轮询或数据库触发器(如果支持) - # 暂时注释掉,观察是否影响 ObservationInfo 的更新 + # 发送成功后,可以考虑触发 observer 更新,但需谨慎避免竞争条件或重复处理 + # 暂时注释掉,依赖 observer 的自然更新周期 # self.chat_observer.trigger_update() - # if not await self.chat_observer.wait_for_update(): - # logger.warning(f"[私聊][{self.private_name}]等待 ChatObserver 更新完成超时") + # await self.chat_observer.wait_for_update() - self.state = ConversationState.ANALYZING # 更新状态 + self.state = ConversationState.ANALYZING # 更新状态 (例如,可以改为 IDLE 或 WAITING) + return True # 发送成功 except Exception as e: - logger.error(f"[私聊][{self.private_name}]发送消息或更新状态时失败: {str(e)}") + logger.error(f"[私聊][{self.private_name}]发送消息时失败: {str(e)}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - self.state = ConversationState.ANALYZING + self.state = ConversationState.ANALYZING # 或者设置为 ERROR 状态? + return False # 发送失败 + async def _send_timeout_message(self): - """发送超时结束消息""" + """发送超时结束消息 (保持不变)""" try: messages = self.chat_observer.get_cached_messages(limit=1) if not messages: @@ -715,4 +725,4 @@ class Conversation: chat_stream=self.chat_stream, content="TODO:超时消息", reply_to_message=latest_message ) except Exception as e: - logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}") + logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}") \ No newline at end of file diff --git a/src/plugins/PFC/observation_info.py b/src/plugins/PFC/observation_info.py index c7572955..35c63741 100644 --- a/src/plugins/PFC/observation_info.py +++ b/src/plugins/PFC/observation_info.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# File: observation_info.py from typing import List, Optional, Dict, Any, Set from maim_message import UserInfo import time @@ -95,8 +97,11 @@ class ObservationInfoHandler(NotificationHandler): self.observation_info.unprocessed_messages = [ msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id ] - if len(self.observation_info.unprocessed_messages) < original_count: - logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id})") + # --- [修改点 11] 更新 new_messages_count --- + self.observation_info.new_messages_count = len(self.observation_info.unprocessed_messages) + if self.observation_info.new_messages_count < original_count: + logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id}), 当前未处理数: {self.observation_info.new_messages_count}") + elif notification_type == NotificationType.USER_JOINED: # 处理用户加入通知 (如果适用私聊场景) @@ -168,7 +173,7 @@ class ObservationInfo: self.last_message_id: Optional[str] = None self.last_message_content: str = "" self.last_message_sender: Optional[str] = None - self.bot_id: Optional[str] = None + self.bot_id: Optional[str] = None # 需要在某个地方设置 bot_id,例如从 global_config 获取 self.chat_history_count: int = 0 self.new_messages_count: int = 0 self.cold_chat_start_time: Optional[float] = None @@ -184,44 +189,43 @@ class ObservationInfo: self.handler: ObservationInfoHandler = ObservationInfoHandler(self, self.private_name) - def bind_to_chat_observer(self, chat_observer: ChatObserver): - """绑定到指定的chat_observer + # --- 初始化 bot_id --- + from ...config.config import global_config # 移动到 __init__ 内部以避免循环导入问题 + self.bot_id = str(global_config.BOT_QQ) if global_config.BOT_QQ else None - Args: - chat_observer: 要绑定的 ChatObserver 实例 - """ + def bind_to_chat_observer(self, chat_observer: ChatObserver): + """绑定到指定的chat_observer (保持不变)""" if self.chat_observer: logger.warning(f"[私聊][{self.private_name}]尝试重复绑定 ChatObserver") return self.chat_observer = chat_observer try: - if not self.handler: # 确保 handler 已经被创建 + if not self.handler: logger.error(f"[私聊][{self.private_name}] 尝试绑定时 handler 未初始化!") - self.chat_observer = None # 重置,防止后续错误 + self.chat_observer = None return - # 注册关心的通知类型 self.chat_observer.notification_manager.register_handler( target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler ) self.chat_observer.notification_manager.register_handler( target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler ) - # 可以根据需要注册更多通知类型 - # self.chat_observer.notification_manager.register_handler( - # target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler - # ) + # --- [修改点 12] 注册 MESSAGE_DELETED --- + self.chat_observer.notification_manager.register_handler( + target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler + ) logger.info(f"[私聊][{self.private_name}]成功绑定到 ChatObserver") except Exception as e: logger.error(f"[私聊][{self.private_name}]绑定到 ChatObserver 时出错: {e}") - self.chat_observer = None # 绑定失败,重置 + self.chat_observer = None def unbind_from_chat_observer(self): - """解除与chat_observer的绑定""" + """解除与chat_observer的绑定 (保持不变)""" if ( self.chat_observer and hasattr(self.chat_observer, "notification_manager") and self.handler - ): # 增加 handler 检查 + ): try: self.chat_observer.notification_manager.unregister_handler( target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler @@ -229,161 +233,152 @@ class ObservationInfo: self.chat_observer.notification_manager.unregister_handler( target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler ) - # 如果注册了其他类型,也要在这里注销 - # self.chat_observer.notification_manager.unregister_handler( - # target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler - # ) + # --- [修改点 13] 注销 MESSAGE_DELETED --- + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler + ) logger.info(f"[私聊][{self.private_name}]成功从 ChatObserver 解绑") except Exception as e: logger.error(f"[私聊][{self.private_name}]从 ChatObserver 解绑时出错: {e}") - finally: # 确保 chat_observer 被重置 + finally: self.chat_observer = None else: logger.warning(f"[私聊][{self.private_name}]尝试解绑时 ChatObserver 不存在、无效或 handler 未设置") - # 修改:update_from_message 接收 UserInfo 对象 async def update_from_message(self, message: Dict[str, Any], user_info: Optional[UserInfo]): - """从消息更新信息 - - Args: - message: 消息数据字典 - user_info: 解析后的 UserInfo 对象 (可能为 None) - """ + """从消息更新信息 (保持不变)""" message_time = message.get("time") message_id = message.get("message_id") processed_text = message.get("processed_plain_text", "") - # 只有在新消息到达时才更新 last_message 相关信息 if message_time and message_time > (self.last_message_time or 0): self.last_message_time = message_time self.last_message_id = message_id self.last_message_content = processed_text - # 重置冷场计时器 self.is_cold_chat = False self.cold_chat_start_time = None self.cold_chat_duration = 0.0 if user_info: - sender_id = str(user_info.user_id) # 确保是字符串 + sender_id = str(user_info.user_id) self.last_message_sender = sender_id - # 更新发言时间 if sender_id == self.bot_id: self.last_bot_speak_time = message_time else: self.last_user_speak_time = message_time - self.active_users.add(sender_id) # 用户发言则认为其活跃 + self.active_users.add(sender_id) else: logger.warning( f"[私聊][{self.private_name}]处理消息更新时缺少有效的 UserInfo 对象, message_id: {message_id}" ) - self.last_message_sender = None # 发送者未知 + self.last_message_sender = None - # 将原始消息字典添加到未处理列表 - self.unprocessed_messages.append(message) - self.new_messages_count = len(self.unprocessed_messages) # 直接用列表长度 + # --- [修改点 14] 添加到未处理列表,并更新计数 --- + # 检查消息是否已存在于未处理列表中,避免重复添加 + if not any(msg.get("message_id") == message_id for msg in self.unprocessed_messages): + self.unprocessed_messages.append(message) + self.new_messages_count = len(self.unprocessed_messages) + logger.debug(f"[私聊][{self.private_name}]添加新未处理消息 ID: {message_id}, 当前未处理数: {self.new_messages_count}") + self.update_changed() + else: + logger.warning(f"[私聊][{self.private_name}]尝试重复添加未处理消息 ID: {message_id}") - # logger.debug(f"[私聊][{self.private_name}]消息更新: last_time={self.last_message_time}, new_count={self.new_messages_count}") - self.update_changed() # 标记状态已改变 else: - # 如果消息时间戳不是最新的,可能不需要处理,或者记录一个警告 pass - # logger.warning(f"[私聊][{self.private_name}]收到过时或无效时间戳的消息: ID={message_id}, time={message_time}") + def update_changed(self): - """标记状态已改变,并重置标记""" - # logger.debug(f"[私聊][{self.private_name}]状态标记为已改变 (changed=True)") + """标记状态已改变,并重置标记 (保持不变)""" self.changed = True async def update_cold_chat_status(self, is_cold: bool, current_time: float): - """更新冷场状态 - - Args: - is_cold: 是否处于冷场状态 - current_time: 当前时间戳 - """ - if is_cold != self.is_cold_chat: # 仅在状态变化时更新 + """更新冷场状态 (保持不变)""" + if is_cold != self.is_cold_chat: self.is_cold_chat = is_cold if is_cold: - # 进入冷场状态 self.cold_chat_start_time = ( self.last_message_time or current_time - ) # 从最后消息时间开始算,或从当前时间开始 + ) logger.info(f"[私聊][{self.private_name}]进入冷场状态,开始时间: {self.cold_chat_start_time}") else: - # 结束冷场状态 if self.cold_chat_start_time: self.cold_chat_duration = current_time - self.cold_chat_start_time logger.info(f"[私聊][{self.private_name}]结束冷场状态,持续时间: {self.cold_chat_duration:.2f} 秒") - self.cold_chat_start_time = None # 重置开始时间 - self.update_changed() # 状态变化,标记改变 + self.cold_chat_start_time = None + self.update_changed() - # 即使状态没变,如果是冷场状态,也更新持续时间 if self.is_cold_chat and self.cold_chat_start_time: self.cold_chat_duration = current_time - self.cold_chat_start_time def get_active_duration(self) -> float: - """获取当前活跃时长 (距离最后一条消息的时间) - - Returns: - float: 最后一条消息到现在的时长(秒) - """ + """获取当前活跃时长 (保持不变)""" if not self.last_message_time: return 0.0 return time.time() - self.last_message_time def get_user_response_time(self) -> Optional[float]: - """获取用户最后响应时间 (距离用户最后发言的时间) - - Returns: - Optional[float]: 用户最后发言到现在的时长(秒),如果没有用户发言则返回None - """ + """获取用户最后响应时间 (保持不变)""" if not self.last_user_speak_time: return None return time.time() - self.last_user_speak_time def get_bot_response_time(self) -> Optional[float]: - """获取机器人最后响应时间 (距离机器人最后发言的时间) - - Returns: - Optional[float]: 机器人最后发言到现在的时长(秒),如果没有机器人发言则返回None - """ + """获取机器人最后响应时间 (保持不变)""" if not self.last_bot_speak_time: return None return time.time() - self.last_bot_speak_time - async def clear_unprocessed_messages(self): - """将未处理消息移入历史记录,并更新相关状态""" - if not self.unprocessed_messages: - return # 没有未处理消息,直接返回 + # --- [修改点 15] 重命名并修改 clear_unprocessed_messages --- + # async def clear_unprocessed_messages(self): <-- 旧方法注释掉或删除 + async def clear_processed_messages(self, message_ids_to_clear: Set[str]): + """将指定ID的未处理消息移入历史记录,并更新相关状态""" + if not message_ids_to_clear: + logger.debug(f"[私聊][{self.private_name}]没有需要清理的消息 ID。") + return - # logger.debug(f"[私聊][{self.private_name}]处理 {len(self.unprocessed_messages)} 条未处理消息...") - # 将未处理消息添加到历史记录中 (确保历史记录有长度限制,避免无限增长) - max_history_len = 100 # 示例:最多保留100条历史记录 - self.chat_history.extend(self.unprocessed_messages) + messages_to_move = [] + remaining_messages = [] + cleared_count = 0 + + # 分离要清理和要保留的消息 + for msg in self.unprocessed_messages: + if msg.get("message_id") in message_ids_to_clear: + messages_to_move.append(msg) + cleared_count += 1 + else: + remaining_messages.append(msg) + + if not messages_to_move: + logger.debug(f"[私聊][{self.private_name}]未找到与 ID 列表匹配的未处理消息进行清理。") + return + + logger.debug(f"[私聊][{self.private_name}]准备清理 {cleared_count} 条已处理消息...") + + # 将要移动的消息添加到历史记录 + max_history_len = 100 + self.chat_history.extend(messages_to_move) if len(self.chat_history) > max_history_len: self.chat_history = self.chat_history[-max_history_len:] - # 更新历史记录字符串 (只使用最近一部分生成,例如20条) - history_slice_for_str = self.chat_history[-20:] + # 更新历史记录字符串 (仅使用最近一部分生成) + history_slice_for_str = self.chat_history[-20:] # 例如最近20条 try: self.chat_history_str = await build_readable_messages( history_slice_for_str, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", - read_mark=0.0, # read_mark 可能需要根据逻辑调整 + read_mark=0.0, ) except Exception as e: logger.error(f"[私聊][{self.private_name}]构建聊天记录字符串时出错: {e}") - self.chat_history_str = "[构建聊天记录出错]" # 提供错误提示 + self.chat_history_str = "[构建聊天记录出错]" - # 清空未处理消息列表和计数 - # cleared_count = len(self.unprocessed_messages) - self.unprocessed_messages.clear() - self.new_messages_count = 0 - # self.has_unread_messages = False # 这个状态可以通过 new_messages_count 判断 + # 更新未处理消息列表和计数 + self.unprocessed_messages = remaining_messages + self.new_messages_count = len(self.unprocessed_messages) + self.chat_history_count = len(self.chat_history) - self.chat_history_count = len(self.chat_history) # 更新历史记录总数 - # logger.debug(f"[私聊][{self.private_name}]已处理 {cleared_count} 条消息,当前历史记录 {self.chat_history_count} 条。") + logger.info(f"[私聊][{self.private_name}]已清理 {cleared_count} 条消息,剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。") - self.update_changed() # 状态改变 + self.update_changed() # 状态改变 \ No newline at end of file