优化缓存逻辑

pull/937/head
Bakadax 2025-05-05 18:22:54 +08:00
parent dfa947f7aa
commit ad2d96c570
1 changed files with 51 additions and 35 deletions

View File

@ -6,7 +6,7 @@ import datetime
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
from ...config.config import global_config # 确保导入 global_config
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, Set # 引入 Set
from ..chat.message import Message
from .pfc_types import ConversationState
# 确保导入 ChatObserver 和 GoalAnalyzer (如果 pfc.py 中定义了它们)
@ -167,32 +167,54 @@ class Conversation:
self.should_continue = False
continue
try:
# --- 记录规划开始时的时间戳和未处理消息 ---
# --- 记录规划开始时的时间戳和未处理消息的 ID 集合 ---
planning_marker_time = time.time()
initial_unprocessed_count = self.observation_info.new_messages_count
logger.debug(f"[私聊][{self.private_name}]规划开始标记时间: {planning_marker_time}, 初始未处理: {initial_unprocessed_count}")
# 获取规划开始时未处理消息的 ID 集合
initial_unprocessed_ids: Set[str] = {
msg.get("message_id") for msg in self.observation_info.unprocessed_messages if msg.get("message_id")
}
logger.debug(f"[私聊][{self.private_name}]规划开始标记时间: {planning_marker_time}, 初始未处理消息ID数: {len(initial_unprocessed_ids)}")
# --- 调用 Action Planner ---
action, reason = await self.action_planner.plan(
self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action
)
# --- 规划后检查是否有 *过多* 新消息到达 ---
current_unprocessed_count = self.observation_info.new_messages_count
planning_buffer = 2 # 用户指定的缓冲值
new_messages_during_planning = current_unprocessed_count - initial_unprocessed_count
# --- 规划后,精确计算规划期间收到的“用户”新消息数 ---
current_unprocessed_messages = self.observation_info.unprocessed_messages
new_messages_during_planning = []
for msg in current_unprocessed_messages:
msg_id = msg.get("message_id")
# 检查消息ID是否不在初始集合中且消息时间戳晚于规划开始时间增加时间判断以防万一
if msg_id and msg_id not in initial_unprocessed_ids and msg.get("time", 0) >= planning_marker_time:
new_messages_during_planning.append(msg)
if new_messages_during_planning > planning_buffer:
# 计算这些新消息中来自用户的数量
new_user_messages_count = 0
for msg in new_messages_during_planning:
user_info_dict = msg.get("user_info", {})
sender_id = None
if isinstance(user_info_dict, dict):
sender_id = str(user_info_dict.get("user_id")) # 确保是字符串
# 检查发送者ID是否不是机器人ID
if sender_id and sender_id != self.bot_id:
new_user_messages_count += 1
logger.debug(f"[私聊][{self.private_name}]规划期间共收到新消息: {len(new_messages_during_planning)} 条, 其中用户消息: {new_user_messages_count}")
# --- 根据用户新消息数决定是否重新规划 ---
planning_buffer = 2 # 用户指定的缓冲值
if new_user_messages_count > planning_buffer:
logger.info(
f"[私聊][{self.private_name}]规划期间收到 {new_messages_during_planning} 条新消息 (超过缓冲 {planning_buffer}),放弃当前计划 '{action}',立即重新规划"
f"[私聊][{self.private_name}]规划期间收到 {new_user_messages_count} 条用户新消息 (超过缓冲 {planning_buffer}),放弃当前计划 '{action}',立即重新规划"
)
self.conversation_info.last_successful_reply_action = None
await asyncio.sleep(0.1)
continue # 重新进入循环进行规划
# --- 如果规划期间新消息未超限,则继续执行规划的动作 ---
# 将 planning_marker_time 和 new_messages_during_planning 传递给 _handle_action
await self._handle_action(action, reason, self.observation_info, self.conversation_info, planning_marker_time, new_messages_during_planning)
# --- 如果规划期间用户新消息未超限,则继续执行规划的动作 ---
# 将 planning_marker_time 和 new_user_messages_count 传递给 _handle_action
await self._handle_action(action, reason, self.observation_info, self.conversation_info, planning_marker_time, new_user_messages_count)
# 检查是否需要结束对话 (逻辑不变)
goal_ended = False
@ -255,9 +277,9 @@ class Conversation:
logger.warning(f"[私聊][{self.private_name}]转换消息时出错: {e}")
raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e
# --- 修改_handle_action 接收 planning_marker_time 和 new_messages_during_planning ---
# --- 修改_handle_action 接收 planning_marker_time 和 new_user_messages_count ---
async def _handle_action(
self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo, planning_marker_time: float, new_messages_during_planning: int
self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo, planning_marker_time: float, new_user_messages_during_planning: int
):
"""处理规划的行动"""
@ -359,13 +381,13 @@ class Conversation:
# 发送成功后,标记处理过的消息
await observation_info.mark_messages_processed_up_to(planning_marker_time)
# --- 核心逻辑修改:根据规划期间是否有新消息决定下一步状态 ---
if new_messages_during_planning > 0:
logger.info(f"[私聊][{self.private_name}] 发送追问成功后,检测到规划期间有 {new_messages_during_planning}新消息,强制重置回复状态以进行新规划。")
# --- 核心逻辑修改:根据规划期间收到的“用户”新消息数决定下一步状态 ---
if new_user_messages_during_planning > 0:
logger.info(f"[私聊][{self.private_name}] 发送追问成功后,检测到规划期间有 {new_user_messages_during_planning}用户新消息,强制重置回复状态以进行新规划。")
self.conversation_info.last_successful_reply_action = None # 强制重新规划
else:
# 只有在规划期间没有新消息时,才设置追问状态
logger.info(f"[私聊][{self.private_name}] 发送追问成功,规划期间无新消息,允许下次进入追问状态。")
# 只有在规划期间没有用户新消息时,才设置追问状态
logger.info(f"[私聊][{self.private_name}] 发送追问成功,规划期间无用户新消息,允许下次进入追问状态。")
self.conversation_info.last_successful_reply_action = "send_new_message"
# --- 核心逻辑修改结束 ---
@ -492,13 +514,13 @@ class Conversation:
# 发送成功后,标记处理过的消息
await observation_info.mark_messages_processed_up_to(planning_marker_time)
# --- 核心逻辑修改:根据规划期间是否有新消息决定下一步状态 ---
if new_messages_during_planning > 0:
logger.info(f"[私聊][{self.private_name}] 发送首次回复成功后,检测到规划期间有 {new_messages_during_planning}新消息,强制重置回复状态以进行新规划。")
# --- 核心逻辑修改:根据规划期间收到的“用户”新消息数决定下一步状态 ---
if new_user_messages_during_planning > 0:
logger.info(f"[私聊][{self.private_name}] 发送首次回复成功后,检测到规划期间有 {new_user_messages_during_planning}用户新消息,强制重置回复状态以进行新规划。")
self.conversation_info.last_successful_reply_action = None # 强制重新规划
else:
# 只有在规划期间没有新消息时,才设置追问状态
logger.info(f"[私聊][{self.private_name}] 发送首次回复成功,规划期间无新消息,允许下次进入追问状态。")
# 只有在规划期间没有用户新消息时,才设置追问状态
logger.info(f"[私聊][{self.private_name}] 发送首次回复成功,规划期间无用户新消息,允许下次进入追问状态。")
self.conversation_info.last_successful_reply_action = "direct_reply"
# --- 核心逻辑修改结束 ---
@ -546,13 +568,13 @@ class Conversation:
conversation_info.done_action.append(wait_action_record)
action_successful = True
# --- 其他动作的处理逻辑保持不变,但确保在成功后调用 mark_messages_processed_up_to ---
elif action == "rethink_goal":
self.state = ConversationState.RETHINKING
try:
if not hasattr(self, "goal_analyzer"):
logger.error(f"[私聊][{self.private_name}]GoalAnalyzer 未初始化,无法重新思考目标。")
raise AttributeError("GoalAnalyzer not initialized")
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
await observation_info.mark_messages_processed_up_to(planning_marker_time)
action_successful = True
@ -571,7 +593,6 @@ class Conversation:
if not hasattr(self, "waiter"):
logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法倾听。")
raise AttributeError("Waiter not initialized")
await observation_info.mark_messages_processed_up_to(planning_marker_time)
await self.waiter.wait_listening(conversation_info)
action_successful = True
@ -591,7 +612,6 @@ class Conversation:
observation_info, conversation_info, action_type="say_goodbye"
)
logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}")
if self.generated_reply:
send_success = await self._send_reply()
if send_success:
@ -612,10 +632,8 @@ class Conversation:
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": "未能生成告别语内容"}
)
self.should_continue = False
logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。")
except Exception as goodbye_err:
logger.error(f"[私聊][{self.private_name}]生成或发送告别语时出错: {goodbye_err}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
@ -650,7 +668,6 @@ class Conversation:
if not hasattr(self, "waiter"):
logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法等待。")
raise AttributeError("Waiter not initialized")
await observation_info.mark_messages_processed_up_to(planning_marker_time)
_timeout_occurred = await self.waiter.wait(self.conversation_info)
action_successful = True
@ -666,9 +683,9 @@ class Conversation:
if action_successful:
if action_index < len(conversation_info.done_action):
# 只有在明确不需要强制重新规划时,才在非回复动作后重置状态
if not (action in ["direct_reply", "send_new_message"] and new_messages_during_planning > 0):
if action not in ["direct_reply", "send_new_message"]:
self.conversation_info.last_successful_reply_action = None
# 注意:这里的条件与回复动作后的逻辑略有不同,因为非回复动作本身就不会进入追问
if action not in ["direct_reply", "send_new_message"]:
self.conversation_info.last_successful_reply_action = None
conversation_info.done_action[action_index].update(
{
@ -676,7 +693,6 @@ class Conversation:
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
else:
logger.error(f"[私聊][{self.private_name}]尝试更新无效的 action_index: {action_index},当前 done_action 长度: {len(conversation_info.done_action)}")