mirror of https://github.com/Mai-with-u/MaiBot.git
parent
aff437db8d
commit
b30d0cbbe8
|
|
@ -84,7 +84,6 @@ class Conversation:
|
|||
self.observation_info.bot_id = self.bot_id
|
||||
# --- 设置结束 ---
|
||||
self.observation_info.bind_to_chat_observer(self.chat_observer)
|
||||
# print(self.chat_observer.get_cached_messages(limit=)
|
||||
|
||||
self.conversation_info = ConversationInfo()
|
||||
except Exception as e:
|
||||
|
|
@ -93,7 +92,7 @@ class Conversation:
|
|||
raise
|
||||
try:
|
||||
logger.info(f"[私聊][{self.private_name}]为 {self.stream_id} 加载初始聊天记录...")
|
||||
initial_messages = get_raw_msg_before_timestamp_with_chat( #
|
||||
initial_messages = get_raw_msg_before_timestamp_with_chat(
|
||||
chat_id=self.stream_id,
|
||||
timestamp=time.time(),
|
||||
limit=30, # 加载最近30条作为初始上下文,可以调整
|
||||
|
|
@ -114,8 +113,15 @@ class Conversation:
|
|||
# 更新 ObservationInfo 中的时间戳等信息
|
||||
last_msg = initial_messages[-1]
|
||||
self.observation_info.last_message_time = last_msg.get("time")
|
||||
last_user_info = UserInfo.from_dict(last_msg.get("user_info", {}))
|
||||
self.observation_info.last_message_sender = str(last_user_info.user_id) # 确保是字符串
|
||||
# 确保 last_msg['user_info'] 是字典
|
||||
last_user_info_dict = last_msg.get("user_info", {})
|
||||
if isinstance(last_user_info_dict, dict):
|
||||
last_user_info = UserInfo.from_dict(last_user_info_dict)
|
||||
self.observation_info.last_message_sender = str(last_user_info.user_id) # 确保是字符串
|
||||
else:
|
||||
logger.warning(f"Initial message user_info is not a dict: {last_user_info_dict}")
|
||||
self.observation_info.last_message_sender = None
|
||||
|
||||
self.observation_info.last_message_content = last_msg.get("processed_plain_text", "")
|
||||
|
||||
logger.info(
|
||||
|
|
@ -157,10 +163,11 @@ class Conversation:
|
|||
self.should_continue = False
|
||||
continue
|
||||
try:
|
||||
# --- 修改:记录规划开始时的时间戳和未处理消息数 ---
|
||||
planning_marker_time = self.observation_info.last_message_time or time.time()
|
||||
# --- 记录规划开始时的时间戳和未处理消息数 ---
|
||||
# 使用 time.time() 获取当前时间戳作为标记点,更准确反映规划开始的时刻
|
||||
planning_marker_time = time.time()
|
||||
initial_unprocessed_count = self.observation_info.new_messages_count
|
||||
# logger.debug(f"[私聊][{self.private_name}]规划开始标记时间: {planning_marker_time}, 初始未处理: {initial_unprocessed_count}")
|
||||
logger.debug(f"[私聊][{self.private_name}]规划开始标记时间: {planning_marker_time}, 初始未处理: {initial_unprocessed_count}")
|
||||
|
||||
|
||||
# --- 调用 Action Planner ---
|
||||
|
|
@ -169,26 +176,27 @@ class Conversation:
|
|||
self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action
|
||||
)
|
||||
|
||||
# --- 修改:规划后检查是否有 *显著增多* 的新消息到达 ---
|
||||
# --- 规划后检查是否有 *过多* 新消息到达 ---
|
||||
# 检查规划期间(调用 plan 函数的时间段内)新收到的消息数
|
||||
current_unprocessed_count = self.observation_info.new_messages_count
|
||||
# 增加一个缓冲值,例如允许规划期间到达1-2条新消息不打断
|
||||
planning_buffer = 2
|
||||
if current_unprocessed_count > initial_unprocessed_count + planning_buffer:
|
||||
# planning_buffer = 2 # 用户指定的缓冲值
|
||||
planning_buffer = 2 # 使用用户指定的缓冲值
|
||||
new_messages_during_planning = current_unprocessed_count - initial_unprocessed_count
|
||||
|
||||
if new_messages_during_planning > planning_buffer:
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}]规划期间收到较多新消息 ({initial_unprocessed_count} -> {current_unprocessed_count}),超过缓冲 {planning_buffer},跳过本次行动,立即重新规划"
|
||||
f"[私聊][{self.private_name}]规划期间收到 {new_messages_during_planning} 条新消息 (超过缓冲 {planning_buffer}),放弃当前计划 '{action}',立即重新规划"
|
||||
)
|
||||
# 如果规划期间有新消息,也应该重置上次回复状态,因为现在要响应新消息了
|
||||
# 重置上次成功回复状态,因为要响应新消息
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
await asyncio.sleep(0.1) # 短暂等待,避免CPU空转
|
||||
# 记录被放弃的规划动作 (可选)
|
||||
# current_action_record = { ... status: "recalled_before_execution" ... }
|
||||
# conversation_info.done_action.append(current_action_record)
|
||||
await asyncio.sleep(0.1) # 短暂等待
|
||||
continue # 重新进入循环进行规划
|
||||
|
||||
# --- 修改:移除旧的清理逻辑 ---
|
||||
# 旧逻辑: 在执行动作前清理所有已知的新消息
|
||||
# if initial_new_message_count > 0 and action in ["direct_reply", "send_new_message"]:
|
||||
# # ... (旧的清理代码) ...
|
||||
# 新逻辑: 不在这里清理,交给 _handle_action 成功发送后处理
|
||||
|
||||
# --- 传递 planning_marker_time 给 _handle_action ---
|
||||
# --- 如果规划期间新消息未超限,则继续执行规划的动作 ---
|
||||
# 将 planning_marker_time 传递给 _handle_action
|
||||
await self._handle_action(action, reason, self.observation_info, self.conversation_info, planning_marker_time)
|
||||
|
||||
# 检查是否需要结束对话 (逻辑不变)
|
||||
|
|
@ -212,40 +220,19 @@ class Conversation:
|
|||
except Exception as loop_err:
|
||||
logger.error(f"[私聊][{self.private_name}]PFC主循环出错: {loop_err}")
|
||||
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
|
||||
await asyncio.sleep(1)
|
||||
await asyncio.sleep(1) # 发生错误时等待一段时间
|
||||
|
||||
if self.should_continue:
|
||||
await asyncio.sleep(0.1) # 保持循环间的短暂间隔
|
||||
|
||||
logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}")
|
||||
|
||||
# --- 修改:_check_new_messages_after_planning 调整为接收标记时间 ---
|
||||
def _check_new_messages_during_action(self, planning_marker_time: float, buffer: int = 0) -> bool:
|
||||
"""检查在规划开始后是否有新消息到达(考虑缓冲)"""
|
||||
if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "last_message_time"):
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'last_message_time' 属性,无法检查新消息。"
|
||||
)
|
||||
return False
|
||||
|
||||
# 检查最后一条消息的时间是否晚于规划开始的时间点
|
||||
if self.observation_info.last_message_time and self.observation_info.last_message_time > planning_marker_time:
|
||||
# 这里可以进一步检查新消息的数量是否超过 buffer,但目前仅检查是否有新消息即可打断
|
||||
# current_unprocessed_count = self.observation_info.new_messages_count
|
||||
# initial_count_at_planning # 这个需要传递进来或重新设计获取方式
|
||||
# if current_unprocessed_count > initial_count_at_planning + buffer:
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}]动作执行期间(生成/检查/发送前)检测到新消息 (晚于 {planning_marker_time}),取消当前动作并重新规划"
|
||||
)
|
||||
# 重置上次成功回复状态
|
||||
if hasattr(self, "conversation_info"):
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
else:
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]ConversationInfo 未初始化,无法重置 last_successful_reply_action。"
|
||||
)
|
||||
return True
|
||||
return False
|
||||
# --- 移除 _check_new_messages_during_action 方法 ---
|
||||
# 这个方法不再需要,因为检查逻辑已合并到 _plan_and_action_loop 中,
|
||||
# 并且 _handle_action 内部不再需要打断执行。
|
||||
# def _check_new_messages_during_action(self, planning_marker_time: float, buffer: int = 0) -> bool:
|
||||
# # ... (旧代码) ...
|
||||
# pass
|
||||
|
||||
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
|
||||
"""将消息字典转换为Message对象"""
|
||||
|
|
@ -261,7 +248,14 @@ class Conversation:
|
|||
if not chat_stream:
|
||||
raise ValueError(f"无法确定 ChatStream for stream_id {self.stream_id}")
|
||||
|
||||
user_info = UserInfo.from_dict(msg_dict.get("user_info", {}))
|
||||
user_info_dict = msg_dict.get("user_info", {})
|
||||
if isinstance(user_info_dict, dict):
|
||||
user_info = UserInfo.from_dict(user_info_dict)
|
||||
else:
|
||||
logger.warning(f"Message user_info is not a dict: {user_info_dict}")
|
||||
# 根据需要返回默认 UserInfo 或抛出错误
|
||||
user_info = UserInfo(user_id="unknown", user_nickname="Unknown", platform="unknown")
|
||||
|
||||
|
||||
return Message(
|
||||
message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 提供默认 ID
|
||||
|
|
@ -302,7 +296,6 @@ class Conversation:
|
|||
|
||||
# --- 根据不同的 action 执行 ---
|
||||
|
||||
# send_new_message 失败后执行 wait
|
||||
if action == "send_new_message":
|
||||
max_reply_attempts = 3
|
||||
reply_attempt_count = 0
|
||||
|
|
@ -318,13 +311,8 @@ class Conversation:
|
|||
)
|
||||
self.state = ConversationState.GENERATING
|
||||
|
||||
# --- 修改:生成前检查新消息 ---
|
||||
if self._check_new_messages_during_action(planning_marker_time):
|
||||
logger.info(f"[私聊][{self.private_name}]生成追问回复前检测到新消息,取消动作")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "生成追问前收到新消息,取消"}
|
||||
)
|
||||
return # 直接返回,主循环会重新规划
|
||||
# --- 移除生成前的检查 ---
|
||||
# if self._check_new_messages_during_action(planning_marker_time): ...
|
||||
|
||||
# 1. 生成回复 (调用 generate 时传入 action_type)
|
||||
self.generated_reply = await self.reply_generator.generate(
|
||||
|
|
@ -334,13 +322,8 @@ class Conversation:
|
|||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的追问回复: {self.generated_reply}"
|
||||
)
|
||||
|
||||
# --- 修改:检查前再次检查新消息 ---
|
||||
if self._check_new_messages_during_action(planning_marker_time):
|
||||
logger.info(f"[私聊][{self.private_name}]检查追问回复前检测到新消息,取消动作")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"检查追问前收到新消息,取消发送: {self.generated_reply}"}
|
||||
)
|
||||
return
|
||||
# --- 移除检查前的检查 ---
|
||||
# if self._check_new_messages_during_action(planning_marker_time): ...
|
||||
|
||||
# 2. 检查回复 (逻辑不变)
|
||||
self.state = ConversationState.CHECKING
|
||||
|
|
@ -364,15 +347,15 @@ class Conversation:
|
|||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
|
||||
)
|
||||
|
||||
# --- 修改:记录失败原因和内容到 conversation_info ---
|
||||
# 记录失败原因和内容到 conversation_info
|
||||
if not is_suitable:
|
||||
conversation_info.last_reply_rejection_reason = check_reason
|
||||
conversation_info.last_rejected_reply_content = self.generated_reply
|
||||
# 确保属性存在
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
|
||||
else:
|
||||
# 清除上次失败记录
|
||||
conversation_info.last_reply_rejection_reason = None
|
||||
conversation_info.last_rejected_reply_content = None
|
||||
# --- 记录结束 ---
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', None)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', None)
|
||||
|
||||
if is_suitable:
|
||||
final_reply_to_send = self.generated_reply
|
||||
|
|
@ -388,26 +371,22 @@ class Conversation:
|
|||
)
|
||||
check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}"
|
||||
# 记录失败
|
||||
conversation_info.last_reply_rejection_reason = check_reason
|
||||
conversation_info.last_rejected_reply_content = self.generated_reply
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
|
||||
break # 出错也跳出循环
|
||||
|
||||
# 循环结束,处理最终结果
|
||||
if is_suitable:
|
||||
# --- 修改:发送前最终检查新消息 ---
|
||||
if self._check_new_messages_during_action(planning_marker_time):
|
||||
logger.info(f"[私聊][{self.private_name}]发送追问回复前检测到新消息,取消发送")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"发送追问前收到新消息,取消发送: {final_reply_to_send}"}
|
||||
)
|
||||
return # 直接返回,重新规划
|
||||
# --- 移除发送前的检查 ---
|
||||
# if self._check_new_messages_during_action(planning_marker_time): ...
|
||||
|
||||
# 发送合适的回复
|
||||
self.generated_reply = final_reply_to_send
|
||||
send_success = await self._send_reply() # 调用发送函数,获取发送结果
|
||||
|
||||
if send_success:
|
||||
# --- 修改:发送成功后,标记处理过的消息 ---
|
||||
# --- 发送成功后,标记处理过的消息 ---
|
||||
# 使用 planning_marker_time 标记规划开始前的消息为已处理
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
# 更新状态: 标记上次成功是 send_new_message
|
||||
self.conversation_info.last_successful_reply_action = "send_new_message"
|
||||
|
|
@ -415,21 +394,22 @@ class Conversation:
|
|||
else:
|
||||
# 发送失败处理
|
||||
logger.error(f"[私聊][{self.private_name}]发送追问回复失败")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"发送追问回复失败: {final_reply_to_send}"}
|
||||
)
|
||||
# 确保 action_index 有效
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"发送追问回复失败: {final_reply_to_send}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None # 发送失败,重置状态
|
||||
|
||||
|
||||
elif need_replan:
|
||||
# 打回动作决策
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,追问回复决定打回动作决策。打回原因: {check_reason}"
|
||||
)
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"}
|
||||
)
|
||||
# 打回时不清空失败记录,让 planner 看到
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None # 重置成功状态
|
||||
|
||||
else:
|
||||
|
|
@ -437,12 +417,11 @@ class Conversation:
|
|||
logger.warning(
|
||||
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的追问回复。最终原因: {check_reason}"
|
||||
)
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后失败: {check_reason}"}
|
||||
)
|
||||
# 重置状态: 追问失败,下次用初始 prompt
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后失败: {check_reason}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
# 失败时不清空失败记录
|
||||
|
||||
# 执行 Wait 操作
|
||||
logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...")
|
||||
|
|
@ -475,13 +454,8 @@ class Conversation:
|
|||
)
|
||||
self.state = ConversationState.GENERATING
|
||||
|
||||
# --- 修改:生成前检查新消息 ---
|
||||
if self._check_new_messages_during_action(planning_marker_time):
|
||||
logger.info(f"[私聊][{self.private_name}]生成首次回复前检测到新消息,取消动作")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "生成首次回复前收到新消息,取消"}
|
||||
)
|
||||
return
|
||||
# --- 移除生成前的检查 ---
|
||||
# if self._check_new_messages_during_action(planning_marker_time): ...
|
||||
|
||||
# 1. 生成回复
|
||||
self.generated_reply = await self.reply_generator.generate(
|
||||
|
|
@ -491,13 +465,8 @@ class Conversation:
|
|||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的首次回复: {self.generated_reply}"
|
||||
)
|
||||
|
||||
# --- 修改:检查前再次检查新消息 ---
|
||||
if self._check_new_messages_during_action(planning_marker_time):
|
||||
logger.info(f"[私聊][{self.private_name}]检查首次回复前检测到新消息,取消动作")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"检查首次回复前收到新消息,取消发送: {self.generated_reply}"}
|
||||
)
|
||||
return
|
||||
# --- 移除检查前的检查 ---
|
||||
# if self._check_new_messages_during_action(planning_marker_time): ...
|
||||
|
||||
# 2. 检查回复
|
||||
self.state = ConversationState.CHECKING
|
||||
|
|
@ -521,15 +490,13 @@ class Conversation:
|
|||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
|
||||
)
|
||||
|
||||
# --- 修改:记录失败原因和内容到 conversation_info ---
|
||||
# 记录失败原因和内容
|
||||
if not is_suitable:
|
||||
conversation_info.last_reply_rejection_reason = check_reason
|
||||
conversation_info.last_rejected_reply_content = self.generated_reply
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
|
||||
else:
|
||||
# 清除上次失败记录
|
||||
conversation_info.last_reply_rejection_reason = None
|
||||
conversation_info.last_rejected_reply_content = None
|
||||
# --- 记录结束 ---
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', None)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', None)
|
||||
|
||||
if is_suitable:
|
||||
final_reply_to_send = self.generated_reply
|
||||
|
|
@ -545,26 +512,21 @@ class Conversation:
|
|||
)
|
||||
check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}"
|
||||
# 记录失败
|
||||
conversation_info.last_reply_rejection_reason = check_reason
|
||||
conversation_info.last_rejected_reply_content = self.generated_reply
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
|
||||
break # 出错也跳出循环
|
||||
|
||||
# 循环结束,处理最终结果
|
||||
if is_suitable:
|
||||
# --- 修改:发送前最终检查新消息 ---
|
||||
if self._check_new_messages_during_action(planning_marker_time):
|
||||
logger.info(f"[私聊][{self.private_name}]发送首次回复前检测到新消息,取消发送")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"发送首次回复前收到新消息,取消发送: {final_reply_to_send}"}
|
||||
)
|
||||
return # 直接返回,重新规划
|
||||
# --- 移除发送前的检查 ---
|
||||
# if self._check_new_messages_during_action(planning_marker_time): ...
|
||||
|
||||
# 发送合适的回复
|
||||
self.generated_reply = final_reply_to_send
|
||||
send_success = await self._send_reply() # 调用发送函数
|
||||
|
||||
if send_success:
|
||||
# --- 修改:发送成功后,标记处理过的消息 ---
|
||||
# --- 发送成功后,标记处理过的消息 ---
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
# 更新状态: 标记上次成功是 direct_reply
|
||||
self.conversation_info.last_successful_reply_action = "direct_reply"
|
||||
|
|
@ -572,21 +534,21 @@ class Conversation:
|
|||
else:
|
||||
# 发送失败处理
|
||||
logger.error(f"[私聊][{self.private_name}]发送首次回复失败")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"发送首次回复失败: {final_reply_to_send}"}
|
||||
)
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"发送首次回复失败: {final_reply_to_send}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None # 发送失败,重置状态
|
||||
|
||||
|
||||
elif need_replan:
|
||||
# 打回动作决策
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,首次回复决定打回动作决策。打回原因: {check_reason}"
|
||||
)
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"}
|
||||
)
|
||||
# 打回时不清空失败记录
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None # 重置成功状态
|
||||
|
||||
else:
|
||||
|
|
@ -594,12 +556,11 @@ class Conversation:
|
|||
logger.warning(
|
||||
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的首次回复。最终原因: {check_reason}"
|
||||
)
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后失败: {check_reason}"}
|
||||
)
|
||||
# 重置状态: 首次回复失败,下次还是用初始 prompt
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后失败: {check_reason}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
# 失败时不清空失败记录
|
||||
|
||||
# 执行 Wait 操作 (保持原有逻辑)
|
||||
logger.info(f"[私聊][{self.private_name}]由于无法生成合适首次回复,执行 'wait' 操作...")
|
||||
|
|
@ -620,18 +581,12 @@ class Conversation:
|
|||
elif action == "rethink_goal":
|
||||
self.state = ConversationState.RETHINKING
|
||||
try:
|
||||
# 检查 goal_analyzer 是否存在
|
||||
if not hasattr(self, "goal_analyzer"):
|
||||
logger.error(f"[私聊][{self.private_name}]GoalAnalyzer 未初始化,无法重新思考目标。")
|
||||
raise AttributeError("GoalAnalyzer not initialized")
|
||||
|
||||
# --- rethink_goal 前检查新消息 ---
|
||||
if self._check_new_messages_during_action(planning_marker_time):
|
||||
logger.info(f"[私聊][{self.private_name}]重新思考目标前检测到新消息,取消动作")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "重新思考目标前收到新消息,取消"}
|
||||
)
|
||||
return
|
||||
# --- 移除 rethink_goal 前的检查 ---
|
||||
# if self._check_new_messages_during_action(planning_marker_time): ...
|
||||
|
||||
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
|
||||
# --- rethink_goal 后标记处理过的消息 ---
|
||||
|
|
@ -639,58 +594,49 @@ class Conversation:
|
|||
action_successful = True
|
||||
except Exception as rethink_err:
|
||||
logger.error(f"[私聊][{self.private_name}]重新思考目标时出错: {rethink_err}")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None # 重置状态
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
|
||||
elif action == "listening":
|
||||
self.state = ConversationState.LISTENING
|
||||
logger.info(f"[私聊][{self.private_name}]倾听对方发言...")
|
||||
try:
|
||||
# 检查 waiter 是否存在
|
||||
if not hasattr(self, "waiter"):
|
||||
logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法倾听。")
|
||||
raise AttributeError("Waiter not initialized")
|
||||
|
||||
# --- listening 前检查新消息 ---
|
||||
# 倾听时如果收到新消息,通常应该继续倾听或转为回复,而不是取消
|
||||
# 所以这里不检查新消息打断
|
||||
|
||||
# --- listening 后标记处理过的消息 ---
|
||||
# 倾听是等待行为,也需要标记之前的消息已处理
|
||||
# --- listening 动作开始前标记处理过的消息 ---
|
||||
# 因为 listening 本身是等待行为,需要在开始等待前处理掉规划时看到的消息
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
await self.waiter.wait_listening(conversation_info)
|
||||
action_successful = True # Listening 完成就算成功
|
||||
except Exception as listen_err:
|
||||
logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"倾听失败: {listen_err}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None # 重置状态
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"倾听失败: {listen_err}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
|
||||
elif action == "say_goodbye":
|
||||
self.state = ConversationState.GENERATING # 也可以定义一个新的状态,如 ENDING
|
||||
self.state = ConversationState.GENERATING
|
||||
logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...")
|
||||
try:
|
||||
# --- 告别前检查新消息 ---
|
||||
# 如果有新消息,可能不适合告别了
|
||||
if self._check_new_messages_during_action(planning_marker_time):
|
||||
logger.info(f"[私聊][{self.private_name}]发送告别语前检测到新消息,取消告别")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "发送告别语前收到新消息,取消"}
|
||||
)
|
||||
return
|
||||
# --- 移除告别前的检查 ---
|
||||
# if self._check_new_messages_during_action(planning_marker_time): ...
|
||||
|
||||
# 1. 生成告别语 (使用 'say_goodbye' action_type)
|
||||
# 1. 生成告别语
|
||||
self.generated_reply = await self.reply_generator.generate(
|
||||
observation_info, conversation_info, action_type="say_goodbye"
|
||||
)
|
||||
logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}")
|
||||
|
||||
# 2. 直接发送告别语 (不经过检查)
|
||||
if self.generated_reply: # 确保生成了内容
|
||||
send_success = await self._send_reply() # 调用发送方法
|
||||
# 2. 发送告别语
|
||||
if self.generated_reply:
|
||||
send_success = await self._send_reply()
|
||||
if send_success:
|
||||
# --- 发送成功后标记处理过的消息 ---
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
|
|
@ -699,37 +645,38 @@ class Conversation:
|
|||
else:
|
||||
logger.warning(f"[私聊][{self.private_name}]发送告别语失败。")
|
||||
action_successful = False
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "发送告别语失败"}
|
||||
)
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "发送告别语失败"}
|
||||
)
|
||||
else:
|
||||
logger.warning(f"[私聊][{self.private_name}]未能生成告别语内容,无法发送。")
|
||||
action_successful = False # 标记动作失败
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "未能生成告别语内容"}
|
||||
)
|
||||
action_successful = False
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "未能生成告别语内容"}
|
||||
)
|
||||
|
||||
# 3. 无论是否发送成功,都准备结束对话
|
||||
# 3. 结束对话
|
||||
self.should_continue = False
|
||||
logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。")
|
||||
|
||||
except Exception as goodbye_err:
|
||||
logger.error(f"[私聊][{self.private_name}]生成或发送告别语时出错: {goodbye_err}")
|
||||
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
|
||||
# 即使出错,也结束对话
|
||||
self.should_continue = False
|
||||
action_successful = False # 标记动作失败
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"}
|
||||
)
|
||||
action_successful = False
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"}
|
||||
)
|
||||
|
||||
elif action == "end_conversation":
|
||||
# 这个分支现在只会在 action_planner 最终决定不告别时被调用
|
||||
self.should_continue = False
|
||||
logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...")
|
||||
# --- 结束对话前标记处理过的消息 ---
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
action_successful = True # 标记这个指令本身是成功的
|
||||
action_successful = True
|
||||
|
||||
elif action == "block_and_ignore":
|
||||
logger.info(f"[私聊][{self.private_name}]不想再理你了...")
|
||||
|
|
@ -741,35 +688,30 @@ class Conversation:
|
|||
self.state = ConversationState.IGNORED
|
||||
# --- 屏蔽前标记处理过的消息 ---
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
action_successful = True # 标记动作成功
|
||||
action_successful = True
|
||||
|
||||
else: # 对应 'wait' 动作
|
||||
self.state = ConversationState.WAITING
|
||||
logger.info(f"[私聊][{self.private_name}]等待更多信息...")
|
||||
try:
|
||||
# 检查 waiter 是否存在
|
||||
if not hasattr(self, "waiter"):
|
||||
logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法等待。")
|
||||
raise AttributeError("Waiter not initialized")
|
||||
|
||||
# --- Wait 前检查新消息 ---
|
||||
# Wait 时如果收到新消息,wait 逻辑内部会处理并退出,所以这里不打断
|
||||
|
||||
# --- Wait 开始前标记处理过的消息 ---
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
_timeout_occurred = await self.waiter.wait(self.conversation_info)
|
||||
action_successful = True # Wait 完成就算成功
|
||||
except Exception as wait_err:
|
||||
logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"等待失败: {wait_err}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None # 重置状态
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"等待失败: {wait_err}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
|
||||
# --- 更新 Action History 状态 ---
|
||||
# 只有当动作本身成功时,才更新状态为 done
|
||||
if action_successful:
|
||||
# 确保 action_index 在有效范围内
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{
|
||||
|
|
@ -777,16 +719,11 @@ class Conversation:
|
|||
"time": datetime.datetime.now().strftime("%H:%M:%S"),
|
||||
}
|
||||
)
|
||||
# 重置状态: 对于非回复类动作的成功,清除上次回复状态
|
||||
if action not in ["direct_reply", "send_new_message"]:
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
# logger.debug(f"[私聊][{self.private_name}]动作 {action} 成功完成,重置 last_successful_reply_action")
|
||||
else:
|
||||
logger.error(f"[私聊][{self.private_name}]尝试更新无效的 action_index: {action_index},当前 done_action 长度: {len(conversation_info.done_action)}")
|
||||
|
||||
# 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action
|
||||
|
||||
# --- 修改:_send_reply 返回发送是否成功 ---
|
||||
async def _send_reply(self) -> bool:
|
||||
"""发送回复,并返回发送是否成功"""
|
||||
if not self.generated_reply:
|
||||
|
|
@ -794,10 +731,8 @@ class Conversation:
|
|||
return False # 发送失败
|
||||
|
||||
try:
|
||||
_current_time = time.time()
|
||||
reply_content = self.generated_reply
|
||||
|
||||
# 发送消息 (确保 direct_sender 和 chat_stream 有效)
|
||||
if not hasattr(self, "direct_sender") or not self.direct_sender:
|
||||
logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。")
|
||||
return False # 发送失败
|
||||
|
|
@ -807,14 +742,11 @@ class Conversation:
|
|||
|
||||
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content)
|
||||
|
||||
# 发送成功后,手动触发 observer 更新可能导致重复处理自己发送的消息
|
||||
# 更好的做法是依赖 observer 的自动轮询或数据库触发器(如果支持)
|
||||
# 暂时注释掉,观察是否影响 ObservationInfo 的更新
|
||||
# 触发 observer 更新的逻辑保持注释,依赖自动轮询
|
||||
# self.chat_observer.trigger_update()
|
||||
# if not await self.chat_observer.wait_for_update():
|
||||
# logger.warning(f"[私聊][{self.private_name}]等待 ChatObserver 更新完成超时")
|
||||
# await self.chat_observer.wait_for_update()
|
||||
|
||||
self.state = ConversationState.ANALYZING # 更新状态
|
||||
self.state = ConversationState.ANALYZING
|
||||
return True # 发送成功
|
||||
|
||||
except Exception as e:
|
||||
|
|
@ -826,21 +758,18 @@ class Conversation:
|
|||
async def _send_timeout_message(self):
|
||||
"""发送超时结束消息"""
|
||||
try:
|
||||
# 尝试从 observation_info 获取历史记录
|
||||
if not hasattr(self, 'observation_info') or not self.observation_info.chat_history:
|
||||
logger.warning(f"[私聊][{self.private_name}]无法获取聊天历史,无法发送超时消息。")
|
||||
return
|
||||
|
||||
messages = self.observation_info.chat_history[-1:] # 获取最后一条
|
||||
messages = self.observation_info.chat_history[-1:]
|
||||
if not messages:
|
||||
return
|
||||
|
||||
latest_message_dict = messages[0]
|
||||
# 确保 chat_stream 存在
|
||||
if not self.chat_stream:
|
||||
logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送超时消息。")
|
||||
return
|
||||
# 将字典转换为 Message 对象
|
||||
latest_message = self._convert_to_message(latest_message_dict)
|
||||
|
||||
await self.direct_sender.send_message(
|
||||
|
|
|
|||
Loading…
Reference in New Issue