回复逻辑优化

pull/937/head
Bakadax 2025-05-06 09:42:37 +08:00
parent e294253b1a
commit 50c22b422e
3 changed files with 845 additions and 1071 deletions

View File

@ -1,24 +1,25 @@
# -*- coding: utf-8 -*-
# File: action_planner.py
import time
from typing import Tuple, Optional
from .pfc_utils import retrieve_contextual_info
import traceback
from typing import Tuple, Optional, Dict, Any, List
# import jieba # 如果需要旧版知识库的回退,可能需要
# import re # 如果需要旧版知识库的回退,可能需要
from src.common.logger_manager import get_logger
from src.individuality.individuality import Individuality
from src.plugins.utils.chat_message_builder import build_readable_messages
from ..models.utils_model import LLMRequest
from ...config.config import global_config
# 确保导入路径正确
from .pfc_utils import get_items_from_json, retrieve_contextual_info
from .chat_observer import ChatObserver
from .pfc_utils import get_items_from_json
from src.individuality.individuality import Individuality
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
from src.plugins.utils.chat_message_builder import build_readable_messages
logger = get_logger("pfc_action_planner")
# --- 定义 Prompt 模板 ---
# --- 定义 Prompt 模板 ---
# Prompt(1): 首次回复或非连续回复时的决策 Prompt
PROMPT_INITIAL_REPLY = """{persona_text}。现在你在参与一场QQ私聊请根据以下【所有信息】审慎且灵活的决策下一步行动可以回复可以倾听可以调取知识甚至可以屏蔽对方
@ -41,7 +42,7 @@ PROMPT_INITIAL_REPLY = """{persona_text}。现在你在参与一场QQ私聊
------
可选行动类型以及解释
listening: 倾听对方发言当你认为对方话才说到一半发言明显未结束时选择
direct_reply: 直接回复对方
direct_reply: 直接回复对方 (当有新消息需要处理时通常应选择此项)
rethink_goal: 思考一个对话目标当你觉得目前对话需要目标或当前目标不再适用或话题卡住时选择注意私聊的环境是灵活的有可能需要经常选择
end_conversation: 结束对话对方长时间没回复或者当你觉得对话告一段落时可以选择
block_and_ignore: 更加极端的结束对话方式直接结束对话并在一段时间内无视对方所有发言屏蔽当对话让你感到十分不适或你遭到各类骚扰时选择
@ -73,7 +74,7 @@ PROMPT_FOLLOW_UP = """{persona_text}。现在你在参与一场QQ私聊刚刚
{retrieved_memory_str}
------
可选行动类型以及解释
wait: 暂时不说话留给对方交互空间等待对方回复尤其是在你刚发言后或上次发言因重复发言过多被拒时或不确定做什么时这是不错的选择
wait: 暂时不说话留给对方交互空间等待对方回复尤其是在你刚发言后或上次发言因重复发言过多被拒时或不确定做什么时这是不错的选择**重要仅当没有未读消息时才能选择此项**
listening: 倾听对方发言虽然你刚发过言但如果对方立刻回复且明显话没说完可以选择这个
send_new_message: 发送一条新消息继续对话允许适当的追问补充深入话题或开启相关新话题**但是避免在因重复被拒后立即使用也不要在对方没有回复的情况下过多的消息轰炸或重复发言**
rethink_goal: 思考一个对话目标当你觉得目前对话需要目标或当前目标不再适用或话题卡住时选择注意私聊的环境是灵活的有可能需要经常选择
@ -107,274 +108,114 @@ PROMPT_END_DECISION = """{persona_text}。刚刚你决定结束一场 QQ 私聊
注意请严格按照 JSON 格式输出不要包含任何其他内容"""
# ActionPlanner 类定义,顶格
class ActionPlanner:
"""行动规划器"""
def __init__(self, stream_id: str, private_name: str):
self.llm = LLMRequest(
model=global_config.llm_PFC_action_planner,
temperature=global_config.llm_PFC_action_planner["temp"],
max_tokens=1500,
request_type="action_planning",
)
"""初始化行动规划器"""
self.stream_id = stream_id
self.private_name = private_name
# 初始化 LLM 请求对象
try:
llm_config = global_config.llm_PFC_action_planner
if not isinstance(llm_config, dict):
raise TypeError(f"LLM config 'llm_PFC_action_planner' is not a dictionary: {llm_config}")
self.llm = LLMRequest(
model=llm_config,
temperature=llm_config.get("temp", 0.7),
max_tokens=1500,
request_type="action_planning",
)
except TypeError as e:
logger.error(f"[私聊][{self.private_name}] 初始化 LLMRequest 时配置错误: {e}")
raise
except Exception as e:
logger.error(f"[私聊][{self.private_name}] 初始化 LLMRequest 时发生未知错误: {e}")
raise
# 获取个性化信息和机器人名称
self.personality_info = Individuality.get_instance().get_prompt(x_person=2, level=3)
self.name = global_config.BOT_NICKNAME
self.private_name = private_name
# 获取 ChatObserver 实例 (单例模式)
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
# 修改 plan 方法签名,增加 last_successful_reply_action 参数
async def plan(
self,
observation_info: ObservationInfo,
conversation_info: ConversationInfo,
last_successful_reply_action: Optional[str],
) -> Tuple[str, str]:
"""规划下一步行动
"""
规划下一步行动
Args:
observation_info: 决策信息
conversation_info: 对话信息
last_successful_reply_action: 上一次成功的回复动作类型 ('direct_reply' 'send_new_message' None)
observation_info: 观察信息包含聊天记录未读消息等
conversation_info: 对话信息包含目标历史动作等
last_successful_reply_action: 上一次成功的回复动作类型 ('direct_reply' 'send_new_message' None)
Returns:
Tuple[str, str]: (行动类型, 行动原因)
Tuple[str, str]: (规划的行动类型, 行动原因)
"""
# --- 获取 Bot 上次发言时间信息 ---
# (这部分逻辑不变)
time_since_last_bot_message_info = ""
logger.info(f"[私聊][{self.private_name}] 开始规划行动...")
plan_start_time = time.time()
# --- 1. 准备 Prompt 输入信息 ---
try:
bot_id = str(global_config.BOT_QQ)
if hasattr(observation_info, "chat_history") and observation_info.chat_history:
for i in range(len(observation_info.chat_history) - 1, -1, -1):
msg = observation_info.chat_history[i]
if not isinstance(msg, dict):
continue
sender_info = msg.get("user_info", {})
sender_id = str(sender_info.get("user_id")) if isinstance(sender_info, dict) else None
msg_time = msg.get("time")
if sender_id == bot_id and msg_time:
time_diff = time.time() - msg_time
if time_diff < 60.0:
time_since_last_bot_message_info = (
f"提示:你上一条成功发送的消息是在 {time_diff:.1f} 秒前。\n"
)
break
else:
logger.debug(
f"[私聊][{self.private_name}]Observation info chat history is empty or not available for bot time check."
)
except AttributeError:
logger.warning(
f"[私聊][{self.private_name}]ObservationInfo object might not have chat_history attribute yet for bot time check."
time_since_last_bot_message_info = self._get_bot_last_speak_time_info(observation_info)
timeout_context = self._get_timeout_context(conversation_info)
goals_str = self._build_goals_string(conversation_info)
chat_history_text = await self._build_chat_history_text(observation_info)
persona_text = f"你的名字是{self.name}{self.personality_info}"
action_history_summary, last_action_context = self._build_action_history_context(conversation_info)
retrieved_memory_str, retrieved_knowledge_str = await retrieve_contextual_info(
chat_history_text, self.private_name
)
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]获取 Bot 上次发言时间时出错: {e}")
# --- 获取超时提示信息 ---
# (这部分逻辑不变)
timeout_context = ""
try:
if hasattr(conversation_info, "goal_list") and conversation_info.goal_list:
last_goal_dict = conversation_info.goal_list[-1]
if isinstance(last_goal_dict, dict) and "goal" in last_goal_dict:
last_goal_text = last_goal_dict["goal"]
if isinstance(last_goal_text, str) and "分钟,思考接下来要做什么" in last_goal_text:
try:
timeout_minutes_text = last_goal_text.split("")[0].replace("你等待了", "")
timeout_context = f"重要提示:对方已经长时间({timeout_minutes_text})没有回复你的消息了(这可能代表对方繁忙/不想回复/没注意到你的消息等情况,或在对方看来本次聊天已告一段落),请基于此情况规划下一步。\n"
except Exception:
timeout_context = "重要提示:对方已经长时间没有回复你的消息了(这可能代表对方繁忙/不想回复/没注意到你的消息等情况,或在对方看来本次聊天已告一段落),请基于此情况规划下一步。\n"
else:
logger.debug(
f"[私聊][{self.private_name}]Conversation info goal_list is empty or not available for timeout check."
)
except AttributeError:
logger.warning(
f"[私聊][{self.private_name}]ConversationInfo object might not have goal_list attribute yet for timeout check."
logger.info(
f"[私聊][{self.private_name}] (ActionPlanner) 检索完成。记忆: {'' if '回忆起' in retrieved_memory_str else ''} / 知识: {'' if retrieved_knowledge_str and '无相关知识' not in retrieved_knowledge_str and '出错' not in retrieved_knowledge_str else ''}"
)
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]检查超时目标时出错: {e}")
except Exception as prep_err:
logger.error(f"[私聊][{self.private_name}] 准备 Prompt 输入时出错: {prep_err}")
logger.error(traceback.format_exc())
return "wait", f"准备行动规划输入时出错: {prep_err}"
# --- 构建通用 Prompt 参数 ---
logger.debug(
f"[私聊][{self.private_name}]开始规划行动:当前目标: {getattr(conversation_info, 'goal_list', '不可用')}"
)
# 构建对话目标 (goals_str)
goals_str = ""
# --- 2. 选择并格式化 Prompt ---
try:
if hasattr(conversation_info, "goal_list") and conversation_info.goal_list:
for goal_reason in conversation_info.goal_list:
if isinstance(goal_reason, dict):
goal = goal_reason.get("goal", "目标内容缺失")
reasoning = goal_reason.get("reasoning", "没有明确原因")
else:
goal = str(goal_reason)
reasoning = "没有明确原因"
goal = str(goal) if goal is not None else "目标内容缺失"
reasoning = str(reasoning) if reasoning is not None else "没有明确原因"
goals_str += f"- 目标:{goal}\n 原因:{reasoning}\n"
if not goals_str:
goals_str = "- 目前没有明确对话目标,请考虑设定一个。\n"
if last_successful_reply_action in ["direct_reply", "send_new_message"]:
prompt_template = PROMPT_FOLLOW_UP
log_msg = "使用 PROMPT_FOLLOW_UP (追问决策)"
else:
goals_str = "- 目前没有明确对话目标,请考虑设定一个。\n"
except AttributeError:
logger.warning(
f"[私聊][{self.private_name}]ConversationInfo object might not have goal_list attribute yet."
prompt_template = PROMPT_INITIAL_REPLY
log_msg = "使用 PROMPT_INITIAL_REPLY (首次/非连续回复决策)"
logger.debug(f"[私聊][{self.private_name}] {log_msg}")
prompt = prompt_template.format(
persona_text=persona_text,
goals_str=goals_str if goals_str.strip() else "- 目前没有明确对话目标,请考虑设定一个。",
action_history_summary=action_history_summary,
last_action_context=last_action_context,
time_since_last_bot_message_info=time_since_last_bot_message_info,
timeout_context=timeout_context,
chat_history_text=chat_history_text if chat_history_text.strip() else "还没有聊天记录。",
retrieved_memory_str=retrieved_memory_str if retrieved_memory_str else "无相关记忆。",
retrieved_knowledge_str=retrieved_knowledge_str if retrieved_knowledge_str else "无相关知识。",
)
goals_str = "- 获取对话目标时出错。\n"
except Exception as e:
logger.error(f"[私聊][{self.private_name}]构建对话目标字符串时出错: {e}")
goals_str = "- 构建对话目标时出错。\n"
logger.debug(f"[私聊][{self.private_name}] 发送到LLM的最终提示词:\n------\n{prompt}\n------")
except KeyError as fmt_key_err:
logger.error(f"[私聊][{self.private_name}] 格式化 Prompt 时缺少键: {fmt_key_err}")
return "wait", f"格式化 Prompt 时出错 (缺少键: {fmt_key_err})"
except Exception as fmt_err:
logger.error(f"[私聊][{self.private_name}] 格式化 Prompt 时发生未知错误: {fmt_err}")
return "wait", f"格式化 Prompt 时出错: {fmt_err}"
# 获取聊天历史记录 (chat_history_text)
try:
if hasattr(observation_info, "chat_history") and observation_info.chat_history:
chat_history_text = observation_info.chat_history_str
if not chat_history_text:
chat_history_text = "还没有聊天记录。\n"
else:
chat_history_text = "还没有聊天记录。\n"
if hasattr(observation_info, "new_messages_count") and observation_info.new_messages_count > 0:
if hasattr(observation_info, "unprocessed_messages") and observation_info.unprocessed_messages:
new_messages_list = observation_info.unprocessed_messages
new_messages_str = await build_readable_messages(
new_messages_list,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
chat_history_text += (
f"\n--- 以下是 {observation_info.new_messages_count} 条新消息 ---\n{new_messages_str}"
)
else:
logger.warning(
f"[私聊][{self.private_name}]ObservationInfo has new_messages_count > 0 but unprocessed_messages is empty or missing."
)
except AttributeError:
logger.warning(
f"[私聊][{self.private_name}]ObservationInfo object might be missing expected attributes for chat history."
)
chat_history_text = "获取聊天记录时出错。\n"
except Exception as e:
logger.error(f"[私聊][{self.private_name}]处理聊天记录时发生未知错误: {e}")
chat_history_text = "处理聊天记录时出错。\n"
# 构建 Persona 文本 (persona_text)
persona_text = f"你的名字是{self.name}{self.personality_info}"
# 构建行动历史和上一次行动结果 (action_history_summary, last_action_context)
# (这部分逻辑不变)
action_history_summary = "你最近执行的行动历史:\n"
last_action_context = "关于你【上一次尝试】的行动:\n"
action_history_list = []
try:
if hasattr(conversation_info, "done_action") and conversation_info.done_action:
action_history_list = conversation_info.done_action[-5:]
else:
logger.debug(f"[私聊][{self.private_name}]Conversation info done_action is empty or not available.")
except AttributeError:
logger.warning(
f"[私聊][{self.private_name}]ConversationInfo object might not have done_action attribute yet."
)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]访问行动历史时出错: {e}")
if not action_history_list:
action_history_summary += "- 还没有执行过行动。\n"
last_action_context += "- 这是你规划的第一个行动。\n"
else:
for i, action_data in enumerate(action_history_list):
action_type = "未知"
plan_reason = "未知"
status = "未知"
final_reason = ""
action_time = ""
if isinstance(action_data, dict):
action_type = action_data.get("action", "未知")
plan_reason = action_data.get("plan_reason", "未知规划原因")
status = action_data.get("status", "未知")
final_reason = action_data.get("final_reason", "")
action_time = action_data.get("time", "")
elif isinstance(action_data, tuple):
# 假设旧格式兼容
if len(action_data) > 0:
action_type = action_data[0]
if len(action_data) > 1:
plan_reason = action_data[1] # 可能是规划原因或最终原因
if len(action_data) > 2:
status = action_data[2]
if status == "recall" and len(action_data) > 3:
final_reason = action_data[3]
elif status == "done" and action_type in ["direct_reply", "send_new_message"]:
plan_reason = "成功发送" # 简化显示
reason_text = f", 失败/取消原因: {final_reason}" if final_reason else ""
summary_line = f"- 时间:{action_time}, 尝试行动:'{action_type}', 状态:{status}{reason_text}"
action_history_summary += summary_line + "\n"
if i == len(action_history_list) - 1:
last_action_context += f"- 上次【规划】的行动是: '{action_type}'\n"
last_action_context += f"- 当时规划的【原因】是: {plan_reason}\n"
if status == "done":
last_action_context += "- 该行动已【成功执行】。\n"
# 记录这次成功的行动类型,供下次决策
# self.last_successful_action_type = action_type # 不在这里记录,由 conversation 控制
elif status == "recall":
last_action_context += "- 但该行动最终【未能执行/被取消】。\n"
if final_reason:
last_action_context += f"- 【重要】失败/取消的具体原因是: “{final_reason}\n"
else:
last_action_context += "- 【重要】失败/取消原因未明确记录。\n"
# self.last_successful_action_type = None # 行动失败,清除记录
else:
last_action_context += f"- 该行动当前状态: {status}\n"
# self.last_successful_action_type = None # 非完成状态,清除记录
retrieved_memory_str_planner, retrieved_knowledge_str_planner = await retrieve_contextual_info(
chat_history_text, self.private_name
)
# Optional: 可以加一行日志确认结果,方便调试
logger.info(
f"[私聊][{self.private_name}] (ActionPlanner) 统一检索完成。记忆: {'' if '回忆起' in retrieved_memory_str_planner else ''} / 知识: {'' if '出错' not in retrieved_knowledge_str_planner and '无相关知识' not in retrieved_knowledge_str_planner else ''}"
)
# --- 选择 Prompt ---
if last_successful_reply_action in ["direct_reply", "send_new_message"]:
prompt_template = PROMPT_FOLLOW_UP
logger.debug(f"[私聊][{self.private_name}]使用 PROMPT_FOLLOW_UP (追问决策)")
else:
prompt_template = PROMPT_INITIAL_REPLY
logger.debug(f"[私聊][{self.private_name}]使用 PROMPT_INITIAL_REPLY (首次/非连续回复决策)")
# --- 格式化最终的 Prompt ---
prompt = prompt_template.format(
persona_text=persona_text,
goals_str=goals_str if goals_str.strip() else "- 目前没有明确对话目标,请考虑设定一个。",
action_history_summary=action_history_summary,
last_action_context=last_action_context,
time_since_last_bot_message_info=time_since_last_bot_message_info,
timeout_context=timeout_context,
chat_history_text=chat_history_text if chat_history_text.strip() else "还没有聊天记录。",
# knowledge_info_str=knowledge_info_str, # 移除了旧知识展示方式
retrieved_memory_str=retrieved_memory_str_planner if retrieved_memory_str_planner else "无相关记忆。",
retrieved_knowledge_str=retrieved_knowledge_str_planner
if retrieved_knowledge_str_planner
else "无相关知识。",
)
logger.debug(f"[私聊][{self.private_name}]发送到LLM的最终提示词:\n------\n{prompt}\n------")
# --- 3. 调用 LLM 进行初步规划 ---
try:
llm_start_time = time.time()
content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"[私聊][{self.private_name}]LLM (行动规划) 原始返回内容: {content}")
llm_duration = time.time() - llm_start_time
logger.debug(f"[私聊][{self.private_name}] LLM (行动规划) 耗时: {llm_duration:.3f} 秒, 原始返回: {content}")
# --- 初始行动规划解析 ---
success, initial_result = get_items_from_json(
content,
self.private_name,
@ -385,87 +226,190 @@ class ActionPlanner:
initial_action = initial_result.get("action", "wait")
initial_reason = initial_result.get("reason", "LLM未提供原因默认等待")
logger.info(f"[私聊][{self.private_name}] LLM 初步规划行动: {initial_action}, 原因: {initial_reason}")
except Exception as llm_err:
logger.error(f"[私聊][{self.private_name}] 调用 LLM 或解析初步规划结果时出错: {llm_err}")
logger.error(traceback.format_exc())
return "wait", f"行动规划 LLM 调用或解析出错: {llm_err}"
# 检查是否需要进行结束对话决策 ---
if initial_action == "end_conversation":
logger.info(f"[私聊][{self.private_name}]初步规划结束对话,进入告别决策...")
# --- 4. 处理特殊动作 (end_conversation) ---
final_action = initial_action
final_reason = initial_reason
# 使用新的 PROMPT_END_DECISION
end_decision_prompt = PROMPT_END_DECISION.format(
persona_text=persona_text, # 复用之前的 persona_text
chat_history_text=chat_history_text, # 复用之前的 chat_history_text
if initial_action == "end_conversation":
try:
final_action, final_reason = await self._handle_end_conversation_decision(
persona_text, chat_history_text, initial_reason
)
except Exception as end_dec_err:
logger.error(f"[私聊][{self.private_name}] 处理结束对话决策时出错: {end_dec_err}")
logger.warning(f"[私聊][{self.private_name}] 结束决策出错,将按原计划执行 end_conversation")
final_action = "end_conversation" # 保持原计划
final_reason = initial_reason
logger.debug(
f"[私聊][{self.private_name}]发送到LLM的结束决策提示词:\n------\n{end_decision_prompt}\n------"
)
try:
end_content, _ = await self.llm.generate_response_async(end_decision_prompt) # 再次调用LLM
logger.debug(f"[私聊][{self.private_name}]LLM (结束决策) 原始返回内容: {end_content}")
# --- [移除] 不再需要在这里检查 wait 动作的约束 ---
# elif initial_action == "wait":
# # ... (移除之前的检查逻辑) ...
# final_action = "wait"
# final_reason = initial_reason
# 解析结束决策的JSON
end_success, end_result = get_items_from_json(
end_content,
self.private_name,
"say_bye",
"reason",
default_values={"say_bye": "no", "reason": "结束决策LLM返回格式错误默认不告别"},
required_types={"say_bye": str, "reason": str}, # 明确类型
)
# --- 5. 验证最终行动类型 ---
valid_actions = [
"direct_reply", "send_new_message", "wait", "listening",
"rethink_goal", "end_conversation", "block_and_ignore", "say_goodbye"
]
if final_action not in valid_actions:
logger.warning(f"[私聊][{self.private_name}] LLM 返回了未知的行动类型: '{final_action}',强制改为 wait")
final_reason = f"(原始行动'{final_action}'无效已强制改为wait) {final_reason}"
final_action = "wait" # 遇到无效动作,默认等待
say_bye_decision = end_result.get("say_bye", "no").lower() # 转小写方便比较
end_decision_reason = end_result.get("reason", "未提供原因")
plan_duration = time.time() - plan_start_time
logger.success(f"[私聊][{self.private_name}] 最终规划行动: {final_action} (总耗时: {plan_duration:.3f} 秒)")
logger.info(f"[私聊][{self.private_name}] 行动原因: {final_reason}")
return final_action, final_reason
if end_success and say_bye_decision == "yes":
# 决定要告别,返回新的 'say_goodbye' 动作
logger.info(
f"[私聊][{self.private_name}]结束决策: yes, 准备生成告别语. 原因: {end_decision_reason}"
)
# 注意:这里的 reason 可以考虑拼接初始原因和结束决策原因,或者只用结束决策原因
final_action = "say_goodbye"
final_reason = f"决定发送告别语。决策原因: {end_decision_reason} (原结束理由: {initial_reason})"
return final_action, final_reason
else:
# 决定不告别 (包括解析失败或明确说no)
logger.info(
f"[私聊][{self.private_name}]结束决策: no, 直接结束对话. 原因: {end_decision_reason}"
)
# 返回原始的 'end_conversation' 动作
final_action = "end_conversation"
final_reason = initial_reason # 保持原始的结束理由
return final_action, final_reason
except Exception as end_e:
logger.error(f"[私聊][{self.private_name}]调用结束决策LLM或处理结果时出错: {str(end_e)}")
# 出错时,默认执行原始的结束对话
logger.warning(f"[私聊][{self.private_name}]结束决策出错,将按原计划执行 end_conversation")
return "end_conversation", initial_reason # 返回原始动作和原因
# --- Helper methods for preparing prompt inputs ---
else:
action = initial_action
reason = initial_reason
def _get_bot_last_speak_time_info(self, observation_info: ObservationInfo) -> str:
"""获取机器人上次发言时间提示"""
time_info = ""
try:
if not observation_info or not observation_info.bot_id: return ""
bot_id_str = str(observation_info.bot_id)
if hasattr(observation_info, "chat_history") and observation_info.chat_history:
for msg in reversed(observation_info.chat_history):
if not isinstance(msg, dict): continue
sender_info = msg.get("user_info", {})
sender_id = str(sender_info.get("user_id")) if isinstance(sender_info, dict) else None
msg_time = msg.get("time")
if sender_id == bot_id_str and msg_time:
time_diff = time.time() - msg_time
if time_diff < 60.0:
time_info = f"提示:你上一条成功发送的消息是在 {time_diff:.1f} 秒前。\n"
break
except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时属性错误: {e}")
except Exception as e: logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时出错: {e}")
return time_info
# 验证action类型 (保持不变)
valid_actions = [
"direct_reply",
"send_new_message",
"wait",
"listening",
"rethink_goal",
"end_conversation", # 仍然需要验证,因为可能从上面决策后返回
"block_and_ignore",
"say_goodbye", # 也要验证这个新动作
]
if action not in valid_actions:
logger.warning(f"[私聊][{self.private_name}]LLM返回了未知的行动类型: '{action}',强制改为 wait")
reason = f"(原始行动'{action}'无效已强制改为wait) {reason}"
action = "wait"
def _get_timeout_context(self, conversation_info: ConversationInfo) -> str:
"""获取超时提示信息"""
timeout_context = ""
try:
if hasattr(conversation_info, "goal_list") and conversation_info.goal_list:
last_goal_item = conversation_info.goal_list[-1]
last_goal_text = ""
if isinstance(last_goal_item, dict): last_goal_text = last_goal_item.get("goal", "")
elif isinstance(last_goal_item, str): last_goal_text = last_goal_item
if isinstance(last_goal_text, str) and "分钟," in last_goal_text and "思考接下来要做什么" in last_goal_text:
wait_time_str = last_goal_text.split("分钟,")[0].replace("你等待了","").strip()
timeout_context = f"重要提示:对方已经长时间(约 {wait_time_str} 分钟)没有回复你的消息了,请基于此情况规划下一步。\n"
logger.debug(f"[私聊][{self.private_name}] 检测到超时目标: {last_goal_text}")
except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 检查超时目标时属性错误: {e}")
except Exception as e: logger.warning(f"[私聊][{self.private_name}] 检查超时目标时出错: {e}")
return timeout_context
logger.info(f"[私聊][{self.private_name}]规划的行动: {action}")
logger.info(f"[私聊][{self.private_name}]行动原因: {reason}")
return action, reason
def _build_goals_string(self, conversation_info: ConversationInfo) -> str:
"""构建对话目标字符串"""
goals_str = ""
try:
if hasattr(conversation_info, "goal_list") and conversation_info.goal_list:
recent_goals = conversation_info.goal_list[-3:]
for goal_item in recent_goals:
goal = "目标内容缺失"; reasoning = "没有明确原因"
if isinstance(goal_item, dict):
goal = goal_item.get("goal", goal); reasoning = goal_item.get("reasoning", reasoning)
elif isinstance(goal_item, str): goal = goal_item
goal = str(goal) if goal is not None else "目标内容缺失"
reasoning = str(reasoning) if reasoning is not None else "没有明确原因"
goals_str += f"- 目标:{goal}\n 原因:{reasoning}\n"
if not goals_str: goals_str = "- 目前没有明确对话目标,请考虑设定一个。\n"
else: goals_str = "- 目前没有明确对话目标,请考虑设定一个。\n"
except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 构建对话目标字符串时属性错误: {e}"); goals_str = "- 获取对话目标时出错。\n"
except Exception as e: logger.error(f"[私聊][{self.private_name}] 构建对话目标字符串时出错: {e}"); goals_str = "- 构建对话目标时出错。\n"
return goals_str
except Exception as e:
# 外层异常处理保持不变
logger.error(f"[私聊][{self.private_name}]规划行动时调用 LLM 或处理结果出错: {str(e)}")
return "wait", f"行动规划处理中发生错误,暂时等待: {str(e)}"
async def _build_chat_history_text(self, observation_info: ObservationInfo) -> str:
"""构建聊天历史记录文本 (包含未处理消息)"""
chat_history_text = ""
try:
if hasattr(observation_info, "chat_history_str") and observation_info.chat_history_str: chat_history_text = observation_info.chat_history_str
elif hasattr(observation_info, "chat_history") and observation_info.chat_history:
history_slice = observation_info.chat_history[-20:]
chat_history_text = await build_readable_messages(history_slice, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0)
else: chat_history_text = "还没有聊天记录。\n"
unread_count = getattr(observation_info, 'new_messages_count', 0)
unread_messages = getattr(observation_info, 'unprocessed_messages', [])
if unread_count > 0 and unread_messages:
from ...config.config import global_config
bot_qq_str = str(global_config.BOT_QQ)
other_unread_messages = [msg for msg in unread_messages if msg.get("user_info", {}).get("user_id") != bot_qq_str]
other_unread_count = len(other_unread_messages)
if other_unread_count > 0:
new_messages_str = await build_readable_messages(other_unread_messages, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0)
chat_history_text += f"\n--- 以下是 {other_unread_count} 条你需要处理的新消息 ---\n{new_messages_str}\n------\n"
logger.debug(f"[私聊][{self.private_name}] 向 LLM 追加了 {other_unread_count} 条未读消息。")
except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 构建聊天记录文本时属性错误: {e}"); chat_history_text = "[获取聊天记录时出错]\n"
except Exception as e: logger.error(f"[私聊][{self.private_name}] 处理聊天记录时发生未知错误: {e}"); chat_history_text = "[处理聊天记录时出错]\n"
return chat_history_text
def _build_action_history_context(self, conversation_info: ConversationInfo) -> Tuple[str, str]:
"""构建行动历史概要和上一次行动详细情况"""
action_history_summary = "你最近执行的行动历史:\n"; last_action_context = "关于你【上一次尝试】的行动:\n"
action_history_list: List[Dict[str, Any]] = []
try:
if hasattr(conversation_info, "done_action") and conversation_info.done_action: action_history_list = conversation_info.done_action[-5:]
except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 获取行动历史时属性错误: {e}")
except Exception as e: logger.error(f"[私聊][{self.private_name}] 访问行动历史时出错: {e}")
if not action_history_list:
action_history_summary += "- 还没有执行过行动。\n"; last_action_context += "- 这是你规划的第一个行动。\n"
else:
for i, action_data in enumerate(action_history_list):
if not isinstance(action_data, dict): logger.warning(f"[私聊][{self.private_name}] 行动历史记录格式错误,跳过: {action_data}"); continue
action_type = action_data.get("action", "未知动作"); plan_reason = action_data.get("plan_reason", "未知规划原因")
status = action_data.get("status", "未知状态"); final_reason = action_data.get("final_reason", "")
action_time = action_data.get("time", "未知时间")
reason_text = f", 最终原因: “{final_reason}" if final_reason else ""
summary_line = f"- 时间:{action_time}, 尝试:'{action_type}', 状态:{status}{reason_text}"
action_history_summary += summary_line + "\n"
if i == len(action_history_list) - 1:
last_action_context += f"- 上次【规划】的行动是: '{action_type}'\n"
last_action_context += f"- 当时规划的【原因】是: {plan_reason}\n"
if status == "done": last_action_context += "- 该行动已【成功执行】。\n"
elif status == "recall" or status == "error" or status.startswith("cancelled"):
last_action_context += "- 但该行动最终【未能成功执行/被取消/出错】。\n"
if final_reason: last_action_context += f"- 【重要】失败/取消/错误原因是: “{final_reason}\n"
else: last_action_context += "- 【重要】失败/取消/错误原因未明确记录。\n"
elif status == "start": last_action_context += "- 该行动【正在执行中】或【未完成】。\n"
else: last_action_context += f"- 该行动当前状态未知: {status}\n"
return action_history_summary, last_action_context
# --- Helper method for handling end_conversation decision ---
async def _handle_end_conversation_decision(
self, persona_text: str, chat_history_text: str, initial_reason: str
) -> Tuple[str, str]:
"""处理结束对话前的告别决策"""
logger.info(f"[私聊][{self.private_name}] 初步规划结束对话,进入告别决策...")
end_decision_prompt = PROMPT_END_DECISION.format(persona_text=persona_text, chat_history_text=chat_history_text)
logger.debug(f"[私聊][{self.private_name}] 发送到LLM的结束决策提示词:\n------\n{end_decision_prompt}\n------")
llm_start_time = time.time()
end_content, _ = await self.llm.generate_response_async(end_decision_prompt)
llm_duration = time.time() - llm_start_time
logger.debug(f"[私聊][{self.private_name}] LLM (结束决策) 耗时: {llm_duration:.3f} 秒, 原始返回: {end_content}")
end_success, end_result = get_items_from_json(end_content, self.private_name, "say_bye", "reason", default_values={"say_bye": "no", "reason": "结束决策LLM返回格式错误默认不告别"}, required_types={"say_bye": str, "reason": str})
say_bye_decision = end_result.get("say_bye", "no").lower()
end_decision_reason = end_result.get("reason", "未提供原因")
if end_success and say_bye_decision == "yes":
logger.info(f"[私聊][{self.private_name}] 结束决策: yes, 准备生成告别语. 原因: {end_decision_reason}")
final_action = "say_goodbye"; final_reason = f"决定发送告别语 (原因: {end_decision_reason})。原结束理由: {initial_reason}"
return final_action, final_reason
else:
logger.info(f"[私聊][{self.private_name}] 结束决策: no, 直接结束对话. 原因: {end_decision_reason}")
return "end_conversation", initial_reason

View File

@ -1,34 +1,32 @@
# -*- coding: utf-8 -*-
# File: conversation.py
import time
import asyncio
import datetime
import traceback
from typing import Dict, Any, Optional, Set, List
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
from typing import Dict, Any, Optional, Set # <-- 添加 Set 类型提示
from ..chat.message import Message
from .pfc_types import ConversationState
from .pfc import ChatObserver, GoalAnalyzer
from .message_sender import DirectMessageSender
from src.common.logger_manager import get_logger
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
from maim_message import UserInfo
from src.plugins.chat.chat_stream import chat_manager, ChatStream
from ..chat.message import Message # 假设 Message 类在这里
from ...config.config import global_config # 导入全局配置
from .pfc_types import ConversationState
from .pfc import GoalAnalyzer # 假设 GoalAnalyzer 在 pfc.py
from .chat_observer import ChatObserver
from .message_sender import DirectMessageSender
from .action_planner import ActionPlanner
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
from .conversation_info import ConversationInfo # 导入修改后的 ConversationInfo
from .reply_generator import ReplyGenerator
from ..chat.chat_stream import ChatStream
from .idle_conversation_starter import IdleConversationStarter
from maim_message import UserInfo
from src.plugins.chat.chat_stream import chat_manager
from .pfc_KnowledgeFetcher import KnowledgeFetcher
from .pfc_KnowledgeFetcher import KnowledgeFetcher # 假设 KnowledgeFetcher 在这里
from .waiter import Waiter
import traceback
from rich.traceback import install
install(extra_lines=3)
logger = get_logger("pfc")
logger = get_logger("pfc_conversation")
class Conversation:
"""对话类,负责管理单个对话的状态和行为"""
@ -41,9 +39,36 @@ class Conversation:
self.should_continue = False
self.ignore_until_timestamp: Optional[float] = None
self.generated_reply = ""
self.chat_stream: Optional[ChatStream] = None
# 初始化组件为 None
self.action_planner: Optional[ActionPlanner] = None
self.goal_analyzer: Optional[GoalAnalyzer] = None
self.reply_generator: Optional[ReplyGenerator] = None
self.knowledge_fetcher: Optional[KnowledgeFetcher] = None
self.waiter: Optional[Waiter] = None
self.direct_sender: Optional[DirectMessageSender] = None
self.idle_conversation_starter: Optional[IdleConversationStarter] = None
self.chat_observer: Optional[ChatObserver] = None
self.observation_info: Optional[ObservationInfo] = None
self.conversation_info: Optional[ConversationInfo] = None # 使用 ConversationInfo
self._initializing = False
self._initialized = False
# 在初始化时获取机器人QQ号字符串避免重复转换
self.bot_qq_str = str(global_config.BOT_QQ) if global_config.BOT_QQ else None
if not self.bot_qq_str:
logger.error(f"[私聊][{self.private_name}] 严重错误:未能从配置中获取 BOT_QQ IDPFC 可能无法正常工作。")
async def _initialize(self):
"""初始化实例,注册所有组件 (保持不变)"""
"""异步初始化对话实例及其所有组件"""
if self._initialized or self._initializing:
logger.warning(f"[私聊][{self.private_name}] 尝试重复初始化或正在初始化中。")
return
self._initializing = True
logger.info(f"[私聊][{self.private_name}] 开始初始化对话实例: {self.stream_id}")
try:
self.action_planner = ActionPlanner(self.stream_id, self.private_name)
self.goal_analyzer = GoalAnalyzer(self.stream_id, self.private_name)
@ -52,201 +77,199 @@ class Conversation:
self.waiter = Waiter(self.stream_id, self.private_name)
self.direct_sender = DirectMessageSender(self.private_name)
self.chat_stream = chat_manager.get_stream(self.stream_id)
if not self.chat_stream:
raise ValueError(f"无法获取 stream_id {self.stream_id} 的 ChatStream")
self.idle_conversation_starter = IdleConversationStarter(self.stream_id, self.private_name)
self.stop_action_planner = False
except Exception as e:
logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册运行组件失败: {e}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
raise
try:
self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name)
self.chat_observer.start()
self.observation_info = ObservationInfo(self.private_name)
self.observation_info.bind_to_chat_observer(self.chat_observer)
if not self.observation_info.bot_id:
logger.warning(f"[私聊][{self.private_name}] ObservationInfo 未能自动获取 bot_id尝试手动设置。")
self.observation_info.bot_id = self.bot_qq_str
self.conversation_info = ConversationInfo()
self.observation_info.bind_to_chat_observer(self.chat_observer)
await self._load_initial_history()
self.chat_observer.start()
if self.idle_conversation_starter:
self.idle_conversation_starter.start()
logger.info(f"[私聊][{self.private_name}] 空闲对话检测器已启动")
self._initialized = True
self.should_continue = True
self.state = ConversationState.ANALYZING
logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 初始化完成。")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册信息组件失败: {e}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
logger.error(f"[私聊][{self.private_name}] 初始化对话实例失败: {e}")
logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}")
self.should_continue = False
await self.stop()
raise
finally:
self._initializing = False
async def _load_initial_history(self):
"""加载初始聊天记录"""
if not self.observation_info: return
try:
logger.info(f"[私聊][{self.private_name}]为 {self.stream_id} 加载初始聊天记录...")
logger.info(f"[私聊][{self.private_name}] {self.stream_id} 加载初始聊天记录...")
initial_messages = get_raw_msg_before_timestamp_with_chat(
chat_id=self.stream_id,
timestamp=time.time(),
limit=30,
)
chat_talking_prompt = await build_readable_messages(
initial_messages,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
chat_id=self.stream_id, timestamp=time.time(), limit=30,
)
if initial_messages:
self.observation_info.chat_history = initial_messages
self.observation_info.chat_history_str = chat_talking_prompt + "\n"
self.observation_info.chat_history_count = len(initial_messages)
last_msg = initial_messages[-1]
self.observation_info.last_message_time = last_msg.get("time")
last_user_info = UserInfo.from_dict(last_msg.get("user_info", {}))
self.observation_info.last_message_sender = last_user_info.user_id
self.observation_info.last_message_id = last_msg.get("message_id")
last_user_info_dict = last_msg.get("user_info", {})
if isinstance(last_user_info_dict, dict):
try:
last_user_info = UserInfo.from_dict(last_user_info_dict)
self.observation_info.last_message_sender = str(last_user_info.user_id) if last_user_info else None
except Exception as e:
logger.warning(f"[私聊][{self.private_name}] 解析最后一条消息的用户信息时出错: {e}")
self.observation_info.last_message_sender = None
else: self.observation_info.last_message_sender = None
self.observation_info.last_message_content = last_msg.get("processed_plain_text", "")
logger.info(
f"[私聊][{self.private_name}]成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}"
history_slice_for_str = initial_messages[-20:]
self.observation_info.chat_history_str = await build_readable_messages(
history_slice_for_str, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0
)
# --- 注意: 下面两行保持不变,但其健壮性依赖于 ChatObserver 的实现 ---
self.chat_observer.last_message_time = self.observation_info.last_message_time
self.chat_observer.last_message_read = last_msg
await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time)
if self.chat_observer: self.chat_observer.last_message_time = self.observation_info.last_message_time
if self.idle_conversation_starter and self.observation_info.last_message_time:
await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time)
logger.info(f"[私聊][{self.private_name}] 成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}")
else:
logger.info(f"[私聊][{self.private_name}]没有找到初始聊天记录。")
logger.info(f"[私聊][{self.private_name}] 没有找到初始聊天记录。")
self.observation_info.chat_history_str = "还没有聊天记录。"
except Exception as load_err:
logger.error(f"[私聊][{self.private_name}]加载初始聊天记录时出错: {load_err}")
logger.error(f"[私聊][{self.private_name}] 加载初始聊天记录时出错: {load_err}")
if self.observation_info: self.observation_info.chat_history_str = "[加载聊天记录出错]"
self.should_continue = True
asyncio.create_task(self.start())
# 启动空闲对话检测器
self.idle_conversation_starter.start()
logger.info(f"[私聊][{self.private_name}]空闲对话检测器已启动")
asyncio.create_task(self.start())
async def start(self):
"""开始对话流程 (保持不变)"""
try:
logger.info(f"[私聊][{self.private_name}]对话系统启动中...")
asyncio.create_task(self._plan_and_action_loop())
except Exception as e:
logger.error(f"[私聊][{self.private_name}]启动对话系统失败: {e}")
raise
"""开始对话流程"""
if not self._initialized:
logger.error(f"[私聊][{self.private_name}] 对话实例未初始化,无法启动。")
try:
await self._initialize()
if not self._initialized: return
except Exception: return
if not self.should_continue:
logger.warning(f"[私聊][{self.private_name}] 对话实例已被标记为不应继续,无法启动。")
return
logger.info(f"[私聊][{self.private_name}] 对话系统启动,开始规划循环...")
asyncio.create_task(self._plan_and_action_loop())
async def stop(self):
"""停止对话实例并清理资源"""
logger.info(f"[私聊][{self.private_name}] 正在停止对话实例: {self.stream_id}")
self.should_continue = False
if self.idle_conversation_starter: self.idle_conversation_starter.stop()
if self.observation_info and self.chat_observer: self.observation_info.unbind_from_chat_observer()
self._initialized = False
logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 已停止。")
async def _plan_and_action_loop(self):
"""思考步PFC核心循环模块"""
"""思考步PFC核心循环模块 - 实现精细化中断逻辑"""
if not self._initialized:
logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下运行规划循环。")
return
while self.should_continue:
# 忽略逻辑 (保持不变)
if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp:
# 暂停空闲对话检测器,避免在忽略期间触发
if hasattr(self, 'idle_conversation_starter') and self.idle_conversation_starter._running:
self.idle_conversation_starter.stop()
logger.debug(f"[私聊][{self.private_name}]对话被暂时忽略,暂停空闲对话检测")
await asyncio.sleep(30)
continue
elif self.ignore_until_timestamp and time.time() >= self.ignore_until_timestamp:
logger.info(f"[私聊][{self.private_name}]忽略时间已到 {self.stream_id},准备结束对话。")
self.ignore_until_timestamp = None
self.should_continue = False
current_loop_start_time = time.time()
# --- 忽略逻辑 ---
if self.ignore_until_timestamp and current_loop_start_time < self.ignore_until_timestamp:
if self.idle_conversation_starter and self.idle_conversation_starter._running:
self.idle_conversation_starter.stop(); logger.debug(f"[私聊][{self.private_name}] 对话被暂时忽略,暂停空闲对话检测")
sleep_duration = min(30, self.ignore_until_timestamp - current_loop_start_time)
await asyncio.sleep(sleep_duration)
continue
elif self.ignore_until_timestamp and current_loop_start_time >= self.ignore_until_timestamp:
logger.info(f"[私聊][{self.private_name}] 忽略时间已到 {self.stream_id},准备结束对话。")
self.ignore_until_timestamp = None; await self.stop(); continue
else:
# 确保空闲对话检测器在正常对话时是启动的
if hasattr(self, 'idle_conversation_starter') and not self.idle_conversation_starter._running:
self.idle_conversation_starter.start()
logger.debug(f"[私聊][{self.private_name}]恢复空闲对话检测")
if self.idle_conversation_starter and not self.idle_conversation_starter._running:
self.idle_conversation_starter.start(); logger.debug(f"[私聊][{self.private_name}] 恢复空闲对话检测")
# --- 核心规划与行动逻辑 ---
try:
# --- [修改点 1] 在规划前记录未处理消息的 ID 集合 ---
message_ids_before_planning = set()
initial_unprocessed_message_count = 0
if hasattr(self.observation_info, "unprocessed_messages"):
message_ids_before_planning = {msg.get("message_id") for msg in self.observation_info.unprocessed_messages if msg.get("message_id")}
initial_unprocessed_message_count = len(self.observation_info.unprocessed_messages)
logger.debug(f"[私聊][{self.private_name}]规划开始,当前未处理消息数: {initial_unprocessed_message_count}, IDs: {message_ids_before_planning}")
else:
logger.warning(f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' before planning.")
# --- 调用 Action Planner (保持不变) ---
action, reason = await self.action_planner.plan(
self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action
)
# --- [修改点 2] 规划后检查是否有 *过多* 新消息到达 ---
current_unprocessed_messages = []
current_unprocessed_message_count = 0
if hasattr(self.observation_info, "unprocessed_messages"):
current_unprocessed_messages = self.observation_info.unprocessed_messages
current_unprocessed_message_count = len(current_unprocessed_messages)
else:
logger.warning(f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' after planning.")
# 计算规划期间实际新增的消息数量
new_messages_during_planning_count = 0
if not all([self.action_planner, self.observation_info, self.conversation_info]):
logger.error(f"[私聊][{self.private_name}] 核心组件未初始化,无法继续规划循环。"); await asyncio.sleep(5); continue
# --- 1. 记录规划开始时间 ---
planning_start_time = time.time()
logger.debug(f"[私聊][{self.private_name}] --- 开始新一轮规划 ({planning_start_time:.2f}) ---")
self.conversation_info.other_new_messages_during_planning_count = 0
# --- 2. 调用 Action Planner ---
action, reason = await self.action_planner.plan(self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action)
planning_duration = time.time() - planning_start_time
logger.debug(f"[私聊][{self.private_name}] 规划耗时: {planning_duration:.3f} 秒,初步规划动作: {action}")
# --- 3. 检查规划期间的新消息 ---
current_unprocessed_messages = getattr(self.observation_info, 'unprocessed_messages', [])
new_messages_during_planning: List[Dict[str, Any]] = []
other_new_messages_during_planning: List[Dict[str, Any]] = []
for msg in current_unprocessed_messages:
msg_id = msg.get("message_id")
if msg_id and msg_id not in message_ids_before_planning:
new_messages_during_planning_count += 1
logger.debug(f"[私聊][{self.private_name}]规划结束,当前未处理消息数: {current_unprocessed_message_count}, 规划期间新增: {new_messages_during_planning_count}")
# **核心逻辑:判断是否中断** (保持不变)
if new_messages_during_planning_count > 2:
logger.info(
f"[私聊][{self.private_name}]规划期间新增消息数 ({new_messages_during_planning_count}) 超过阈值(2),取消本次行动 '{action}',重新规划"
)
self.conversation_info.last_successful_reply_action = None
await asyncio.sleep(0.1)
continue # 跳过本轮后续处理,直接进入下一轮循环重新规划
# --- 执行动作 (移除 message_ids_before_planning 参数传递) ---
msg_time = msg.get('time')
sender_id = msg.get("user_info", {}).get("user_id")
if msg_time and msg_time >= planning_start_time:
new_messages_during_planning.append(msg)
if sender_id != self.bot_qq_str: other_new_messages_during_planning.append(msg)
new_msg_count = len(new_messages_during_planning); other_new_msg_count = len(other_new_messages_during_planning)
logger.debug(f"[私聊][{self.private_name}] 规划期间收到新消息总数: {new_msg_count}, 来自他人: {other_new_msg_count}")
# --- 4. 执行中断检查 ---
should_interrupt = False; interrupt_reason = ""
if action in ["wait", "listening"]:
if new_msg_count > 0: should_interrupt = True; interrupt_reason = f"规划 {action} 期间收到 {new_msg_count} 条新消息"; logger.info(f"[私聊][{self.private_name}] 中断 '{action}',原因: {interrupt_reason}")
else:
interrupt_threshold = 2
if other_new_msg_count > interrupt_threshold: should_interrupt = True; interrupt_reason = f"规划 {action} 期间收到 {other_new_msg_count} 条来自他人的新消息 (阈值 >{interrupt_threshold})"; logger.info(f"[私聊][{self.private_name}] 中断 '{action}',原因: {interrupt_reason}")
if should_interrupt:
logger.info(f"[私聊][{self.private_name}] 执行中断,重新规划...")
cancel_record = {"action": action, "plan_reason": reason, "status": "cancelled_due_to_new_messages", "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "final_reason": interrupt_reason}
if not hasattr(self.conversation_info, "done_action"): self.conversation_info.done_action = []
self.conversation_info.done_action.append(cancel_record)
self.conversation_info.last_successful_reply_action = None; self.state = ConversationState.ANALYZING; await asyncio.sleep(0.1); continue
# --- 5. 如果未中断,存储状态并执行动作 ---
logger.debug(f"[私聊][{self.private_name}] 未中断,继续执行动作 '{action}'")
self.conversation_info.other_new_messages_during_planning_count = other_new_msg_count
await self._handle_action(action, reason, self.observation_info, self.conversation_info)
# --- 检查是否需要结束对话 (逻辑保持不变) ---
# --- 6. 检查是否需要结束对话 ---
goal_ended = False
if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list:
for goal_item in self.conversation_info.goal_list:
current_goal = None
if isinstance(goal_item, dict):
current_goal = goal_item.get("goal")
elif isinstance(goal_item, str):
current_goal = goal_item
if isinstance(current_goal, str) and current_goal == "结束对话":
goal_ended = True
break
if goal_ended:
self.should_continue = False
logger.info(f"[私聊][{self.private_name}]检测到'结束对话'目标,停止循环。")
except Exception as loop_err:
logger.error(f"[私聊][{self.private_name}]PFC主循环出错: {loop_err}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
await asyncio.sleep(1)
if self.should_continue:
await asyncio.sleep(0.1)
logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}")
last_goal_item = self.conversation_info.goal_list[-1]; current_goal = None
if isinstance(last_goal_item, dict): current_goal = last_goal_item.get("goal")
elif isinstance(last_goal_item, str): current_goal = last_goal_item
if isinstance(current_goal, str) and current_goal == "结束对话": goal_ended = True
last_action_record = self.conversation_info.done_action[-1] if self.conversation_info.done_action else {}
action_ended = last_action_record.get("action") in ["end_conversation", "say_goodbye"] and last_action_record.get("status") == "done"
if goal_ended or action_ended:
logger.info(f"[私聊][{self.private_name}] 检测到结束条件 (目标结束: {goal_ended}, 动作结束: {action_ended}),停止循环。"); await self.stop(); continue
except asyncio.CancelledError: logger.info(f"[私聊][{self.private_name}] PFC 主循环被取消。"); await self.stop(); break
except Exception as loop_err: logger.error(f"[私聊][{self.private_name}] PFC 主循环出错: {loop_err}\n{traceback.format_exc()}"); self.state = ConversationState.ERROR; await asyncio.sleep(5)
# 控制循环频率
loop_duration = time.time() - current_loop_start_time; min_loop_interval = 0.1
if loop_duration < min_loop_interval: await asyncio.sleep(min_loop_interval - loop_duration)
logger.info(f"[私聊][{self.private_name}] PFC 循环结束 for stream_id: {self.stream_id}")
# --- 移除 _check_interrupt_before_sending 方法 ---
# def _check_interrupt_before_sending(self, message_ids_before_planning: set) -> bool:
# ... (旧代码移除)
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Optional[Message]:
"""将消息字典转换为Message对象"""
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
"""将消息字典转换为Message对象 (保持不变)"""
try:
chat_info = msg_dict.get("chat_info")
if chat_info and isinstance(chat_info, dict):
chat_stream = ChatStream.from_dict(chat_info)
elif self.chat_stream:
chat_stream = self.chat_stream
else:
chat_stream = chat_manager.get_stream(self.stream_id)
if not chat_stream:
raise ValueError(f"无法确定 ChatStream for stream_id {self.stream_id}")
chat_stream_to_use = self.chat_stream or chat_manager.get_stream(self.stream_id)
if not chat_stream_to_use: logger.error(f"[私聊][{self.private_name}] 无法确定 ChatStream for stream_id {self.stream_id},无法转换消息。"); return None
user_info_dict = msg_dict.get("user_info", {}); user_info: Optional[UserInfo] = None
if isinstance(user_info_dict, dict):
try: user_info = UserInfo.from_dict(user_info_dict)
except Exception as e: logger.warning(f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}")
if not user_info: logger.warning(f"[私聊][{self.private_name}] 消息缺少有效的 UserInfo无法转换。 msg_id: {msg_dict.get('message_id')}"); return None
return Message(message_id=msg_dict.get("message_id", f"gen_{time.time()}"), chat_stream=chat_stream_to_use, time=msg_dict.get("time", time.time()), user_info=user_info, processed_plain_text=msg_dict.get("processed_plain_text", ""), detailed_plain_text=msg_dict.get("detailed_plain_text", ""))
except Exception as e: logger.error(f"[私聊][{self.private_name}] 转换消息时出错: {e}\n{traceback.format_exc()}"); return None
user_info = UserInfo.from_dict(msg_dict.get("user_info", {}))
return Message(
message_id=msg_dict.get("message_id", f"gen_{time.time()}"),
chat_stream=chat_stream,
time=msg_dict.get("time", time.time()),
user_info=user_info,
processed_plain_text=msg_dict.get("processed_plain_text", ""),
detailed_plain_text=msg_dict.get("detailed_plain_text", ""),
)
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]转换消息时出错: {e}")
raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e
# --- [修改点 3] 修改 _handle_action 签名并调整内部逻辑 (移除 message_ids_before_planning 参数) ---
async def _handle_action(
self,
action: str,
@ -254,373 +277,196 @@ class Conversation:
observation_info: ObservationInfo,
conversation_info: ConversationInfo
):
"""处理规划的行动"""
logger.debug(f"[私聊][{self.private_name}]执行行动: {action}, 原因: {reason}")
"""处理规划的行动 - 实现精细化后续状态设置"""
if not self._initialized:
logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下处理动作 '{action}'")
return
# 记录action历史 (逻辑不变)
logger.info(f"[私聊][{self.private_name}] 开始处理动作: {action}, 原因: {reason}")
action_start_time = time.time()
# 记录action历史
current_action_record = {
"action": action,
"plan_reason": reason,
"status": "start",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
"action": action, "plan_reason": reason, "status": "start",
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "final_reason": None,
}
if not hasattr(conversation_info, "done_action"):
conversation_info.done_action = []
if not hasattr(conversation_info, "done_action"): conversation_info.done_action = []
conversation_info.done_action.append(current_action_record)
action_index = len(conversation_info.done_action) - 1
action_successful = False
reply_sent = False
action_successful = False # 初始化动作成功状态
final_status = "recall" # 默认失败状态
final_reason = "动作未成功执行" # 默认失败原因
# --- 根据不同的 action 执行 ---
if action == "direct_reply" or action == "send_new_message":
# 合并 reply 和 follow-up 的生成/检查逻辑 (保持不变)
max_reply_attempts = 3
reply_attempt_count = 0
is_suitable = False
need_replan = False
check_reason = "未进行尝试"
final_reply_to_send = ""
while reply_attempt_count < max_reply_attempts and not is_suitable:
reply_attempt_count += 1
log_prefix = f"[私聊][{self.private_name}]尝试生成 '{action}' 回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..."
logger.info(log_prefix)
try:
# --- 根据不同的 action 执行 ---
if action in ["direct_reply", "send_new_message", "say_goodbye"]:
# --- 生成回复逻辑 ---
self.state = ConversationState.GENERATING
if not self.reply_generator: raise RuntimeError("ReplyGenerator 未初始化")
generated_content = await self.reply_generator.generate(observation_info, conversation_info, action_type=action)
logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'")
self.generated_reply = await self.reply_generator.generate(
observation_info, conversation_info, action_type=action
)
logger.info(f"{log_prefix} 生成内容: {self.generated_reply}")
self.state = ConversationState.CHECKING
try:
current_goal_str = ""
if hasattr(conversation_info, 'goal_list') and conversation_info.goal_list:
goal_item = conversation_info.goal_list[0]
if isinstance(goal_item, dict):
current_goal_str = goal_item.get('goal', '')
elif isinstance(goal_item, str):
current_goal_str = goal_item
chat_history_for_check = getattr(observation_info, 'chat_history', [])
chat_history_str_for_check = getattr(observation_info, 'chat_history_str', '')
is_suitable, check_reason, need_replan = await self.reply_generator.check_reply(
reply=self.generated_reply,
goal=current_goal_str,
chat_history=chat_history_for_check,
chat_history_str=chat_history_str_for_check,
retry_count=reply_attempt_count - 1,
)
logger.info(
f"{log_prefix} 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
)
if not is_suitable or need_replan:
conversation_info.last_reply_rejection_reason = check_reason
conversation_info.last_rejected_reply_content = self.generated_reply
else:
conversation_info.last_reply_rejection_reason = None
conversation_info.last_rejected_reply_content = None
if is_suitable:
final_reply_to_send = self.generated_reply
break
elif need_replan:
logger.warning(
f"{log_prefix} 检查建议重新规划,停止尝试。原因: {check_reason}"
)
break
except Exception as check_err:
logger.error(
f"{log_prefix} 调用 ReplyChecker 时出错: {check_err}"
)
check_reason = f"{reply_attempt_count} 次检查过程出错: {check_err}"
conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}"
conversation_info.last_rejected_reply_content = self.generated_reply
break
# --- 处理生成和检查的结果 ---
if is_suitable:
# --- [修改点 4] 记录发送前时间戳 ---
timestamp_before_sending = time.time()
logger.debug(f"[私聊][{self.private_name}]准备发送回复,记录发送前时间戳: {timestamp_before_sending}")
# 确认发送
self.generated_reply = final_reply_to_send
send_success = await self._send_reply() # 调用发送函数
await self.idle_conversation_starter.update_last_message_time()
if send_success:
action_successful = True
reply_sent = True
logger.info(f"[私聊][{self.private_name}]成功发送 '{action}' 回复.")
conversation_info.last_reply_rejection_reason = None
conversation_info.last_rejected_reply_content = None
# --- [修改点 5] 基于时间戳处理消息和决定下一轮 prompt 类型 ---
current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', [])
message_ids_to_clear: Set[str] = set() # 使用 Set 类型
new_messages_during_sending_count = 0
for msg in current_unprocessed_messages:
msg_time = msg.get('time')
msg_id = msg.get('message_id')
if msg_id and msg_time: # 确保时间和 ID 存在
if msg_time < timestamp_before_sending:
message_ids_to_clear.add(msg_id)
else:
# 时间戳大于等于发送前时间戳,视为新消息
new_messages_during_sending_count += 1
logger.debug(f"[私聊][{self.private_name}]回复发送后,检测到 {len(message_ids_to_clear)} 条发送前消息待清理,{new_messages_during_sending_count} 条发送期间/之后的新消息。")
# 清理发送前到达的消息
if message_ids_to_clear:
await observation_info.clear_processed_messages(message_ids_to_clear)
else:
logger.debug(f"[私聊][{self.private_name}]没有需要清理的发送前消息。")
# 根据发送期间是否有新消息,决定下次规划用哪个 prompt
if new_messages_during_sending_count > 0:
logger.info(f"[私聊][{self.private_name}]检测到 {new_messages_during_sending_count} 条在发送期间/之后到达的新消息,下一轮将使用首次回复逻辑处理。")
self.conversation_info.last_successful_reply_action = None # 强制下一轮用 PROMPT_INITIAL_REPLY
else:
logger.info(f"[私聊][{self.private_name}]发送期间/之后无新消息,下一轮将根据 '{action}' 使用追问逻辑。")
self.conversation_info.last_successful_reply_action = action # 保持状态,下一轮可能用 PROMPT_FOLLOW_UP
else: # 发送失败
logger.error(f"[私聊][{self.private_name}]发送 '{action}' 回复失败。")
action_successful = False
self.conversation_info.last_successful_reply_action = None
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": "发送回复时失败"}
)
elif need_replan:
# 检查后打回动作决策 (保持不变)
logger.warning(
f"[私聊][{self.private_name}]'{action}' 回复检查后决定打回动作决策 (尝试 {reply_attempt_count} 次)。打回原因: {check_reason}"
)
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后打回: {check_reason}"}
)
self.conversation_info.last_successful_reply_action = None
else: # 多次尝试后仍然不合适 (保持不变)
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的 '{action}' 回复。最终原因: {check_reason}"
)
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后失败: {check_reason}"}
)
self.conversation_info.last_successful_reply_action = None
if action == "send_new_message":
logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...")
self.state = ConversationState.WAITING
await self.waiter.wait(self.conversation_info)
wait_action_record = {
"action": "wait",
"plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待",
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
}
conversation_info.done_action.append(wait_action_record)
action_successful = True
self.conversation_info.last_successful_reply_action = None
# --- 处理其他动作 (保持不变,确保状态重置) ---
elif action == "rethink_goal":
self.state = ConversationState.RETHINKING
try:
if not hasattr(self, "goal_analyzer"):
raise AttributeError("GoalAnalyzer not initialized")
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
action_successful = True
except Exception as rethink_err:
logger.error(f"[私聊][{self.private_name}]重新思考目标时出错: {rethink_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"}
)
self.conversation_info.last_successful_reply_action = None
conversation_info.last_reply_rejection_reason = None
conversation_info.last_rejected_reply_content = None
elif action == "listening":
self.state = ConversationState.LISTENING
logger.info(f"[私聊][{self.private_name}]倾听对方发言...")
try:
if not hasattr(self, "waiter"):
raise AttributeError("Waiter not initialized")
await self.waiter.wait_listening(conversation_info)
action_successful = True
except Exception as listen_err:
logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"倾听失败: {listen_err}"}
)
self.conversation_info.last_successful_reply_action = None
conversation_info.last_reply_rejection_reason = None
conversation_info.last_rejected_reply_content = None
elif action == "say_goodbye":
self.state = ConversationState.GENERATING
logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...")
try:
self.generated_reply = await self.reply_generator.generate(
observation_info, conversation_info, action_type="say_goodbye"
)
logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}")
if self.generated_reply:
# --- [修改点 6] 告别语发送前记录时间戳 ---
timestamp_before_sending_goodbye = time.time()
send_success = await self._send_reply()
if send_success:
action_successful = True
reply_sent = True
logger.info(f"[私聊][{self.private_name}]告别语已发送。")
# --- [修改点 7] 告别语发送后也处理未读消息 ---
# (虽然通常之后就结束了,但以防万一)
current_unprocessed_messages_goodbye = getattr(observation_info, 'unprocessed_messages', [])
message_ids_to_clear_goodbye: Set[str] = set()
for msg in current_unprocessed_messages_goodbye:
msg_time = msg.get('time')
msg_id = msg.get('message_id')
if msg_id and msg_time and msg_time < timestamp_before_sending_goodbye:
message_ids_to_clear_goodbye.add(msg_id)
if message_ids_to_clear_goodbye:
await observation_info.clear_processed_messages(message_ids_to_clear_goodbye)
self.should_continue = False # 正常结束
logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。")
else:
logger.warning(f"[私聊][{self.private_name}]发送告别语失败。")
action_successful = False
self.should_continue = True # 发送失败不能结束
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": "发送告别语失败"}
)
self.conversation_info.last_successful_reply_action = None
if not generated_content or generated_content.startswith("抱歉"):
logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,取消发送。")
final_reason = "生成内容无效"
if action == "say_goodbye": final_status = "done"; self.should_continue = False; logger.info(f"[私聊][{self.private_name}] 告别语生成失败,仍按计划结束对话。")
else: final_status = "recall"; conversation_info.last_successful_reply_action = None
else:
logger.warning(f"[私聊][{self.private_name}]未能生成告别语内容,无法发送。")
action_successful = False
self.should_continue = True
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": "未能生成告别语内容"}
)
self.conversation_info.last_successful_reply_action = None
# --- 发送回复逻辑 ---
self.generated_reply = generated_content
timestamp_before_sending = time.time()
logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}")
self.state = ConversationState.SENDING
send_success = await self._send_reply()
send_end_time = time.time()
except Exception as goodbye_err:
logger.error(f"[私聊][{self.private_name}]生成或发送告别语时出错: {goodbye_err}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
action_successful = False
self.should_continue = True
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"}
)
self.conversation_info.last_successful_reply_action = None
if send_success:
action_successful = True # <--- 标记动作成功
# final_status 和 final_reason 在 finally 块中根据 action_successful 设置
logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 成功发送回复.")
if self.idle_conversation_starter: await self.idle_conversation_starter.update_last_message_time(send_end_time)
elif action == "end_conversation":
self.should_continue = False
logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...")
action_successful = True
self.conversation_info.last_successful_reply_action = None
conversation_info.last_reply_rejection_reason = None
conversation_info.last_rejected_reply_content = None
# --- 清理已处理消息 ---
current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', [])
message_ids_to_clear: Set[str] = set()
for msg in current_unprocessed_messages:
msg_time = msg.get('time'); msg_id = msg.get('message_id'); sender_id = msg.get("user_info", {}).get("user_id")
if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: message_ids_to_clear.add(msg_id)
if message_ids_to_clear: logger.debug(f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}"); await observation_info.clear_processed_messages(message_ids_to_clear)
else: logger.debug(f"[私聊][{self.private_name}] 没有需要清理的发送前(他人)消息。")
# --- 决定下一轮规划类型 ---
other_new_msg_count_during_planning = getattr(conversation_info, 'other_new_messages_during_planning_count', 0)
if other_new_msg_count_during_planning > 0:
logger.info(f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。")
conversation_info.last_successful_reply_action = None
else:
logger.info(f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。")
conversation_info.last_successful_reply_action = action
elif action == "block_and_ignore":
logger.info(f"[私聊][{self.private_name}]不想再理你了...")
ignore_duration_seconds = 10 * 60
self.ignore_until_timestamp = time.time() + ignore_duration_seconds
logger.info(
f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}"
)
self.state = ConversationState.IGNORED
action_successful = True
self.conversation_info.last_successful_reply_action = None
conversation_info.last_reply_rejection_reason = None
conversation_info.last_rejected_reply_content = None
# 清除上次拒绝信息
conversation_info.last_reply_rejection_reason = None; conversation_info.last_rejected_reply_content = None
if action == "say_goodbye": self.should_continue = False; logger.info(f"[私聊][{self.private_name}] 成功发送告别语,即将停止对话实例。")
else:
# 发送失败
logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送回复失败。")
final_status = "recall"; final_reason = "发送回复时失败" # 发送失败直接设置状态
conversation_info.last_successful_reply_action = None
if action == "say_goodbye": self.should_continue = True
# --- 其他动作处理 ---
elif action == "rethink_goal":
self.state = ConversationState.RETHINKING
if not self.goal_analyzer: raise RuntimeError("GoalAnalyzer 未初始化")
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
action_successful = True # <--- 标记动作成功
elif action == "listening":
self.state = ConversationState.LISTENING
if not self.waiter: raise RuntimeError("Waiter 未初始化")
logger.info(f"[私聊][{self.private_name}] 动作 'listening': 进入倾听状态...")
await self.waiter.wait_listening(conversation_info)
action_successful = True # <--- 标记动作成功
elif action == "end_conversation":
logger.info(f"[私聊][{self.private_name}] 动作 'end_conversation': 收到最终结束指令,停止对话...")
action_successful = True # <--- 标记动作成功
self.should_continue = False
elif action == "block_and_ignore":
logger.info(f"[私聊][{self.private_name}] 动作 'block_and_ignore': 不想再理你了...")
ignore_duration_seconds = 10 * 60
self.ignore_until_timestamp = time.time() + ignore_duration_seconds
logger.info(f"[私聊][{self.private_name}] 将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}")
self.state = ConversationState.IGNORED
action_successful = True # <--- 标记动作成功
elif action == "wait":
self.state = ConversationState.WAITING
if not self.waiter: raise RuntimeError("Waiter 未初始化")
logger.info(f"[私聊][{self.private_name}] 动作 'wait': 进入等待状态...")
timeout_occurred = await self.waiter.wait(self.conversation_info)
action_successful = True # <--- 标记动作成功
# wait 的 reason 在 finally 中设置
logger.debug(f"[私聊][{self.private_name}] Wait 动作完成,无需在此清理消息。")
else:
logger.warning(f"[私聊][{self.private_name}] 未知的动作类型: {action}")
final_status = "recall"; final_reason = f"未知的动作类型: {action}" # 未知动作直接失败
else: # 对应 'wait' 动作
self.state = ConversationState.WAITING
logger.info(f"[私聊][{self.private_name}]等待更多信息...")
try:
if not hasattr(self, "waiter"):
raise AttributeError("Waiter not initialized")
_timeout_occurred = await self.waiter.wait(self.conversation_info)
action_successful = True
except Exception as wait_err:
logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"等待失败: {wait_err}"}
)
self.conversation_info.last_successful_reply_action = None
conversation_info.last_reply_rejection_reason = None
conversation_info.last_rejected_reply_content = None
# --- 重置非回复动作的追问状态 ---
if action not in ["direct_reply", "send_new_message", "say_goodbye"]:
conversation_info.last_successful_reply_action = None
conversation_info.last_reply_rejection_reason = None
conversation_info.last_rejected_reply_content = None
except asyncio.CancelledError:
logger.warning(f"[私聊][{self.private_name}] 处理动作 '{action}' 时被取消。")
final_status = "cancelled"; final_reason = "动作处理被取消"
conversation_info.last_successful_reply_action = None
raise
except Exception as handle_err:
logger.error(f"[私聊][{self.private_name}] 处理动作 '{action}' 时出错: {handle_err}")
logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}")
final_status = "error"; final_reason = f"处理动作时出错: {handle_err}"
self.state = ConversationState.ERROR
conversation_info.last_successful_reply_action = None
finally:
# --- 重置临时计数值 ---
conversation_info.other_new_messages_during_planning_count = 0
# --- 更新 Action History 状态 (优化) ---
# 如果状态仍然是默认的 recall但 action_successful 为 True则更新为 done
if final_status == "recall" and action_successful:
final_status = "done"
# 设置成功的 reason (可以根据动作类型细化)
if action == "wait":
# 检查是否是因为超时结束的(需要 waiter 返回值,或者检查 goal_list
timeout_occurred = any("分钟," in g.get("goal","") for g in conversation_info.goal_list if isinstance(g, dict)) if conversation_info.goal_list else False
final_reason = "等待完成" + (" (超时)" if timeout_occurred else " (收到新消息或中断)")
elif action == "listening":
final_reason = "进入倾听状态"
elif action in ["rethink_goal", "end_conversation", "block_and_ignore"]:
final_reason = f"成功执行 {action}" # 通用成功原因
else: # 默认为发送成功
final_reason = "成功发送"
# 更新历史记录
if conversation_info.done_action and action_index < len(conversation_info.done_action):
conversation_info.done_action[action_index].update(
{
"status": final_status,
"time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"final_reason": final_reason,
"duration_ms": int((time.time() - action_start_time) * 1000)
}
)
logger.debug(f"[私聊][{self.private_name}] 动作 '{action}' 最终状态: {final_status}, 原因: {final_reason}")
else:
logger.error(f"[私聊][{self.private_name}] 无法更新动作历史记录,索引 {action_index} 无效或列表为空。")
# --- 更新 Action History 状态 (保持不变) ---
if action_successful:
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'done'")
else:
logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'recall' 或失败")
async def _send_reply(self) -> bool:
"""发送回复,并返回是否发送成功 (保持不变)"""
if not self.generated_reply:
logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。")
return False
"""发送生成的回复"""
if not self.generated_reply: logger.warning(f"[私聊][{self.private_name}] 没有生成回复内容,无法发送。"); return False
if not self.direct_sender: logger.error(f"[私聊][{self.private_name}] DirectMessageSender 未初始化,无法发送。"); return False
if not self.chat_stream: logger.error(f"[私聊][{self.private_name}] ChatStream 未初始化,无法发送。"); return False
try:
reply_content = self.generated_reply
if not hasattr(self, "direct_sender") or not self.direct_sender:
logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。")
return False
if not self.chat_stream:
logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。")
return False
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content)
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content, reply_to_message=None)
self.state = ConversationState.ANALYZING
return True
except Exception as e:
logger.error(f"[私聊][{self.private_name}]发送消息时失败: {str(e)}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
self.state = ConversationState.ANALYZING
logger.error(f"[私聊][{self.private_name}] 发送消息时失败: {str(e)}\n{traceback.format_exc()}")
self.state = ConversationState.ERROR
return False
# _send_timeout_message 方法可以保持不变
async def _send_timeout_message(self):
"""发送超时结束消息 (保持不变)"""
try:
messages = self.chat_observer.get_cached_messages(limit=1)
if not messages:
return
latest_message = self._convert_to_message(messages[0])
await self.direct_sender.send_message(
chat_stream=self.chat_stream, content="TODO:超时消息", reply_to_message=latest_message
)
# 停止空闲对话检测器
if hasattr(self, 'idle_conversation_starter'):
self.idle_conversation_starter.stop()
"""发送超时结束消息"""
if hasattr(self, 'chat_observer'):
self.chat_observer.stop()
except Exception as e:
logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}")
if not self.direct_sender or not self.chat_stream: logger.warning(f"[私聊][{self.private_name}] 发送器或聊天流未初始化,无法发送超时消息。"); return
try:
timeout_content = "我们好像很久没说话了,先这样吧~"
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=timeout_content, reply_to_message=None)
logger.info(f"[私聊][{self.private_name}] 已发送超时结束消息。")
await self.stop()
except Exception as e: logger.error(f"[私聊][{self.private_name}] 发送超时消息失败: {str(e)}")

