MaiBot/src/plugins/PFC/conversation.py

852 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import time
import asyncio
import datetime
# from .message_storage import MongoDBMessageStorage
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
from ...config.config import global_config # 确保导入 global_config
from typing import Dict, Any, Optional
from ..chat.message import Message
from .pfc_types import ConversationState
# 确保导入 ChatObserver 和 GoalAnalyzer (如果 pfc.py 中定义了它们)
# from .pfc import ChatObserver, GoalAnalyzer # 可能需要调整导入路径
from .chat_observer import ChatObserver # 导入 ChatObserver
from .pfc import GoalAnalyzer # 导入 GoalAnalyzer
from .message_sender import DirectMessageSender
from src.common.logger_manager import get_logger
from .action_planner import ActionPlanner
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo # 确保导入 ConversationInfo
from .reply_generator import ReplyGenerator
from ..chat.chat_stream import ChatStream
from maim_message import UserInfo
from src.plugins.chat.chat_stream import chat_manager
from .pfc_KnowledgeFetcher import KnowledgeFetcher
from .waiter import Waiter
import traceback
from rich.traceback import install
install(extra_lines=3)
logger = get_logger("pfc")
class Conversation:
"""对话类,负责管理单个对话的状态和行为"""
def __init__(self, stream_id: str, private_name: str):
"""初始化对话实例
Args:
stream_id: 聊天流ID
"""
self.stream_id = stream_id
self.private_name = private_name
self.state = ConversationState.INIT
self.should_continue = False
self.ignore_until_timestamp: Optional[float] = None
# 回复相关
self.generated_reply = ""
# 初始化 bot_id
self.bot_id = str(global_config.BOT_QQ) # 从配置中获取
async def _initialize(self):
"""初始化实例,注册所有组件"""
try:
self.action_planner = ActionPlanner(self.stream_id, self.private_name)
self.goal_analyzer = GoalAnalyzer(self.stream_id, self.private_name)
self.reply_generator = ReplyGenerator(self.stream_id, self.private_name)
self.knowledge_fetcher = KnowledgeFetcher(self.private_name)
self.waiter = Waiter(self.stream_id, self.private_name)
self.direct_sender = DirectMessageSender(self.private_name)
# 获取聊天流信息
self.chat_stream = chat_manager.get_stream(self.stream_id)
self.stop_action_planner = False
except Exception as e:
logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册运行组件失败: {e}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
raise
try:
# 决策所需要的信息,包括自身自信和观察信息两部分
# 注册观察器和观测信息
self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name)
self.chat_observer.start()
self.observation_info = ObservationInfo(self.private_name)
# --- 在绑定前设置 bot_id ---
self.observation_info.bot_id = self.bot_id
# --- 设置结束 ---
self.observation_info.bind_to_chat_observer(self.chat_observer)
# print(self.chat_observer.get_cached_messages(limit=)
self.conversation_info = ConversationInfo()
except Exception as e:
logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册信息组件失败: {e}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
raise
try:
logger.info(f"[私聊][{self.private_name}]为 {self.stream_id} 加载初始聊天记录...")
initial_messages = get_raw_msg_before_timestamp_with_chat( #
chat_id=self.stream_id,
timestamp=time.time(),
limit=30, # 加载最近30条作为初始上下文可以调整
)
chat_talking_prompt = await build_readable_messages(
initial_messages,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
if initial_messages:
# 将加载的消息填充到 ObservationInfo 的 chat_history
self.observation_info.chat_history = initial_messages
self.observation_info.chat_history_str = chat_talking_prompt + "\n"
self.observation_info.chat_history_count = len(initial_messages)
# 更新 ObservationInfo 中的时间戳等信息
last_msg = initial_messages[-1]
self.observation_info.last_message_time = last_msg.get("time")
last_user_info = UserInfo.from_dict(last_msg.get("user_info", {}))
self.observation_info.last_message_sender = str(last_user_info.user_id) # 确保是字符串
self.observation_info.last_message_content = last_msg.get("processed_plain_text", "")
logger.info(
f"[私聊][{self.private_name}]成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}"
)
# 让 ChatObserver 从加载的最后一条消息之后开始同步
self.chat_observer.last_message_time = self.observation_info.last_message_time
self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录
else:
logger.info(f"[私聊][{self.private_name}]没有找到初始聊天记录。")
except Exception as load_err:
logger.error(f"[私聊][{self.private_name}]加载初始聊天记录时出错: {load_err}")
# 出错也要继续,只是没有历史记录而已
# 组件准备完成,启动该论对话
self.should_continue = True
asyncio.create_task(self.start())
async def start(self):
"""开始对话流程"""
try:
logger.info(f"[私聊][{self.private_name}]对话系统启动中...")
asyncio.create_task(self._plan_and_action_loop())
except Exception as e:
logger.error(f"[私聊][{self.private_name}]启动对话系统失败: {e}")
raise
async def _plan_and_action_loop(self):
"""思考步PFC核心循环模块"""
while self.should_continue:
# 忽略逻辑
if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp:
await asyncio.sleep(30)
continue
elif self.ignore_until_timestamp and time.time() >= self.ignore_until_timestamp:
logger.info(f"[私聊][{self.private_name}]忽略时间已到 {self.stream_id},准备结束对话。")
self.ignore_until_timestamp = None
self.should_continue = False
continue
try:
# --- 修改:记录规划开始时的时间戳和未处理消息数 ---
planning_marker_time = self.observation_info.last_message_time or time.time()
initial_unprocessed_count = self.observation_info.new_messages_count
# logger.debug(f"[私聊][{self.private_name}]规划开始标记时间: {planning_marker_time}, 初始未处理: {initial_unprocessed_count}")
# --- 调用 Action Planner ---
# 传递 self.conversation_info.last_successful_reply_action
action, reason = await self.action_planner.plan(
self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action
)
# --- 修改:规划后检查是否有 *显著增多* 的新消息到达 ---
current_unprocessed_count = self.observation_info.new_messages_count
# 增加一个缓冲值例如允许规划期间到达1-2条新消息不打断
planning_buffer = 2
if current_unprocessed_count > initial_unprocessed_count + planning_buffer:
logger.info(
f"[私聊][{self.private_name}]规划期间收到较多新消息 ({initial_unprocessed_count} -> {current_unprocessed_count}),超过缓冲 {planning_buffer},跳过本次行动,立即重新规划"
)
# 如果规划期间有新消息,也应该重置上次回复状态,因为现在要响应新消息了
self.conversation_info.last_successful_reply_action = None
await asyncio.sleep(0.1) # 短暂等待避免CPU空转
continue # 重新进入循环进行规划
# --- 修改:移除旧的清理逻辑 ---
# 旧逻辑: 在执行动作前清理所有已知的新消息
# if initial_new_message_count > 0 and action in ["direct_reply", "send_new_message"]:
# # ... (旧的清理代码) ...
# 新逻辑: 不在这里清理,交给 _handle_action 成功发送后处理
# --- 传递 planning_marker_time 给 _handle_action ---
await self._handle_action(action, reason, self.observation_info, self.conversation_info, planning_marker_time)
# 检查是否需要结束对话 (逻辑不变)
goal_ended = False
if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list:
for goal_item in self.conversation_info.goal_list:
current_goal = None # 初始化
if isinstance(goal_item, dict):
current_goal = goal_item.get("goal")
elif isinstance(goal_item, str): # 处理直接是字符串的情况
current_goal = goal_item
if current_goal == "结束对话":
goal_ended = True
break
if goal_ended:
self.should_continue = False
logger.info(f"[私聊][{self.private_name}]检测到'结束对话'目标,停止循环。")
except Exception as loop_err:
logger.error(f"[私聊][{self.private_name}]PFC主循环出错: {loop_err}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
await asyncio.sleep(1)
if self.should_continue:
await asyncio.sleep(0.1) # 保持循环间的短暂间隔
logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}")
# --- 修改_check_new_messages_after_planning 调整为接收标记时间 ---
def _check_new_messages_during_action(self, planning_marker_time: float, buffer: int = 0) -> bool:
"""检查在规划开始后是否有新消息到达(考虑缓冲)"""
if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "last_message_time"):
logger.warning(
f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'last_message_time' 属性,无法检查新消息。"
)
return False
# 检查最后一条消息的时间是否晚于规划开始的时间点
if self.observation_info.last_message_time and self.observation_info.last_message_time > planning_marker_time:
# 这里可以进一步检查新消息的数量是否超过 buffer但目前仅检查是否有新消息即可打断
# current_unprocessed_count = self.observation_info.new_messages_count
# initial_count_at_planning # 这个需要传递进来或重新设计获取方式
# if current_unprocessed_count > initial_count_at_planning + buffer:
logger.info(
f"[私聊][{self.private_name}]动作执行期间(生成/检查/发送前)检测到新消息 (晚于 {planning_marker_time}),取消当前动作并重新规划"
)
# 重置上次成功回复状态
if hasattr(self, "conversation_info"):
self.conversation_info.last_successful_reply_action = None
else:
logger.warning(
f"[私聊][{self.private_name}]ConversationInfo 未初始化,无法重置 last_successful_reply_action。"
)
return True
return False
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
"""将消息字典转换为Message对象"""
try:
# 尝试从 msg_dict 直接获取 chat_stream如果失败则从全局 chat_manager 获取
chat_info = msg_dict.get("chat_info")
if chat_info and isinstance(chat_info, dict):
chat_stream = ChatStream.from_dict(chat_info)
elif self.chat_stream: # 使用实例变量中的 chat_stream
chat_stream = self.chat_stream
else: # Fallback: 尝试从 manager 获取 (可能需要 stream_id)
chat_stream = chat_manager.get_stream(self.stream_id)
if not chat_stream:
raise ValueError(f"无法确定 ChatStream for stream_id {self.stream_id}")
user_info = UserInfo.from_dict(msg_dict.get("user_info", {}))
return Message(
message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 提供默认 ID
chat_stream=chat_stream, # 使用确定的 chat_stream
time=msg_dict.get("time", time.time()), # 提供默认时间
user_info=user_info,
processed_plain_text=msg_dict.get("processed_plain_text", ""),
detailed_plain_text=msg_dict.get("detailed_plain_text", ""),
)
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]转换消息时出错: {e}")
# 可以选择返回 None 或重新抛出异常,这里选择重新抛出以指示问题
raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e
# --- 修改_handle_action 接收 planning_marker_time ---
async def _handle_action(
self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo, planning_marker_time: float
):
"""处理规划的行动"""
logger.debug(f"[私聊][{self.private_name}]执行行动: {action}, 原因: {reason}")
# 记录action历史 (逻辑不变)
current_action_record = {
"action": action,
"plan_reason": reason,
"status": "start",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
}
# 确保 done_action 列表存在
if not hasattr(conversation_info, "done_action"):
conversation_info.done_action = []
conversation_info.done_action.append(current_action_record)
action_index = len(conversation_info.done_action) - 1
action_successful = False # 用于标记动作是否成功完成
# --- 根据不同的 action 执行 ---
# send_new_message 失败后执行 wait
if action == "send_new_message":
max_reply_attempts = 3
reply_attempt_count = 0
is_suitable = False
need_replan = False
check_reason = "未进行尝试"
final_reply_to_send = ""
while reply_attempt_count < max_reply_attempts and not is_suitable:
reply_attempt_count += 1
logger.info(
f"[私聊][{self.private_name}]尝试生成追问回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..."
)
self.state = ConversationState.GENERATING
# --- 修改:生成前检查新消息 ---
if self._check_new_messages_during_action(planning_marker_time):
logger.info(f"[私聊][{self.private_name}]生成追问回复前检测到新消息,取消动作")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": "生成追问前收到新消息,取消"}
)
return # 直接返回,主循环会重新规划
# 1. 生成回复 (调用 generate 时传入 action_type)
self.generated_reply = await self.reply_generator.generate(
observation_info, conversation_info, action_type="send_new_message"
)
logger.info(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的追问回复: {self.generated_reply}"
)
# --- 修改:检查前再次检查新消息 ---
if self._check_new_messages_during_action(planning_marker_time):
logger.info(f"[私聊][{self.private_name}]检查追问回复前检测到新消息,取消动作")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"检查追问前收到新消息,取消发送: {self.generated_reply}"}
)
return
# 2. 检查回复 (逻辑不变)
self.state = ConversationState.CHECKING
try:
current_goal_str = "" # 初始化
if conversation_info.goal_list:
first_goal = conversation_info.goal_list[0]
if isinstance(first_goal, dict):
current_goal_str = first_goal.get("goal", "")
elif isinstance(first_goal, str):
current_goal_str = first_goal
is_suitable, check_reason, need_replan = await self.reply_generator.check_reply(
reply=self.generated_reply,
goal=current_goal_str,
chat_history=observation_info.chat_history,
chat_history_str=observation_info.chat_history_str,
retry_count=reply_attempt_count - 1,
)
logger.info(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
)
# --- 修改:记录失败原因和内容到 conversation_info ---
if not is_suitable:
conversation_info.last_reply_rejection_reason = check_reason
conversation_info.last_rejected_reply_content = self.generated_reply
else:
# 清除上次失败记录
conversation_info.last_reply_rejection_reason = None
conversation_info.last_rejected_reply_content = None
# --- 记录结束 ---
if is_suitable:
final_reply_to_send = self.generated_reply
break
elif need_replan:
logger.warning(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查建议重新规划,停止尝试。原因: {check_reason}"
)
break # 跳出循环,后续会处理 need_replan
except Exception as check_err:
logger.error(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}"
)
check_reason = f"{reply_attempt_count} 次检查过程出错: {check_err}"
# 记录失败
conversation_info.last_reply_rejection_reason = check_reason
conversation_info.last_rejected_reply_content = self.generated_reply
break # 出错也跳出循环
# 循环结束,处理最终结果
if is_suitable:
# --- 修改:发送前最终检查新消息 ---
if self._check_new_messages_during_action(planning_marker_time):
logger.info(f"[私聊][{self.private_name}]发送追问回复前检测到新消息,取消发送")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"发送追问前收到新消息,取消发送: {final_reply_to_send}"}
)
return # 直接返回,重新规划
# 发送合适的回复
self.generated_reply = final_reply_to_send
send_success = await self._send_reply() # 调用发送函数,获取发送结果
if send_success:
# --- 修改:发送成功后,标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
# 更新状态: 标记上次成功是 send_new_message
self.conversation_info.last_successful_reply_action = "send_new_message"
action_successful = True # 标记动作成功
else:
# 发送失败处理
logger.error(f"[私聊][{self.private_name}]发送追问回复失败")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"发送追问回复失败: {final_reply_to_send}"}
)
self.conversation_info.last_successful_reply_action = None # 发送失败,重置状态
elif need_replan:
# 打回动作决策
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,追问回复决定打回动作决策。打回原因: {check_reason}"
)
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"}
)
# 打回时不清空失败记录,让 planner 看到
self.conversation_info.last_successful_reply_action = None # 重置成功状态
else:
# 追问失败
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的追问回复。最终原因: {check_reason}"
)
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后失败: {check_reason}"}
)
# 重置状态: 追问失败,下次用初始 prompt
self.conversation_info.last_successful_reply_action = None
# 失败时不清空失败记录
# 执行 Wait 操作
logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...")
self.state = ConversationState.WAITING
# --- Wait 操作也需要标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
await self.waiter.wait(self.conversation_info)
wait_action_record = {
"action": "wait",
"plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待",
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
}
conversation_info.done_action.append(wait_action_record)
action_successful = True # Wait 本身算成功完成
elif action == "direct_reply":
max_reply_attempts = 3
reply_attempt_count = 0
is_suitable = False
need_replan = False
check_reason = "未进行尝试"
final_reply_to_send = ""
while reply_attempt_count < max_reply_attempts and not is_suitable:
reply_attempt_count += 1
logger.info(
f"[私聊][{self.private_name}]尝试生成首次回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..."
)
self.state = ConversationState.GENERATING
# --- 修改:生成前检查新消息 ---
if self._check_new_messages_during_action(planning_marker_time):
logger.info(f"[私聊][{self.private_name}]生成首次回复前检测到新消息,取消动作")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": "生成首次回复前收到新消息,取消"}
)
return
# 1. 生成回复
self.generated_reply = await self.reply_generator.generate(
observation_info, conversation_info, action_type="direct_reply"
)
logger.info(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的首次回复: {self.generated_reply}"
)
# --- 修改:检查前再次检查新消息 ---
if self._check_new_messages_during_action(planning_marker_time):
logger.info(f"[私聊][{self.private_name}]检查首次回复前检测到新消息,取消动作")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"检查首次回复前收到新消息,取消发送: {self.generated_reply}"}
)
return
# 2. 检查回复
self.state = ConversationState.CHECKING
try:
current_goal_str = "" # 初始化
if conversation_info.goal_list:
first_goal = conversation_info.goal_list[0]
if isinstance(first_goal, dict):
current_goal_str = first_goal.get("goal", "")
elif isinstance(first_goal, str):
current_goal_str = first_goal
is_suitable, check_reason, need_replan = await self.reply_generator.check_reply(
reply=self.generated_reply,
goal=current_goal_str,
chat_history=observation_info.chat_history,
chat_history_str=observation_info.chat_history_str,
retry_count=reply_attempt_count - 1,
)
logger.info(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
)
# --- 修改:记录失败原因和内容到 conversation_info ---
if not is_suitable:
conversation_info.last_reply_rejection_reason = check_reason
conversation_info.last_rejected_reply_content = self.generated_reply
else:
# 清除上次失败记录
conversation_info.last_reply_rejection_reason = None
conversation_info.last_rejected_reply_content = None
# --- 记录结束 ---
if is_suitable:
final_reply_to_send = self.generated_reply
break
elif need_replan:
logger.warning(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查建议重新规划,停止尝试。原因: {check_reason}"
)
break # 跳出循环
except Exception as check_err:
logger.error(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (首次回复) 时出错: {check_err}"
)
check_reason = f"{reply_attempt_count} 次检查过程出错: {check_err}"
# 记录失败
conversation_info.last_reply_rejection_reason = check_reason
conversation_info.last_rejected_reply_content = self.generated_reply
break # 出错也跳出循环
# 循环结束,处理最终结果
if is_suitable:
# --- 修改:发送前最终检查新消息 ---
if self._check_new_messages_during_action(planning_marker_time):
logger.info(f"[私聊][{self.private_name}]发送首次回复前检测到新消息,取消发送")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"发送首次回复前收到新消息,取消发送: {final_reply_to_send}"}
)
return # 直接返回,重新规划
# 发送合适的回复
self.generated_reply = final_reply_to_send
send_success = await self._send_reply() # 调用发送函数
if send_success:
# --- 修改:发送成功后,标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
# 更新状态: 标记上次成功是 direct_reply
self.conversation_info.last_successful_reply_action = "direct_reply"
action_successful = True # 标记动作成功
else:
# 发送失败处理
logger.error(f"[私聊][{self.private_name}]发送首次回复失败")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"发送首次回复失败: {final_reply_to_send}"}
)
self.conversation_info.last_successful_reply_action = None # 发送失败,重置状态
elif need_replan:
# 打回动作决策
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,首次回复决定打回动作决策。打回原因: {check_reason}"
)
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"}
)
# 打回时不清空失败记录
self.conversation_info.last_successful_reply_action = None # 重置成功状态
else:
# 首次回复失败
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的首次回复。最终原因: {check_reason}"
)
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后失败: {check_reason}"}
)
# 重置状态: 首次回复失败,下次还是用初始 prompt
self.conversation_info.last_successful_reply_action = None
# 失败时不清空失败记录
# 执行 Wait 操作 (保持原有逻辑)
logger.info(f"[私聊][{self.private_name}]由于无法生成合适首次回复,执行 'wait' 操作...")
self.state = ConversationState.WAITING
# --- Wait 操作也需要标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
await self.waiter.wait(self.conversation_info)
wait_action_record = {
"action": "wait",
"plan_reason": "因 direct_reply 多次尝试失败而执行的后备等待",
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
}
conversation_info.done_action.append(wait_action_record)
action_successful = True # Wait 本身算成功
elif action == "rethink_goal":
self.state = ConversationState.RETHINKING
try:
# 检查 goal_analyzer 是否存在
if not hasattr(self, "goal_analyzer"):
logger.error(f"[私聊][{self.private_name}]GoalAnalyzer 未初始化,无法重新思考目标。")
raise AttributeError("GoalAnalyzer not initialized")
# --- rethink_goal 前检查新消息 ---
if self._check_new_messages_during_action(planning_marker_time):
logger.info(f"[私聊][{self.private_name}]重新思考目标前检测到新消息,取消动作")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": "重新思考目标前收到新消息,取消"}
)
return
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
# --- rethink_goal 后标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
action_successful = True
except Exception as rethink_err:
logger.error(f"[私聊][{self.private_name}]重新思考目标时出错: {rethink_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"}
)
self.conversation_info.last_successful_reply_action = None # 重置状态
elif action == "listening":
self.state = ConversationState.LISTENING
logger.info(f"[私聊][{self.private_name}]倾听对方发言...")
try:
# 检查 waiter 是否存在
if not hasattr(self, "waiter"):
logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法倾听。")
raise AttributeError("Waiter not initialized")
# --- listening 前检查新消息 ---
# 倾听时如果收到新消息,通常应该继续倾听或转为回复,而不是取消
# 所以这里不检查新消息打断
# --- listening 后标记处理过的消息 ---
# 倾听是等待行为,也需要标记之前的消息已处理
await observation_info.mark_messages_processed_up_to(planning_marker_time)
await self.waiter.wait_listening(conversation_info)
action_successful = True # Listening 完成就算成功
except Exception as listen_err:
logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"倾听失败: {listen_err}"}
)
self.conversation_info.last_successful_reply_action = None # 重置状态
elif action == "say_goodbye":
self.state = ConversationState.GENERATING # 也可以定义一个新的状态,如 ENDING
logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...")
try:
# --- 告别前检查新消息 ---
# 如果有新消息,可能不适合告别了
if self._check_new_messages_during_action(planning_marker_time):
logger.info(f"[私聊][{self.private_name}]发送告别语前检测到新消息,取消告别")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": "发送告别语前收到新消息,取消"}
)
return
# 1. 生成告别语 (使用 'say_goodbye' action_type)
self.generated_reply = await self.reply_generator.generate(
observation_info, conversation_info, action_type="say_goodbye"
)
logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}")
# 2. 直接发送告别语 (不经过检查)
if self.generated_reply: # 确保生成了内容
send_success = await self._send_reply() # 调用发送方法
if send_success:
# --- 发送成功后标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
action_successful = True
logger.info(f"[私聊][{self.private_name}]告别语已发送。")
else:
logger.warning(f"[私聊][{self.private_name}]发送告别语失败。")
action_successful = False
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": "发送告别语失败"}
)
else:
logger.warning(f"[私聊][{self.private_name}]未能生成告别语内容,无法发送。")
action_successful = False # 标记动作失败
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": "未能生成告别语内容"}
)
# 3. 无论是否发送成功,都准备结束对话
self.should_continue = False
logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。")
except Exception as goodbye_err:
logger.error(f"[私聊][{self.private_name}]生成或发送告别语时出错: {goodbye_err}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
# 即使出错,也结束对话
self.should_continue = False
action_successful = False # 标记动作失败
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"}
)
elif action == "end_conversation":
# 这个分支现在只会在 action_planner 最终决定不告别时被调用
self.should_continue = False
logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...")
# --- 结束对话前标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
action_successful = True # 标记这个指令本身是成功的
elif action == "block_and_ignore":
logger.info(f"[私聊][{self.private_name}]不想再理你了...")
ignore_duration_seconds = 10 * 60
self.ignore_until_timestamp = time.time() + ignore_duration_seconds
logger.info(
f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}"
)
self.state = ConversationState.IGNORED
# --- 屏蔽前标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
action_successful = True # 标记动作成功
else: # 对应 'wait' 动作
self.state = ConversationState.WAITING
logger.info(f"[私聊][{self.private_name}]等待更多信息...")
try:
# 检查 waiter 是否存在
if not hasattr(self, "waiter"):
logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法等待。")
raise AttributeError("Waiter not initialized")
# --- Wait 前检查新消息 ---
# Wait 时如果收到新消息wait 逻辑内部会处理并退出,所以这里不打断
# --- Wait 开始前标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
_timeout_occurred = await self.waiter.wait(self.conversation_info)
action_successful = True # Wait 完成就算成功
except Exception as wait_err:
logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"等待失败: {wait_err}"}
)
self.conversation_info.last_successful_reply_action = None # 重置状态
# --- 更新 Action History 状态 ---
# 只有当动作本身成功时,才更新状态为 done
if action_successful:
# 确保 action_index 在有效范围内
if action_index < len(conversation_info.done_action):
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
# 重置状态: 对于非回复类动作的成功,清除上次回复状态
if action not in ["direct_reply", "send_new_message"]:
self.conversation_info.last_successful_reply_action = None
# logger.debug(f"[私聊][{self.private_name}]动作 {action} 成功完成,重置 last_successful_reply_action")
else:
logger.error(f"[私聊][{self.private_name}]尝试更新无效的 action_index: {action_index},当前 done_action 长度: {len(conversation_info.done_action)}")
# 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action
# --- 修改_send_reply 返回发送是否成功 ---
async def _send_reply(self) -> bool:
"""发送回复,并返回发送是否成功"""
if not self.generated_reply:
logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。")
return False # 发送失败
try:
_current_time = time.time()
reply_content = self.generated_reply
# 发送消息 (确保 direct_sender 和 chat_stream 有效)
if not hasattr(self, "direct_sender") or not self.direct_sender:
logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。")
return False # 发送失败
if not self.chat_stream:
logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。")
return False # 发送失败
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content)
# 发送成功后,手动触发 observer 更新可能导致重复处理自己发送的消息
# 更好的做法是依赖 observer 的自动轮询或数据库触发器(如果支持)
# 暂时注释掉,观察是否影响 ObservationInfo 的更新
# self.chat_observer.trigger_update()
# if not await self.chat_observer.wait_for_update():
# logger.warning(f"[私聊][{self.private_name}]等待 ChatObserver 更新完成超时")
self.state = ConversationState.ANALYZING # 更新状态
return True # 发送成功
except Exception as e:
logger.error(f"[私聊][{self.private_name}]发送消息或更新状态时失败: {str(e)}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
self.state = ConversationState.ANALYZING
return False # 发送失败
async def _send_timeout_message(self):
"""发送超时结束消息"""
try:
# 尝试从 observation_info 获取历史记录
if not hasattr(self, 'observation_info') or not self.observation_info.chat_history:
logger.warning(f"[私聊][{self.private_name}]无法获取聊天历史,无法发送超时消息。")
return
messages = self.observation_info.chat_history[-1:] # 获取最后一条
if not messages:
return
latest_message_dict = messages[0]
# 确保 chat_stream 存在
if not self.chat_stream:
logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送超时消息。")
return
# 将字典转换为 Message 对象
latest_message = self._convert_to_message(latest_message_dict)
await self.direct_sender.send_message(
chat_stream=self.chat_stream, content="[自动消息] 对方长时间未响应,对话已超时。", reply_to_message=latest_message
)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}")