优化处理逻辑

pull/937/head
Bakadax 2025-05-05 17:52:11 +08:00
parent b30d0cbbe8
commit dfa947f7aa
1 changed files with 75 additions and 123 deletions

View File

@ -86,6 +86,10 @@ class Conversation:
self.observation_info.bind_to_chat_observer(self.chat_observer)
self.conversation_info = ConversationInfo()
# --- 初始化上次拒绝回复的信息 ---
self.conversation_info.last_reply_rejection_reason = None
self.conversation_info.last_rejected_reply_content = None
except Exception as e:
logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册信息组件失败: {e}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
@ -164,40 +168,31 @@ class Conversation:
continue
try:
# --- 记录规划开始时的时间戳和未处理消息数 ---
# 使用 time.time() 获取当前时间戳作为标记点,更准确反映规划开始的时刻
planning_marker_time = time.time()
initial_unprocessed_count = self.observation_info.new_messages_count
logger.debug(f"[私聊][{self.private_name}]规划开始标记时间: {planning_marker_time}, 初始未处理: {initial_unprocessed_count}")
# --- 调用 Action Planner ---
# 传递 self.conversation_info.last_successful_reply_action
action, reason = await self.action_planner.plan(
self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action
)
# --- 规划后检查是否有 *过多* 新消息到达 ---
# 检查规划期间(调用 plan 函数的时间段内)新收到的消息数
current_unprocessed_count = self.observation_info.new_messages_count
# planning_buffer = 2 # 用户指定的缓冲值
planning_buffer = 2 # 使用用户指定的缓冲值
planning_buffer = 2 # 用户指定的缓冲值
new_messages_during_planning = current_unprocessed_count - initial_unprocessed_count
if new_messages_during_planning > planning_buffer:
logger.info(
f"[私聊][{self.private_name}]规划期间收到 {new_messages_during_planning} 条新消息 (超过缓冲 {planning_buffer}),放弃当前计划 '{action}',立即重新规划"
)
# 重置上次成功回复状态,因为要响应新消息
self.conversation_info.last_successful_reply_action = None
# 记录被放弃的规划动作 (可选)
# current_action_record = { ... status: "recalled_before_execution" ... }
# conversation_info.done_action.append(current_action_record)
await asyncio.sleep(0.1) # 短暂等待
await asyncio.sleep(0.1)
continue # 重新进入循环进行规划
# --- 如果规划期间新消息未超限,则继续执行规划的动作 ---
# 将 planning_marker_time 传递给 _handle_action
await self._handle_action(action, reason, self.observation_info, self.conversation_info, planning_marker_time)
# 将 planning_marker_time 和 new_messages_during_planning 传递给 _handle_action
await self._handle_action(action, reason, self.observation_info, self.conversation_info, planning_marker_time, new_messages_during_planning)
# 检查是否需要结束对话 (逻辑不变)
goal_ended = False
@ -220,30 +215,23 @@ class Conversation:
except Exception as loop_err:
logger.error(f"[私聊][{self.private_name}]PFC主循环出错: {loop_err}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
await asyncio.sleep(1) # 发生错误时等待一段时间
await asyncio.sleep(1)
if self.should_continue:
await asyncio.sleep(0.1) # 保持循环间的短暂间隔
await asyncio.sleep(0.1)
logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}")
# --- 移除 _check_new_messages_during_action 方法 ---
# 这个方法不再需要,因为检查逻辑已合并到 _plan_and_action_loop 中,
# 并且 _handle_action 内部不再需要打断执行。
# def _check_new_messages_during_action(self, planning_marker_time: float, buffer: int = 0) -> bool:
# # ... (旧代码) ...
# pass
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
"""将消息字典转换为Message对象"""
try:
# 尝试从 msg_dict 直接获取 chat_stream如果失败则从全局 chat_manager 获取
chat_info = msg_dict.get("chat_info")
if chat_info and isinstance(chat_info, dict):
chat_stream = ChatStream.from_dict(chat_info)
elif self.chat_stream: # 使用实例变量中的 chat_stream
elif self.chat_stream:
chat_stream = self.chat_stream
else: # Fallback: 尝试从 manager 获取 (可能需要 stream_id)
else:
chat_stream = chat_manager.get_stream(self.stream_id)
if not chat_stream:
raise ValueError(f"无法确定 ChatStream for stream_id {self.stream_id}")
@ -253,32 +241,29 @@ class Conversation:
user_info = UserInfo.from_dict(user_info_dict)
else:
logger.warning(f"Message user_info is not a dict: {user_info_dict}")
# 根据需要返回默认 UserInfo 或抛出错误
user_info = UserInfo(user_id="unknown", user_nickname="Unknown", platform="unknown")
return Message(
message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 提供默认 ID
chat_stream=chat_stream, # 使用确定的 chat_stream
time=msg_dict.get("time", time.time()), # 提供默认时间
message_id=msg_dict.get("message_id", f"gen_{time.time()}"),
chat_stream=chat_stream,
time=msg_dict.get("time", time.time()),
user_info=user_info,
processed_plain_text=msg_dict.get("processed_plain_text", ""),
detailed_plain_text=msg_dict.get("detailed_plain_text", ""),
)
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]转换消息时出错: {e}")
# 可以选择返回 None 或重新抛出异常,这里选择重新抛出以指示问题
raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e
# --- 修改_handle_action 接收 planning_marker_time ---
# --- 修改_handle_action 接收 planning_marker_time 和 new_messages_during_planning ---
async def _handle_action(
self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo, planning_marker_time: float
self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo, planning_marker_time: float, new_messages_during_planning: int
):
"""处理规划的行动"""
logger.debug(f"[私聊][{self.private_name}]执行行动: {action}, 原因: {reason}")
# 记录action历史 (逻辑不变)
# 记录action历史
current_action_record = {
"action": action,
"plan_reason": reason,
@ -286,13 +271,12 @@ class Conversation:
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
}
# 确保 done_action 列表存在
if not hasattr(conversation_info, "done_action"):
conversation_info.done_action = []
conversation_info.done_action.append(current_action_record)
action_index = len(conversation_info.done_action) - 1
action_successful = False # 用于标记动作是否成功完成
action_successful = False
# --- 根据不同的 action 执行 ---
@ -311,10 +295,7 @@ class Conversation:
)
self.state = ConversationState.GENERATING
# --- 移除生成前的检查 ---
# if self._check_new_messages_during_action(planning_marker_time): ...
# 1. 生成回复 (调用 generate 时传入 action_type)
# 1. 生成回复
self.generated_reply = await self.reply_generator.generate(
observation_info, conversation_info, action_type="send_new_message"
)
@ -322,13 +303,10 @@ class Conversation:
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的追问回复: {self.generated_reply}"
)
# --- 移除检查前的检查 ---
# if self._check_new_messages_during_action(planning_marker_time): ...
# 2. 检查回复 (逻辑不变)
# 2. 检查回复
self.state = ConversationState.CHECKING
try:
current_goal_str = "" # 初始化
current_goal_str = ""
if conversation_info.goal_list:
first_goal = conversation_info.goal_list[0]
if isinstance(first_goal, dict):
@ -347,13 +325,10 @@ class Conversation:
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
)
# 记录失败原因和内容到 conversation_info
if not is_suitable:
# 确保属性存在
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
else:
# 清除上次失败记录
setattr(conversation_info, 'last_reply_rejection_reason', None)
setattr(conversation_info, 'last_rejected_reply_content', None)
@ -364,45 +339,46 @@ class Conversation:
logger.warning(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查建议重新规划,停止尝试。原因: {check_reason}"
)
break # 跳出循环,后续会处理 need_replan
break
except Exception as check_err:
logger.error(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}"
)
check_reason = f"{reply_attempt_count} 次检查过程出错: {check_err}"
# 记录失败
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
break # 出错也跳出循环
break
# 循环结束,处理最终结果
if is_suitable:
# --- 移除发送前的检查 ---
# if self._check_new_messages_during_action(planning_marker_time): ...
# 发送合适的回复
self.generated_reply = final_reply_to_send
send_success = await self._send_reply() # 调用发送函数,获取发送结果
send_success = await self._send_reply()
if send_success:
# --- 发送成功后,标记处理过的消息 ---
# 使用 planning_marker_time 标记规划开始前的消息为已处理
# 发送成功后,标记处理过的消息
await observation_info.mark_messages_processed_up_to(planning_marker_time)
# 更新状态: 标记上次成功是 send_new_message
self.conversation_info.last_successful_reply_action = "send_new_message"
action_successful = True # 标记动作成功
# --- 核心逻辑修改:根据规划期间是否有新消息决定下一步状态 ---
if new_messages_during_planning > 0:
logger.info(f"[私聊][{self.private_name}] 发送追问成功后,检测到规划期间有 {new_messages_during_planning} 条新消息,强制重置回复状态以进行新规划。")
self.conversation_info.last_successful_reply_action = None # 强制重新规划
else:
# 只有在规划期间没有新消息时,才设置追问状态
logger.info(f"[私聊][{self.private_name}] 发送追问成功,规划期间无新消息,允许下次进入追问状态。")
self.conversation_info.last_successful_reply_action = "send_new_message"
# --- 核心逻辑修改结束 ---
action_successful = True
else:
# 发送失败处理
logger.error(f"[私聊][{self.private_name}]发送追问回复失败")
# 确保 action_index 有效
if action_index < len(conversation_info.done_action):
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"发送追问回复失败: {final_reply_to_send}"}
)
self.conversation_info.last_successful_reply_action = None # 发送失败,重置状态
self.conversation_info.last_successful_reply_action = None
elif need_replan:
# 打回动作决策
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,追问回复决定打回动作决策。打回原因: {check_reason}"
)
@ -410,10 +386,9 @@ class Conversation:
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"}
)
self.conversation_info.last_successful_reply_action = None # 重置成功状态
self.conversation_info.last_successful_reply_action = None
else:
# 追问失败
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的追问回复。最终原因: {check_reason}"
)
@ -426,7 +401,6 @@ class Conversation:
# 执行 Wait 操作
logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...")
self.state = ConversationState.WAITING
# --- Wait 操作也需要标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
await self.waiter.wait(self.conversation_info)
wait_action_record = {
@ -437,7 +411,7 @@ class Conversation:
"final_reason": None,
}
conversation_info.done_action.append(wait_action_record)
action_successful = True # Wait 本身算成功完成
action_successful = True
elif action == "direct_reply":
max_reply_attempts = 3
@ -454,9 +428,6 @@ class Conversation:
)
self.state = ConversationState.GENERATING
# --- 移除生成前的检查 ---
# if self._check_new_messages_during_action(planning_marker_time): ...
# 1. 生成回复
self.generated_reply = await self.reply_generator.generate(
observation_info, conversation_info, action_type="direct_reply"
@ -465,13 +436,10 @@ class Conversation:
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的首次回复: {self.generated_reply}"
)
# --- 移除检查前的检查 ---
# if self._check_new_messages_during_action(planning_marker_time): ...
# 2. 检查回复
self.state = ConversationState.CHECKING
try:
current_goal_str = "" # 初始化
current_goal_str = ""
if conversation_info.goal_list:
first_goal = conversation_info.goal_list[0]
if isinstance(first_goal, dict):
@ -490,7 +458,6 @@ class Conversation:
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
)
# 记录失败原因和内容
if not is_suitable:
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
@ -505,43 +472,46 @@ class Conversation:
logger.warning(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查建议重新规划,停止尝试。原因: {check_reason}"
)
break # 跳出循环
break
except Exception as check_err:
logger.error(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (首次回复) 时出错: {check_err}"
)
check_reason = f"{reply_attempt_count} 次检查过程出错: {check_err}"
# 记录失败
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
break # 出错也跳出循环
break
# 循环结束,处理最终结果
if is_suitable:
# --- 移除发送前的检查 ---
# if self._check_new_messages_during_action(planning_marker_time): ...
# 发送合适的回复
self.generated_reply = final_reply_to_send
send_success = await self._send_reply() # 调用发送函数
send_success = await self._send_reply()
if send_success:
# --- 发送成功后,标记处理过的消息 ---
# 发送成功后,标记处理过的消息
await observation_info.mark_messages_processed_up_to(planning_marker_time)
# 更新状态: 标记上次成功是 direct_reply
self.conversation_info.last_successful_reply_action = "direct_reply"
action_successful = True # 标记动作成功
# --- 核心逻辑修改:根据规划期间是否有新消息决定下一步状态 ---
if new_messages_during_planning > 0:
logger.info(f"[私聊][{self.private_name}] 发送首次回复成功后,检测到规划期间有 {new_messages_during_planning} 条新消息,强制重置回复状态以进行新规划。")
self.conversation_info.last_successful_reply_action = None # 强制重新规划
else:
# 只有在规划期间没有新消息时,才设置追问状态
logger.info(f"[私聊][{self.private_name}] 发送首次回复成功,规划期间无新消息,允许下次进入追问状态。")
self.conversation_info.last_successful_reply_action = "direct_reply"
# --- 核心逻辑修改结束 ---
action_successful = True
else:
# 发送失败处理
logger.error(f"[私聊][{self.private_name}]发送首次回复失败")
if action_index < len(conversation_info.done_action):
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"发送首次回复失败: {final_reply_to_send}"}
)
self.conversation_info.last_successful_reply_action = None # 发送失败,重置状态
self.conversation_info.last_successful_reply_action = None
elif need_replan:
# 打回动作决策
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,首次回复决定打回动作决策。打回原因: {check_reason}"
)
@ -549,10 +519,9 @@ class Conversation:
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"}
)
self.conversation_info.last_successful_reply_action = None # 重置成功状态
self.conversation_info.last_successful_reply_action = None
else:
# 首次回复失败
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的首次回复。最终原因: {check_reason}"
)
@ -562,10 +531,9 @@ class Conversation:
)
self.conversation_info.last_successful_reply_action = None
# 执行 Wait 操作 (保持原有逻辑)
# 执行 Wait 操作
logger.info(f"[私聊][{self.private_name}]由于无法生成合适首次回复,执行 'wait' 操作...")
self.state = ConversationState.WAITING
# --- Wait 操作也需要标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
await self.waiter.wait(self.conversation_info)
wait_action_record = {
@ -576,7 +544,7 @@ class Conversation:
"final_reason": None,
}
conversation_info.done_action.append(wait_action_record)
action_successful = True # Wait 本身算成功
action_successful = True
elif action == "rethink_goal":
self.state = ConversationState.RETHINKING
@ -585,11 +553,7 @@ class Conversation:
logger.error(f"[私聊][{self.private_name}]GoalAnalyzer 未初始化,无法重新思考目标。")
raise AttributeError("GoalAnalyzer not initialized")
# --- 移除 rethink_goal 前的检查 ---
# if self._check_new_messages_during_action(planning_marker_time): ...
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
# --- rethink_goal 后标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
action_successful = True
except Exception as rethink_err:
@ -608,11 +572,9 @@ class Conversation:
logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法倾听。")
raise AttributeError("Waiter not initialized")
# --- listening 动作开始前标记处理过的消息 ---
# 因为 listening 本身是等待行为,需要在开始等待前处理掉规划时看到的消息
await observation_info.mark_messages_processed_up_to(planning_marker_time)
await self.waiter.wait_listening(conversation_info)
action_successful = True # Listening 完成就算成功
action_successful = True
except Exception as listen_err:
logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}")
if action_index < len(conversation_info.done_action):
@ -625,20 +587,14 @@ class Conversation:
self.state = ConversationState.GENERATING
logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...")
try:
# --- 移除告别前的检查 ---
# if self._check_new_messages_during_action(planning_marker_time): ...
# 1. 生成告别语
self.generated_reply = await self.reply_generator.generate(
observation_info, conversation_info, action_type="say_goodbye"
)
logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}")
# 2. 发送告别语
if self.generated_reply:
send_success = await self._send_reply()
if send_success:
# --- 发送成功后标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
action_successful = True
logger.info(f"[私聊][{self.private_name}]告别语已发送。")
@ -657,7 +613,6 @@ class Conversation:
{"status": "recall", "final_reason": "未能生成告别语内容"}
)
# 3. 结束对话
self.should_continue = False
logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。")
@ -674,7 +629,6 @@ class Conversation:
elif action == "end_conversation":
self.should_continue = False
logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...")
# --- 结束对话前标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
action_successful = True
@ -686,7 +640,6 @@ class Conversation:
f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}"
)
self.state = ConversationState.IGNORED
# --- 屏蔽前标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
action_successful = True
@ -698,10 +651,9 @@ class Conversation:
logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法等待。")
raise AttributeError("Waiter not initialized")
# --- Wait 开始前标记处理过的消息 ---
await observation_info.mark_messages_processed_up_to(planning_marker_time)
_timeout_occurred = await self.waiter.wait(self.conversation_info)
action_successful = True # Wait 完成就算成功
action_successful = True
except Exception as wait_err:
logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}")
if action_index < len(conversation_info.done_action):
@ -713,14 +665,18 @@ class Conversation:
# --- 更新 Action History 状态 ---
if action_successful:
if action_index < len(conversation_info.done_action):
# 只有在明确不需要强制重新规划时,才在非回复动作后重置状态
if not (action in ["direct_reply", "send_new_message"] and new_messages_during_planning > 0):
if action not in ["direct_reply", "send_new_message"]:
self.conversation_info.last_successful_reply_action = None
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
if action not in ["direct_reply", "send_new_message"]:
self.conversation_info.last_successful_reply_action = None
else:
logger.error(f"[私聊][{self.private_name}]尝试更新无效的 action_index: {action_index},当前 done_action 长度: {len(conversation_info.done_action)}")
@ -728,32 +684,28 @@ class Conversation:
"""发送回复,并返回发送是否成功"""
if not self.generated_reply:
logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。")
return False # 发送失败
return False
try:
reply_content = self.generated_reply
if not hasattr(self, "direct_sender") or not self.direct_sender:
logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。")
return False # 发送失败
return False
if not self.chat_stream:
logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。")
return False # 发送失败
return False
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content)
# 触发 observer 更新的逻辑保持注释,依赖自动轮询
# self.chat_observer.trigger_update()
# await self.chat_observer.wait_for_update()
self.state = ConversationState.ANALYZING
return True # 发送成功
return True
except Exception as e:
logger.error(f"[私聊][{self.private_name}]发送消息或更新状态时失败: {str(e)}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
self.state = ConversationState.ANALYZING
return False # 发送失败
return False
async def _send_timeout_message(self):
"""发送超时结束消息"""