View File

@ -1,13 +1,14 @@
# -*- coding: utf-8 -*-
# File: observation_info.py
from typing import List, Optional, Dict, Any, Set
from maim_message import UserInfo
import time
import traceback
from typing import List, Optional, Dict, Any, Set
from maim_message import UserInfo
from src.common.logger import get_module_logger
from src.plugins.utils.chat_message_builder import build_readable_messages
# 确保导入路径正确
from .chat_observer import ChatObserver
from .chat_states import NotificationHandler, NotificationType, Notification
from src.plugins.utils.chat_message_builder import build_readable_messages
import traceback # 导入 traceback 用于调试
logger = get_module_logger("observation_info")
@ -16,324 +17,282 @@ class ObservationInfoHandler(NotificationHandler):
"""ObservationInfo的通知处理器"""
def __init__(self, observation_info: "ObservationInfo", private_name: str):
"""初始化处理器
Args:
observation_info: 要更新的ObservationInfo实例
private_name: 私聊对象的名称用于日志记录
"""
"""初始化处理器"""
self.observation_info = observation_info
# 将 private_name 存储在 handler 实例中
self.private_name = private_name
async def handle_notification(self, notification: Notification): # 添加类型提示
# 获取通知类型和数据
async def handle_notification(self, notification: Notification):
"""处理来自 ChatObserver 的通知"""
notification_type = notification.type
data = notification.data
timestamp = notification.timestamp # 获取通知时间戳
try: # 添加错误处理块
try:
if notification_type == NotificationType.NEW_MESSAGE:
# 处理新消息通知
# logger.debug(f"[私聊][{self.private_name}]收到新消息通知data: {data}") # 可以在需要时取消注释
message_id = data.get("message_id")
processed_plain_text = data.get("processed_plain_text")
detailed_plain_text = data.get("detailed_plain_text")
user_info_dict = data.get("user_info") # 先获取字典
time_value = data.get("time")
message_dict = data # data 本身就是消息字典
if not isinstance(message_dict, dict):
logger.warning(f"[私聊][{self.private_name}] 收到的 NEW_MESSAGE 数据不是字典: {data}")
return
# 确保 user_info 是字典类型再创建 UserInfo 对象
user_info = None
# 解析 UserInfo
user_info_dict = message_dict.get("user_info")
user_info: Optional[UserInfo] = None
if isinstance(user_info_dict, dict):
try:
user_info = UserInfo.from_dict(user_info_dict)
except Exception as e:
logger.error(
f"[私聊][{self.private_name}]从字典创建 UserInfo 时出错: {e}, 字典内容: {user_info_dict}"
)
# 可以选择在这里返回或记录错误,避免后续代码出错
return
logger.error(f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}")
elif user_info_dict is not None:
logger.warning(
f"[私聊][{self.private_name}]收到的 user_info 不是预期的字典类型: {type(user_info_dict)}"
)
# 根据需要处理非字典情况,这里暂时返回
return
logger.warning(f"[私聊][{self.private_name}] 收到的 user_info 不是预期的字典类型: {type(user_info_dict)}")
message = {
"message_id": message_id,
"processed_plain_text": processed_plain_text,
"detailed_plain_text": detailed_plain_text,
"user_info": user_info_dict, # 存储原始字典或 UserInfo 对象,取决于你的 update_from_message 如何处理
"time": time_value,
}
# 传递 UserInfo 对象(如果成功创建)或原始字典
await self.observation_info.update_from_message(message, user_info) # 修改:传递 user_info 对象
# 更新 ObservationInfo
await self.observation_info.update_from_message(message_dict, user_info)
elif notification_type == NotificationType.COLD_CHAT:
# 处理冷场通知
is_cold = data.get("is_cold", False)
await self.observation_info.update_cold_chat_status(is_cold, time.time()) # 修改:改为 await 调用
elif notification_type == NotificationType.ACTIVE_CHAT:
# 处理活跃通知 (通常由 COLD_CHAT 的反向状态处理)
is_active = data.get("is_active", False)
self.observation_info.is_cold = not is_active
elif notification_type == NotificationType.BOT_SPEAKING:
# 处理机器人说话通知 (按需实现)
self.observation_info.is_typing = False
self.observation_info.last_bot_speak_time = time.time()
elif notification_type == NotificationType.USER_SPEAKING:
# 处理用户说话通知
self.observation_info.is_typing = False
self.observation_info.last_user_speak_time = time.time()
await self.observation_info.update_cold_chat_status(is_cold, timestamp) # 使用通知时间戳
elif notification_type == NotificationType.MESSAGE_DELETED:
# 处理消息删除通知
message_id = data.get("message_id")
# 从 unprocessed_messages 中移除被删除的消息
original_count = len(self.observation_info.unprocessed_messages)
self.observation_info.unprocessed_messages = [
msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id
]
# --- [修改点 11] 更新 new_messages_count ---
self.observation_info.new_messages_count = len(self.observation_info.unprocessed_messages)
if self.observation_info.new_messages_count < original_count:
logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id}), 当前未处理数: {self.observation_info.new_messages_count}")
message_id_to_delete = data.get("message_id")
if message_id_to_delete:
await self.observation_info.remove_unprocessed_message(message_id_to_delete)
else:
logger.warning(f"[私聊][{self.private_name}] 收到无效的消息删除通知,缺少 message_id: {data}")
# --- 可以根据需要处理其他通知类型 ---
elif notification_type == NotificationType.ACTIVE_CHAT:
is_active = data.get("is_active", False)
# 通常由 COLD_CHAT 的反向状态处理,但也可以在这里显式处理
await self.observation_info.update_cold_chat_status(not is_active, timestamp)
elif notification_type == NotificationType.BOT_SPEAKING:
# 机器人开始说话 (例如,如果需要显示"正在输入...")
# self.observation_info.is_typing = True
pass # 暂时不处理
elif notification_type == NotificationType.USER_SPEAKING:
# 用户开始说话
# self.observation_info.is_typing = True
pass # 暂时不处理
elif notification_type == NotificationType.USER_JOINED:
# 处理用户加入通知 (如果适用私聊场景)
user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.add(str(user_id)) # 确保是字符串
self.observation_info.active_users.add(str(user_id))
self.observation_info.update_changed()
elif notification_type == NotificationType.USER_LEFT:
# 处理用户离开通知 (如果适用私聊场景)
user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.discard(str(user_id)) # 确保是字符串
self.observation_info.active_users.discard(str(user_id))
self.observation_info.update_changed()
elif notification_type == NotificationType.ERROR:
# 处理错误通知
error_msg = data.get("error", "未提供错误信息")
logger.error(f"[私聊][{self.private_name}]收到错误通知: {error_msg}")
logger.error(f"[私聊][{self.private_name}] 收到错误通知: {error_msg}")
# 可以在这里触发一些错误处理逻辑
except Exception as e:
logger.error(f"[私聊][{self.private_name}]处理通知时发生错误: {e}")
logger.error(traceback.format_exc()) # 打印详细堆栈信息
logger.error(f"[私聊][{self.private_name}] 处理通知时发生错误 (类型: {notification_type.name}): {e}")
logger.error(traceback.format_exc())
# @dataclass <-- 这个,不需要了(递黄瓜)
class ObservationInfo:
"""决策信息类用于收集和管理来自chat_observer的通知信息 (手动实现 __init__)"""
# 类型提示保留,可用于文档和静态分析
private_name: str
chat_history: List[Dict[str, Any]]
chat_history_str: str
unprocessed_messages: List[Dict[str, Any]]
active_users: Set[str]
last_bot_speak_time: Optional[float]
last_user_speak_time: Optional[float]
last_message_time: Optional[float]
last_message_id: Optional[str]
last_message_content: str
last_message_sender: Optional[str]
bot_id: Optional[str]
chat_history_count: int
new_messages_count: int
cold_chat_start_time: Optional[float]
cold_chat_duration: float
is_typing: bool
is_cold_chat: bool
changed: bool
chat_observer: Optional[ChatObserver]
handler: Optional[ObservationInfoHandler]
"""决策信息类用于收集和管理来自chat_observer的通知信息"""
def __init__(self, private_name: str):
"""
手动初始化 ObservationInfo 的所有实例变量
"""
# 接收的参数
"""初始化 ObservationInfo"""
self.private_name: str = private_name
# data_list
self.chat_history: List[Dict[str, Any]] = []
self.chat_history_str: str = ""
self.unprocessed_messages: List[Dict[str, Any]] = []
self.active_users: Set[str] = set()
# 聊天记录相关
self.chat_history: List[Dict[str, Any]] = [] # 存储已处理的消息历史
self.chat_history_str: str = "还没有聊天记录。" # 用于生成 Prompt 的历史记录字符串
self.chat_history_count: int = 0
# data
# 未处理消息相关 (核心修改点)
self.unprocessed_messages: List[Dict[str, Any]] = [] # 存储尚未被机器人回复的消息
self.new_messages_count: int = 0 # unprocessed_messages 的数量
# 状态信息
self.active_users: Set[str] = set() # 当前活跃用户 (私聊场景可能只有对方)
self.last_bot_speak_time: Optional[float] = None
self.last_user_speak_time: Optional[float] = None
self.last_message_time: Optional[float] = None
self.last_user_speak_time: Optional[float] = None # 指对方用户的发言时间
self.last_message_time: Optional[float] = None # 指所有消息(包括自己)的最新时间
self.last_message_id: Optional[str] = None
self.last_message_content: str = ""
self.last_message_sender: Optional[str] = None
self.bot_id: Optional[str] = None # 需要在某个地方设置 bot_id例如从 global_config 获取
self.chat_history_count: int = 0
self.new_messages_count: int = 0
self.last_message_sender: Optional[str] = None # user_id of the last message sender
self.bot_id: Optional[str] = None # 机器人自己的 ID
# 冷场状态
self.cold_chat_start_time: Optional[float] = None
self.cold_chat_duration: float = 0.0
self.is_cold_chat: bool = False # 当前是否处于冷场状态
# state
self.is_typing: bool = False
self.is_cold_chat: bool = False
self.changed: bool = False
# 其他状态
self.is_typing: bool = False # 是否正在输入 (未来可能用到)
self.changed: bool = False # 状态是否有变化 (用于优化)
# 关联对象
self.chat_observer: Optional[ChatObserver] = None
self.handler: Optional[ObservationInfoHandler] = ObservationInfoHandler(self, self.private_name)
self.handler: ObservationInfoHandler = ObservationInfoHandler(self, self.private_name)
# 初始化 bot_id
try:
from ...config.config import global_config
self.bot_id = str(global_config.BOT_QQ) if global_config.BOT_QQ else None
if not self.bot_id:
logger.error(f"[私聊][{self.private_name}] 未能从配置中获取 BOT_QQ ID")
except ImportError:
logger.error(f"[私聊][{self.private_name}] 无法导入 global_config 获取 BOT_QQ ID")
except Exception as e:
logger.error(f"[私聊][{self.private_name}] 获取 BOT_QQ ID 时出错: {e}")
# --- 初始化 bot_id ---
from ...config.config import global_config # 移动到 __init__ 内部以避免循环导入问题
self.bot_id = str(global_config.BOT_QQ) if global_config.BOT_QQ else None
def bind_to_chat_observer(self, chat_observer: ChatObserver):
"""绑定到指定的chat_observer (保持不变)"""
"""绑定到指定的 ChatObserver 并注册通知处理器"""
if self.chat_observer:
logger.warning(f"[私聊][{self.private_name}]尝试重复绑定 ChatObserver")
logger.warning(f"[私聊][{self.private_name}] 尝试重复绑定 ChatObserver")
return
if not self.handler:
logger.error(f"[私聊][{self.private_name}] ObservationInfoHandler 未初始化,无法绑定!")
return
self.chat_observer = chat_observer
try:
if not self.handler:
logger.error(f"[私聊][{self.private_name}] 尝试绑定时 handler 未初始化!")
self.chat_observer = None
return
# 注册需要处理的通知类型
notification_manager = self.chat_observer.notification_manager
notification_manager.register_handler("observation_info", NotificationType.NEW_MESSAGE, self.handler)
notification_manager.register_handler("observation_info", NotificationType.COLD_CHAT, self.handler)
notification_manager.register_handler("observation_info", NotificationType.MESSAGE_DELETED, self.handler)
# 根据需要注册更多类型...
# notification_manager.register_handler("observation_info", NotificationType.ACTIVE_CHAT, self.handler)
# notification_manager.register_handler("observation_info", NotificationType.USER_JOINED, self.handler)
# notification_manager.register_handler("observation_info", NotificationType.USER_LEFT, self.handler)
# notification_manager.register_handler("observation_info", NotificationType.ERROR, self.handler)
self.chat_observer.notification_manager.register_handler(
target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler
)
self.chat_observer.notification_manager.register_handler(
target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler
)
# --- [修改点 12] 注册 MESSAGE_DELETED ---
self.chat_observer.notification_manager.register_handler(
target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler
)
logger.info(f"[私聊][{self.private_name}]成功绑定到 ChatObserver")
logger.info(f"[私聊][{self.private_name}] ObservationInfo 成功绑定到 ChatObserver")
except AttributeError:
logger.error(f"[私聊][{self.private_name}] 绑定的 ChatObserver 对象缺少 notification_manager 属性!")
self.chat_observer = None # 绑定失败
except Exception as e:
logger.error(f"[私聊][{self.private_name}]绑定到 ChatObserver 时出错: {e}")
self.chat_observer = None
logger.error(f"[私聊][{self.private_name}] 绑定到 ChatObserver 时出错: {e}")
self.chat_observer = None # 绑定失败
def unbind_from_chat_observer(self):
"""解除与chat_observer的绑定 (保持不变)"""
if (
self.chat_observer and hasattr(self.chat_observer, "notification_manager") and self.handler
):
"""解除与 ChatObserver 的绑定"""
if self.chat_observer and hasattr(self.chat_observer, "notification_manager") and self.handler:
try:
self.chat_observer.notification_manager.unregister_handler(
target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler
)
self.chat_observer.notification_manager.unregister_handler(
target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler
)
# --- [修改点 13] 注销 MESSAGE_DELETED ---
self.chat_observer.notification_manager.unregister_handler(
target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler
)
logger.info(f"[私聊][{self.private_name}]成功从 ChatObserver 解绑")
notification_manager = self.chat_observer.notification_manager
notification_manager.unregister_handler("observation_info", NotificationType.NEW_MESSAGE, self.handler)
notification_manager.unregister_handler("observation_info", NotificationType.COLD_CHAT, self.handler)
notification_manager.unregister_handler("observation_info", NotificationType.MESSAGE_DELETED, self.handler)
# ... 注销其他已注册的类型 ...
logger.info(f"[私聊][{self.private_name}] ObservationInfo 成功从 ChatObserver 解绑")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]从 ChatObserver 解绑时出错: {e}")
logger.error(f"[私聊][{self.private_name}] 从 ChatObserver 解绑时出错: {e}")
finally:
self.chat_observer = None
self.chat_observer = None # 无论成功与否都清除引用
else:
logger.warning(f"[私聊][{self.private_name}]尝试解绑时 ChatObserver 不存在、无效或 handler 未设置")
logger.warning(f"[私聊][{self.private_name}] 尝试解绑时 ChatObserver 无效或 handler 未设置")
async def update_from_message(self, message: Dict[str, Any], user_info: Optional[UserInfo]):
"""从消息更新信息 (保持不变)"""
"""根据收到的新消息更新 ObservationInfo 的状态"""
message_time = message.get("time")
message_id = message.get("message_id")
processed_text = message.get("processed_plain_text", "")
sender_id_str: Optional[str] = str(user_info.user_id) if user_info else None
if message_time and message_time > (self.last_message_time or 0):
if not message_time or not message_id:
logger.warning(f"[私聊][{self.private_name}] 收到的消息缺少 time 或 message_id: {message}")
return
# 更新最后消息时间(所有消息)
if message_time > (self.last_message_time or 0):
self.last_message_time = message_time
self.last_message_id = message_id
self.last_message_content = processed_text
self.is_cold_chat = False
self.cold_chat_start_time = None
self.cold_chat_duration = 0.0
self.last_message_sender = sender_id_str
if user_info:
sender_id = str(user_info.user_id)
self.last_message_sender = sender_id
if sender_id == self.bot_id:
self.last_bot_speak_time = message_time
else:
self.last_user_speak_time = message_time
self.active_users.add(sender_id)
# 更新说话者特定时间
if sender_id_str:
if sender_id_str == self.bot_id:
self.last_bot_speak_time = message_time
else:
logger.warning(
f"[私聊][{self.private_name}]处理消息更新时缺少有效的 UserInfo 对象, message_id: {message_id}"
)
self.last_message_sender = None
# --- [修改点 14] 添加到未处理列表,并更新计数 ---
# 检查消息是否已存在于未处理列表中,避免重复添加
if not any(msg.get("message_id") == message_id for msg in self.unprocessed_messages):
self.unprocessed_messages.append(message)
self.new_messages_count = len(self.unprocessed_messages)
logger.debug(f"[私聊][{self.private_name}]添加新未处理消息 ID: {message_id}, 当前未处理数: {self.new_messages_count}")
self.update_changed()
else:
logger.warning(f"[私聊][{self.private_name}]尝试重复添加未处理消息 ID: {message_id}")
self.last_user_speak_time = message_time
self.active_users.add(sender_id_str) # 添加到活跃用户
else:
pass
logger.warning(f"[私聊][{self.private_name}] 处理消息更新时缺少有效的 UserInfo, message_id: {message_id}")
# 更新冷场状态
self.is_cold_chat = False
self.cold_chat_start_time = None
self.cold_chat_duration = 0.0
# --- [核心修改] 将新消息添加到未处理列表 ---
# 检查消息是否已存在于未处理列表中,避免重复添加
if not any(msg.get("message_id") == message_id for msg in self.unprocessed_messages):
# 创建消息的副本以避免修改原始数据(如果需要)
self.unprocessed_messages.append(message.copy())
self.new_messages_count = len(self.unprocessed_messages)
logger.debug(f"[私聊][{self.private_name}] 添加新未处理消息 ID: {message_id}, 发送者: {sender_id_str}, 当前未处理数: {self.new_messages_count}")
self.update_changed()
else:
logger.warning(f"[私聊][{self.private_name}] 尝试重复添加未处理消息 ID: {message_id}")
def update_changed(self):
"""标记状态已改变,并重置标记 (保持不变)"""
self.changed = True
async def remove_unprocessed_message(self, message_id_to_delete: str):
"""从 unprocessed_messages 列表中移除指定 ID 的消息"""
original_count = len(self.unprocessed_messages)
self.unprocessed_messages = [
msg for msg in self.unprocessed_messages if msg.get("message_id") != message_id_to_delete
]
new_count = len(self.unprocessed_messages)
if new_count < original_count:
self.new_messages_count = new_count
logger.info(f"[私聊][{self.private_name}] 移除了未处理的消息 (ID: {message_id_to_delete}), 当前未处理数: {self.new_messages_count}")
self.update_changed()
else:
logger.warning(f"[私聊][{self.private_name}] 尝试移除不存在的未处理消息 ID: {message_id_to_delete}")
async def update_cold_chat_status(self, is_cold: bool, current_time: float):
"""更新冷场状态 (保持不变)"""
"""更新冷场状态"""
if is_cold != self.is_cold_chat:
self.is_cold_chat = is_cold
if is_cold:
self.cold_chat_start_time = (
self.last_message_time or current_time
)
logger.info(f"[私聊][{self.private_name}]进入冷场状态,开始时间: {self.cold_chat_start_time}")
# 冷场开始时间应基于最后一条消息的时间
self.cold_chat_start_time = self.last_message_time or current_time
logger.info(f"[私聊][{self.private_name}] 进入冷场状态,开始时间: {self.cold_chat_start_time:.2f}")
else:
if self.cold_chat_start_time:
self.cold_chat_duration = current_time - self.cold_chat_start_time
logger.info(f"[私聊][{self.private_name}]结束冷场状态,持续时间: {self.cold_chat_duration:.2f}")
self.cold_chat_start_time = None
logger.info(f"[私聊][{self.private_name}] 结束冷场状态,持续时间: {self.cold_chat_duration:.2f}")
self.cold_chat_start_time = None # 结束冷场,重置开始时间
self.update_changed()
# 持续更新冷场时长
if self.is_cold_chat and self.cold_chat_start_time:
self.cold_chat_duration = current_time - self.cold_chat_start_time
def get_active_duration(self) -> float:
"""获取当前活跃时长 (保持不变)"""
if not self.last_message_time:
return 0.0
return time.time() - self.last_message_time
def get_user_response_time(self) -> Optional[float]:
"""获取用户最后响应时间 (保持不变)"""
if not self.last_user_speak_time:
return None
return time.time() - self.last_user_speak_time
def update_changed(self):
"""标记状态已改变"""
self.changed = True
# 这个标记通常在处理完改变后由外部逻辑重置为 False
def get_bot_response_time(self) -> Optional[float]:
"""获取机器人最后响应时间 (保持不变)"""
if not self.last_bot_speak_time:
return None
return time.time() - self.last_bot_speak_time
# --- [修改点 15] 重命名并修改 clear_unprocessed_messages ---
# async def clear_unprocessed_messages(self): <-- 旧方法注释掉或删除
async def clear_processed_messages(self, message_ids_to_clear: Set[str]):
"""将指定ID的未处理消息移入历史记录并更新相关状态"""
"""将指定 ID 的未处理消息移入历史记录,并更新相关状态"""
if not message_ids_to_clear:
logger.debug(f"[私聊][{self.private_name}]没有需要清理的消息 ID。")
logger.debug(f"[私聊][{self.private_name}] 没有需要清理的消息 ID。")
return
messages_to_move = []
@ -342,36 +301,40 @@ class ObservationInfo:
# 分离要清理和要保留的消息
for msg in self.unprocessed_messages:
if msg.get("message_id") in message_ids_to_clear:
msg_id = msg.get("message_id")
if msg_id in message_ids_to_clear:
messages_to_move.append(msg)
cleared_count += 1
else:
remaining_messages.append(msg)
if not messages_to_move:
logger.debug(f"[私聊][{self.private_name}]未找到与 ID 列表匹配的未处理消息进行清理。")
logger.debug(f"[私聊][{self.private_name}] 未找到与 ID 列表 {message_ids_to_clear} 匹配的未处理消息进行清理。")
return
logger.debug(f"[私聊][{self.private_name}]准备清理 {cleared_count} 条已处理消息...")
logger.debug(f"[私聊][{self.private_name}] 准备清理 {cleared_count} 条已处理消息...")
# 将要移动的消息添加到历史记录
max_history_len = 100
# 将要移动的消息添加到历史记录 (按时间排序)
messages_to_move.sort(key=lambda m: m.get("time", 0))
self.chat_history.extend(messages_to_move)
# 限制历史记录长度 (可选)
max_history_len = 100 # 例如保留最近 100 条
if len(self.chat_history) > max_history_len:
self.chat_history = self.chat_history[-max_history_len:]
# 更新历史记录字符串 (仅使用最近一部分生成)
history_slice_for_str = self.chat_history[-20:] # 例如最近20条
# 更新历史记录字符串 (仅使用最近一部分生成,提高效率)
history_slice_for_str = self.chat_history[-20:] # 例如最近 20
try:
self.chat_history_str = await build_readable_messages(
history_slice_for_str,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
read_mark=0.0, # read_mark 可能需要调整或移除
)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]构建聊天记录字符串时出错: {e}")
logger.error(f"[私聊][{self.private_name}] 构建聊天记录字符串时出错: {e}")
self.chat_history_str = "[构建聊天记录出错]"
# 更新未处理消息列表和计数
@ -379,6 +342,27 @@ class ObservationInfo:
self.new_messages_count = len(self.unprocessed_messages)
self.chat_history_count = len(self.chat_history)
logger.info(f"[私聊][{self.private_name}]已清理 {cleared_count} 条消息,剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。")
logger.info(f"[私聊][{self.private_name}] 已清理 {cleared_count} 条消息 (IDs: {message_ids_to_clear}),剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。")
self.update_changed() # 状态改变
# --- Helper methods (可以根据需要添加) ---
def get_active_duration(self) -> float:
"""获取当前活跃时长(距离最后一条消息的时间)"""
if not self.last_message_time:
return float('inf') # 或返回 0.0,取决于定义
return time.time() - self.last_message_time
def get_user_response_time(self) -> Optional[float]:
"""获取对方最后响应时间(距离对方最后一条消息的时间)"""
if not self.last_user_speak_time:
return None
return time.time() - self.last_user_speak_time
def get_bot_response_time(self) -> Optional[float]:
"""获取机器人最后响应时间(距离机器人最后一条消息的时间)"""
if not self.last_bot_speak_time:
return None
return time.time() - self.last_bot_speak_time
self.update_changed() # 状态改变