mirror of https://github.com/Mai-with-u/MaiBot.git
修复消息缓存逻辑
commit
217dbb1cb9
|
|
@ -1,6 +1,7 @@
|
|||
import time
|
||||
from typing import Tuple, Optional
|
||||
from .pfc_utils import retrieve_contextual_info
|
||||
|
||||
# import jieba # 如果需要旧版知识库的回退,可能需要
|
||||
# import re # 如果需要旧版知识库的回退,可能需要
|
||||
from src.common.logger_manager import get_logger
|
||||
|
|
@ -336,10 +337,13 @@ class ActionPlanner:
|
|||
last_action_context += f"- 该行动当前状态: {status}\n"
|
||||
# self.last_successful_action_type = None # 非完成状态,清除记录
|
||||
|
||||
retrieved_memory_str_planner, retrieved_knowledge_str_planner = await retrieve_contextual_info(chat_history_text, self.private_name)
|
||||
retrieved_memory_str_planner, retrieved_knowledge_str_planner = await retrieve_contextual_info(
|
||||
chat_history_text, self.private_name
|
||||
)
|
||||
# Optional: 可以加一行日志确认结果,方便调试
|
||||
logger.info(f"[私聊][{self.private_name}] (ActionPlanner) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str_planner else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str_planner and '无相关知识' not in retrieved_knowledge_str_planner else '无'}")
|
||||
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}] (ActionPlanner) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str_planner else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str_planner and '无相关知识' not in retrieved_knowledge_str_planner else '无'}"
|
||||
)
|
||||
|
||||
# --- 选择 Prompt ---
|
||||
if last_successful_reply_action in ["direct_reply", "send_new_message"]:
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# File: conversation.py
|
||||
import time
|
||||
import asyncio
|
||||
import datetime
|
||||
|
|
@ -9,10 +11,7 @@ from ...config.config import global_config # 确保导入 global_config
|
|||
from typing import Dict, Any, Optional, Set # 引入 Set
|
||||
from ..chat.message import Message
|
||||
from .pfc_types import ConversationState
|
||||
# 确保导入 ChatObserver 和 GoalAnalyzer (如果 pfc.py 中定义了它们)
|
||||
# from .pfc import ChatObserver, GoalAnalyzer # 可能需要调整导入路径
|
||||
from .chat_observer import ChatObserver # 导入 ChatObserver
|
||||
from .pfc import GoalAnalyzer # 导入 GoalAnalyzer
|
||||
from .pfc import ChatObserver, GoalAnalyzer # pfc.py 包含了 GoalAnalyzer,无需重复导入
|
||||
from .message_sender import DirectMessageSender
|
||||
from src.common.logger_manager import get_logger
|
||||
from .action_planner import ActionPlanner
|
||||
|
|
@ -22,7 +21,7 @@ from .reply_generator import ReplyGenerator
|
|||
from ..chat.chat_stream import ChatStream
|
||||
from maim_message import UserInfo
|
||||
from src.plugins.chat.chat_stream import chat_manager
|
||||
from .pfc_KnowledgeFetcher import KnowledgeFetcher
|
||||
from .pfc_KnowledgeFetcher import KnowledgeFetcher # 注意:这里是 PFC_KnowledgeFetcher.py
|
||||
from .waiter import Waiter
|
||||
|
||||
import traceback
|
||||
|
|
@ -133,6 +132,10 @@ class Conversation:
|
|||
)
|
||||
|
||||
# 让 ChatObserver 从加载的最后一条消息之后开始同步
|
||||
# **** 注意:这里的 last_message_time 设置可能需要 review ****
|
||||
# 如果数据库消息时间戳可能不完全连续,直接设置 last_message_time 可能导致 observer 错过消息
|
||||
# 更稳妥的方式是让 observer 自己管理其内部的 last_message_time 或 last_message_id
|
||||
# 暂时保留,但标记为潜在问题点。如果 observer 逻辑是可靠的,则此行 OK。
|
||||
self.chat_observer.last_message_time = self.observation_info.last_message_time
|
||||
self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录
|
||||
else:
|
||||
|
|
@ -157,7 +160,7 @@ class Conversation:
|
|||
async def _plan_and_action_loop(self):
|
||||
"""思考步,PFC核心循环模块"""
|
||||
while self.should_continue:
|
||||
# 忽略逻辑
|
||||
# 忽略逻辑 (保持不变)
|
||||
if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp:
|
||||
await asyncio.sleep(30)
|
||||
continue
|
||||
|
|
@ -166,67 +169,92 @@ class Conversation:
|
|||
self.ignore_until_timestamp = None
|
||||
self.should_continue = False
|
||||
continue
|
||||
try:
|
||||
# --- 记录规划开始时的时间戳和未处理消息的 ID 集合 ---
|
||||
planning_marker_time = time.time()
|
||||
# 获取规划开始时未处理消息的 ID 集合
|
||||
initial_unprocessed_ids: Set[str] = {
|
||||
msg.get("message_id") for msg in self.observation_info.unprocessed_messages if msg.get("message_id")
|
||||
}
|
||||
logger.debug(f"[私聊][{self.private_name}]规划开始标记时间: {planning_marker_time}, 初始未处理消息ID数: {len(initial_unprocessed_ids)}")
|
||||
|
||||
# --- 调用 Action Planner ---
|
||||
try:
|
||||
# --- [修改点 1] 在规划前记录新消息状态 ---
|
||||
# 记录规划开始时未处理消息的 ID 集合,用于后续判断哪些是新来的
|
||||
message_ids_before_planning = set()
|
||||
initial_unprocessed_message_count = 0 # <-- 新增:记录规划前的未处理消息数
|
||||
if hasattr(self.observation_info, "unprocessed_messages"):
|
||||
message_ids_before_planning = {msg.get("message_id") for msg in self.observation_info.unprocessed_messages if msg.get("message_id")}
|
||||
initial_unprocessed_message_count = len(self.observation_info.unprocessed_messages) # <-- 获取初始数量
|
||||
logger.debug(f"[私聊][{self.private_name}]规划开始,当前未处理消息数: {initial_unprocessed_message_count}, IDs: {message_ids_before_planning}")
|
||||
else:
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' before planning."
|
||||
)
|
||||
|
||||
|
||||
# --- 调用 Action Planner (保持不变) ---
|
||||
action, reason = await self.action_planner.plan(
|
||||
self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action
|
||||
)
|
||||
|
||||
# --- 规划后,精确计算规划期间收到的“用户”新消息数 ---
|
||||
current_unprocessed_messages = self.observation_info.unprocessed_messages
|
||||
new_messages_during_planning = []
|
||||
# --- [修改点 2] 规划后检查是否有 *过多* 新消息到达 ---
|
||||
current_unprocessed_messages = []
|
||||
current_unprocessed_message_count = 0
|
||||
if hasattr(self.observation_info, "unprocessed_messages"):
|
||||
current_unprocessed_messages = self.observation_info.unprocessed_messages
|
||||
current_unprocessed_message_count = len(current_unprocessed_messages) # <-- 获取当前数量
|
||||
else:
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' after planning."
|
||||
)
|
||||
|
||||
# 计算规划期间实际新增的消息数量
|
||||
new_messages_during_planning_count = 0
|
||||
new_message_ids_during_planning = set()
|
||||
for msg in current_unprocessed_messages:
|
||||
msg_id = msg.get("message_id")
|
||||
# 检查消息ID是否不在初始集合中,且消息时间戳晚于规划开始时间(增加时间判断以防万一)
|
||||
if msg_id and msg_id not in initial_unprocessed_ids and msg.get("time", 0) >= planning_marker_time:
|
||||
new_messages_during_planning.append(msg)
|
||||
if msg_id and msg_id not in message_ids_before_planning:
|
||||
new_messages_during_planning_count += 1
|
||||
new_message_ids_during_planning.add(msg_id)
|
||||
|
||||
# 计算这些新消息中来自用户的数量
|
||||
new_user_messages_count = 0
|
||||
for msg in new_messages_during_planning:
|
||||
user_info_dict = msg.get("user_info", {})
|
||||
sender_id = None
|
||||
if isinstance(user_info_dict, dict):
|
||||
sender_id = str(user_info_dict.get("user_id")) # 确保是字符串
|
||||
# 检查发送者ID是否不是机器人ID
|
||||
if sender_id and sender_id != self.bot_id:
|
||||
new_user_messages_count += 1
|
||||
logger.debug(f"[私聊][{self.private_name}]规划结束,当前未处理消息数: {current_unprocessed_message_count}, 规划期间新增: {new_messages_during_planning_count}")
|
||||
|
||||
logger.debug(f"[私聊][{self.private_name}]规划期间共收到新消息: {len(new_messages_during_planning)} 条, 其中用户消息: {new_user_messages_count} 条")
|
||||
|
||||
# --- 根据用户新消息数决定是否重新规划 ---
|
||||
planning_buffer = 2 # 用户指定的缓冲值
|
||||
if new_user_messages_count > planning_buffer:
|
||||
# **核心逻辑:判断是否中断**
|
||||
# 这里的 +2 是根据你的需求来的,代表允许的缓冲
|
||||
# 我们比较的是 *规划期间新增的消息数* 是否超过阈值
|
||||
if new_messages_during_planning_count > 2:
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}]规划期间收到 {new_user_messages_count} 条用户新消息 (超过缓冲 {planning_buffer}),放弃当前计划 '{action}',立即重新规划"
|
||||
f"[私聊][{self.private_name}]规划期间新增消息数 ({new_messages_during_planning_count}) 超过阈值(2),取消本次行动 '{action}',重新规划"
|
||||
)
|
||||
# 中断时,重置上次回复状态,因为需要基于最新消息重新决策
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
await asyncio.sleep(0.1)
|
||||
continue # 重新进入循环进行规划
|
||||
# **重要**: 中断时不清空未处理消息,留给下一轮规划处理
|
||||
await asyncio.sleep(0.1) # 短暂暂停避免CPU空转
|
||||
continue # 跳过本轮后续处理,直接进入下一轮循环重新规划
|
||||
|
||||
# --- 如果规划期间用户新消息未超限,则继续执行规划的动作 ---
|
||||
# 将 planning_marker_time 和 new_user_messages_count 传递给 _handle_action
|
||||
await self._handle_action(action, reason, self.observation_info, self.conversation_info, planning_marker_time, new_user_messages_count)
|
||||
# --- [修改点 3] 准备执行动作,处理规划时已知的消息 ---
|
||||
# 如果决定要回复 (direct_reply 或 send_new_message),并且规划开始时就有未处理消息
|
||||
# 这表示 LLM 规划时已经看到了这些消息
|
||||
# 我们需要在发送回复 *后* 清理掉这些规划时已知的消息
|
||||
# 注意:这里不再立即清理,清理逻辑移到 _handle_action 成功发送后
|
||||
messages_known_during_planning = []
|
||||
if action in ["direct_reply", "send_new_message"] and initial_unprocessed_message_count > 0:
|
||||
messages_known_during_planning = [
|
||||
msg for msg_id in message_ids_before_planning
|
||||
if (msg := next((m for m in self.observation_info.unprocessed_messages if m.get("message_id") == msg_id), None)) is not None
|
||||
]
|
||||
logger.debug(f"[私聊][{self.private_name}]规划时已知 {len(messages_known_during_planning)} 条消息,将在回复成功后清理。")
|
||||
|
||||
# 检查是否需要结束对话 (逻辑不变)
|
||||
|
||||
# --- 执行动作 ---
|
||||
# 将规划时已知需要清理的消息ID集合传递给 _handle_action
|
||||
await self._handle_action(action, reason, self.observation_info, self.conversation_info, message_ids_before_planning)
|
||||
|
||||
# --- 检查是否需要结束对话 (逻辑保持不变) ---
|
||||
goal_ended = False
|
||||
if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list:
|
||||
for goal_item in self.conversation_info.goal_list:
|
||||
current_goal = None # 初始化
|
||||
current_goal = None # 初始化 current_goal
|
||||
if isinstance(goal_item, dict):
|
||||
current_goal = goal_item.get("goal")
|
||||
elif isinstance(goal_item, str): # 处理直接是字符串的情况
|
||||
current_goal = goal_item
|
||||
current_goal = goal_item
|
||||
|
||||
if current_goal == "结束对话":
|
||||
# 确保 current_goal 是字符串再比较
|
||||
if isinstance(current_goal, str) and current_goal == "结束对话":
|
||||
goal_ended = True
|
||||
break
|
||||
|
||||
|
|
@ -237,16 +265,49 @@ class Conversation:
|
|||
except Exception as loop_err:
|
||||
logger.error(f"[私聊][{self.private_name}]PFC主循环出错: {loop_err}")
|
||||
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
|
||||
await asyncio.sleep(1)
|
||||
await asyncio.sleep(1) # 出错时暂停一下
|
||||
|
||||
# 循环间隔
|
||||
if self.should_continue:
|
||||
await asyncio.sleep(0.1)
|
||||
await asyncio.sleep(0.1) # 非阻塞的短暂停
|
||||
|
||||
logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}")
|
||||
|
||||
|
||||
# --- [修改点 4] 修改 _check_new_messages_after_planning ---
|
||||
# 重命名并修改逻辑,用于在 *发送前* 检查是否有过多新消息(兜底检查)
|
||||
def _check_interrupt_before_sending(self, message_ids_before_planning: set) -> bool:
|
||||
"""在发送回复前,最后检查一次是否有过多新消息导致需要中断"""
|
||||
if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "unprocessed_messages"):
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'unprocessed_messages' 属性,无法检查新消息。"
|
||||
)
|
||||
return False
|
||||
|
||||
current_unprocessed_messages = self.observation_info.unprocessed_messages
|
||||
new_messages_count = 0
|
||||
for msg in current_unprocessed_messages:
|
||||
msg_id = msg.get("message_id")
|
||||
if msg_id and msg_id not in message_ids_before_planning:
|
||||
new_messages_count += 1
|
||||
|
||||
# 使用与规划后检查相同的阈值
|
||||
if new_messages_count > 2:
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}]准备发送时发现新增消息数 ({new_messages_count}) 超过阈值(2),取消发送并重新规划"
|
||||
)
|
||||
if hasattr(self, "conversation_info"):
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
else:
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]ConversationInfo 未初始化,无法重置 last_successful_reply_action。"
|
||||
)
|
||||
return True # 需要中断
|
||||
return False # 不需要中断
|
||||
|
||||
|
||||
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
|
||||
"""将消息字典转换为Message对象"""
|
||||
"""将消息字典转换为Message对象 (保持不变)"""
|
||||
try:
|
||||
chat_info = msg_dict.get("chat_info")
|
||||
if chat_info and isinstance(chat_info, dict):
|
||||
|
|
@ -277,9 +338,14 @@ class Conversation:
|
|||
logger.warning(f"[私聊][{self.private_name}]转换消息时出错: {e}")
|
||||
raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e
|
||||
|
||||
# --- 修改:_handle_action 接收 planning_marker_time 和 new_user_messages_count ---
|
||||
# --- [修改点 5] 修改 _handle_action 签名并调整内部逻辑 ---
|
||||
async def _handle_action(
|
||||
self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo, planning_marker_time: float, new_user_messages_during_planning: int
|
||||
self,
|
||||
action: str,
|
||||
reason: str,
|
||||
observation_info: ObservationInfo,
|
||||
conversation_info: ConversationInfo,
|
||||
message_ids_before_planning: set # <-- 接收规划前的消息ID集合
|
||||
):
|
||||
"""处理规划的行动"""
|
||||
|
||||
|
|
@ -299,10 +365,11 @@ class Conversation:
|
|||
action_index = len(conversation_info.done_action) - 1
|
||||
|
||||
action_successful = False
|
||||
reply_sent = False # <-- 新增:标记是否成功发送了回复
|
||||
|
||||
# --- 根据不同的 action 执行 ---
|
||||
|
||||
if action == "send_new_message":
|
||||
if action == "direct_reply" or action == "send_new_message":
|
||||
# 合并 direct_reply 和 send_new_message 的大部分逻辑
|
||||
max_reply_attempts = 3
|
||||
reply_attempt_count = 0
|
||||
is_suitable = False
|
||||
|
|
@ -312,263 +379,167 @@ class Conversation:
|
|||
|
||||
while reply_attempt_count < max_reply_attempts and not is_suitable:
|
||||
reply_attempt_count += 1
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}]尝试生成追问回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..."
|
||||
)
|
||||
log_prefix = f"[私聊][{self.private_name}]尝试生成 '{action}' 回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..."
|
||||
logger.info(log_prefix)
|
||||
self.state = ConversationState.GENERATING
|
||||
|
||||
# 1. 生成回复
|
||||
# 1. 生成回复 (传入 action_type)
|
||||
self.generated_reply = await self.reply_generator.generate(
|
||||
observation_info, conversation_info, action_type="send_new_message"
|
||||
)
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的追问回复: {self.generated_reply}"
|
||||
observation_info, conversation_info, action_type=action
|
||||
)
|
||||
logger.info(f"{log_prefix} 生成内容: {self.generated_reply}")
|
||||
|
||||
# 2. 检查回复
|
||||
self.state = ConversationState.CHECKING
|
||||
try:
|
||||
current_goal_str = ""
|
||||
if conversation_info.goal_list:
|
||||
first_goal = conversation_info.goal_list[0]
|
||||
if isinstance(first_goal, dict):
|
||||
current_goal_str = first_goal.get("goal", "")
|
||||
elif isinstance(first_goal, str):
|
||||
current_goal_str = first_goal
|
||||
current_goal_str = "" # 初始化
|
||||
if hasattr(conversation_info, 'goal_list') and conversation_info.goal_list:
|
||||
goal_item = conversation_info.goal_list[0] # 取第一个目标
|
||||
if isinstance(goal_item, dict):
|
||||
current_goal_str = goal_item.get('goal', '')
|
||||
elif isinstance(goal_item, str):
|
||||
current_goal_str = goal_item
|
||||
|
||||
# 确保 chat_history 和 chat_history_str 存在
|
||||
chat_history_for_check = getattr(observation_info, 'chat_history', [])
|
||||
chat_history_str_for_check = getattr(observation_info, 'chat_history_str', '')
|
||||
|
||||
is_suitable, check_reason, need_replan = await self.reply_generator.check_reply(
|
||||
reply=self.generated_reply,
|
||||
goal=current_goal_str,
|
||||
chat_history=observation_info.chat_history,
|
||||
chat_history_str=observation_info.chat_history_str,
|
||||
chat_history=chat_history_for_check,
|
||||
chat_history_str=chat_history_str_for_check,
|
||||
retry_count=reply_attempt_count - 1,
|
||||
)
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
|
||||
f"{log_prefix} 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
|
||||
)
|
||||
|
||||
if not is_suitable:
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
|
||||
# 更新拒绝原因和内容 (仅在不合适或需要重规划时)
|
||||
if not is_suitable or need_replan:
|
||||
conversation_info.last_reply_rejection_reason = check_reason
|
||||
conversation_info.last_rejected_reply_content = self.generated_reply
|
||||
else:
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', None)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', None)
|
||||
# 检查通过,清空上次拒绝记录
|
||||
conversation_info.last_reply_rejection_reason = None
|
||||
conversation_info.last_rejected_reply_content = None
|
||||
|
||||
if is_suitable:
|
||||
final_reply_to_send = self.generated_reply
|
||||
break
|
||||
break # 检查通过,跳出循环
|
||||
elif need_replan:
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查建议重新规划,停止尝试。原因: {check_reason}"
|
||||
f"{log_prefix} 检查建议重新规划,停止尝试。原因: {check_reason}"
|
||||
)
|
||||
break
|
||||
break # 需要重新规划,跳出循环
|
||||
except Exception as check_err:
|
||||
logger.error(
|
||||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}"
|
||||
f"{log_prefix} 调用 ReplyChecker 时出错: {check_err}"
|
||||
)
|
||||
check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}"
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
|
||||
break
|
||||
conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}"
|
||||
conversation_info.last_rejected_reply_content = self.generated_reply # 记录出错时尝试的内容
|
||||
break # 检查出错,跳出循环
|
||||
|
||||
# 循环结束,处理最终结果
|
||||
# --- 处理生成和检查的结果 ---
|
||||
if is_suitable:
|
||||
# 发送合适的回复
|
||||
# --- [修改点 6] 发送前最后检查是否需要中断 ---
|
||||
if self._check_interrupt_before_sending(message_ids_before_planning):
|
||||
logger.info(f"[私聊][{self.private_name}]生成回复后、发送前发现过多新消息,取消发送,重新规划")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"发送前发现过多新消息,取消发送: {final_reply_to_send}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None # 重置状态
|
||||
return # 直接返回,主循环会重新规划
|
||||
|
||||
# 确认发送
|
||||
self.generated_reply = final_reply_to_send
|
||||
send_success = await self._send_reply()
|
||||
send_success = await self._send_reply() # 调用发送函数
|
||||
|
||||
if send_success:
|
||||
# 发送成功后,标记处理过的消息
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
|
||||
# --- 核心逻辑修改:根据规划期间收到的“用户”新消息数决定下一步状态 ---
|
||||
if new_user_messages_during_planning > 0:
|
||||
logger.info(f"[私聊][{self.private_name}] 发送追问成功后,检测到规划期间有 {new_user_messages_during_planning} 条用户新消息,强制重置回复状态以进行新规划。")
|
||||
self.conversation_info.last_successful_reply_action = None # 强制重新规划
|
||||
else:
|
||||
# 只有在规划期间没有用户新消息时,才设置追问状态
|
||||
logger.info(f"[私聊][{self.private_name}] 发送追问成功,规划期间无用户新消息,允许下次进入追问状态。")
|
||||
self.conversation_info.last_successful_reply_action = "send_new_message"
|
||||
# --- 核心逻辑修改结束 ---
|
||||
|
||||
action_successful = True
|
||||
else:
|
||||
logger.error(f"[私聊][{self.private_name}]发送追问回复失败")
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"发送追问回复失败: {final_reply_to_send}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
reply_sent = True # 标记回复已发送
|
||||
logger.info(f"[私聊][{self.private_name}]成功发送 '{action}' 回复.")
|
||||
# 清空上次拒绝记录 (再次确保)
|
||||
conversation_info.last_reply_rejection_reason = None
|
||||
conversation_info.last_rejected_reply_content = None
|
||||
|
||||
# --- [修改点 7] 发送成功后,处理新消息并决定下一轮 prompt 类型 ---
|
||||
# 获取发送后的最新未处理消息列表
|
||||
final_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', [])
|
||||
final_unprocessed_count = len(final_unprocessed_messages)
|
||||
|
||||
# 计算在生成和发送期间新增的消息数
|
||||
new_messages_during_generation_count = 0
|
||||
for msg in final_unprocessed_messages:
|
||||
msg_id = msg.get("message_id")
|
||||
# 如果消息 ID 不在规划前的集合中,说明是新来的
|
||||
if msg_id and msg_id not in message_ids_before_planning:
|
||||
new_messages_during_generation_count += 1
|
||||
|
||||
logger.debug(f"[私聊][{self.private_name}]回复发送后,当前未处理消息数: {final_unprocessed_count}, 其中生成/发送期间新增: {new_messages_during_generation_count}")
|
||||
|
||||
# 根据生成期间是否有新消息,决定下次规划用哪个 prompt
|
||||
if new_messages_during_generation_count > 0:
|
||||
# 有 1 条或更多新消息在生成期间到达
|
||||
logger.info(f"[私聊][{self.private_name}]检测到 {new_messages_during_generation_count} 条在生成/发送期间到达的新消息,下一轮将使用首次回复逻辑处理。")
|
||||
self.conversation_info.last_successful_reply_action = None # 强制下一轮用 PROMPT_INITIAL_REPLY
|
||||
else:
|
||||
# 没有新消息在生成期间到达
|
||||
logger.info(f"[私聊][{self.private_name}]生成/发送期间无新消息,下一轮将根据 '{action}' 使用追问逻辑。")
|
||||
self.conversation_info.last_successful_reply_action = action # 保持状态,下一轮可能用 PROMPT_FOLLOW_UP
|
||||
|
||||
# --- [修改点 8] 清理规划时已知的消息 ---
|
||||
# 只有在回复成功发送后,才清理掉那些在规划时就已经看到的消息
|
||||
if message_ids_before_planning:
|
||||
await observation_info.clear_processed_messages(message_ids_before_planning)
|
||||
|
||||
|
||||
else: # 发送失败
|
||||
logger.error(f"[私聊][{self.private_name}]发送 '{action}' 回复失败。")
|
||||
# 发送失败,也认为动作未成功,重置状态
|
||||
action_successful = False
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "发送回复时失败"}
|
||||
)
|
||||
|
||||
elif need_replan:
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,追问回复决定打回动作决策。打回原因: {check_reason}"
|
||||
)
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
# 检查后决定打回动作决策
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]'{action}' 回复检查后决定打回动作决策 (尝试 {reply_attempt_count} 次)。打回原因: {check_reason}"
|
||||
)
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后打回: {check_reason}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None # 重置状态
|
||||
|
||||
else:
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的追问回复。最终原因: {check_reason}"
|
||||
)
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后失败: {check_reason}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
else: # 多次尝试后仍然不合适 (is_suitable is False and not need_replan)
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的 '{action}' 回复。最终原因: {check_reason}"
|
||||
)
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后失败: {check_reason}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None # 重置状态
|
||||
|
||||
# 执行 Wait 操作
|
||||
logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...")
|
||||
self.state = ConversationState.WAITING
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
await self.waiter.wait(self.conversation_info)
|
||||
wait_action_record = {
|
||||
"action": "wait",
|
||||
"plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待",
|
||||
"status": "done",
|
||||
"time": datetime.datetime.now().strftime("%H:%M:%S"),
|
||||
"final_reason": None,
|
||||
}
|
||||
conversation_info.done_action.append(wait_action_record)
|
||||
action_successful = True
|
||||
# 如果是 send_new_message 失败,则执行 wait (保持原 fallback 逻辑)
|
||||
if action == "send_new_message":
|
||||
logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...")
|
||||
self.state = ConversationState.WAITING
|
||||
await self.waiter.wait(self.conversation_info)
|
||||
wait_action_record = {
|
||||
"action": "wait",
|
||||
"plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待",
|
||||
"status": "done", # wait 本身算完成
|
||||
"time": datetime.datetime.now().strftime("%H:%M:%S"),
|
||||
"final_reason": None,
|
||||
}
|
||||
conversation_info.done_action.append(wait_action_record)
|
||||
action_successful = True # fallback wait 成功
|
||||
# 注意: fallback wait 成功后,last_successful_reply_action 仍然是 None
|
||||
|
||||
elif action == "direct_reply":
|
||||
max_reply_attempts = 3
|
||||
reply_attempt_count = 0
|
||||
is_suitable = False
|
||||
need_replan = False
|
||||
check_reason = "未进行尝试"
|
||||
final_reply_to_send = ""
|
||||
|
||||
while reply_attempt_count < max_reply_attempts and not is_suitable:
|
||||
reply_attempt_count += 1
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}]尝试生成首次回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..."
|
||||
)
|
||||
self.state = ConversationState.GENERATING
|
||||
|
||||
# 1. 生成回复
|
||||
self.generated_reply = await self.reply_generator.generate(
|
||||
observation_info, conversation_info, action_type="direct_reply"
|
||||
)
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的首次回复: {self.generated_reply}"
|
||||
)
|
||||
|
||||
# 2. 检查回复
|
||||
self.state = ConversationState.CHECKING
|
||||
try:
|
||||
current_goal_str = ""
|
||||
if conversation_info.goal_list:
|
||||
first_goal = conversation_info.goal_list[0]
|
||||
if isinstance(first_goal, dict):
|
||||
current_goal_str = first_goal.get("goal", "")
|
||||
elif isinstance(first_goal, str):
|
||||
current_goal_str = first_goal
|
||||
|
||||
is_suitable, check_reason, need_replan = await self.reply_generator.check_reply(
|
||||
reply=self.generated_reply,
|
||||
goal=current_goal_str,
|
||||
chat_history=observation_info.chat_history,
|
||||
chat_history_str=observation_info.chat_history_str,
|
||||
retry_count=reply_attempt_count - 1,
|
||||
)
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
|
||||
)
|
||||
|
||||
if not is_suitable:
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
|
||||
else:
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', None)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', None)
|
||||
|
||||
if is_suitable:
|
||||
final_reply_to_send = self.generated_reply
|
||||
break
|
||||
elif need_replan:
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查建议重新规划,停止尝试。原因: {check_reason}"
|
||||
)
|
||||
break
|
||||
except Exception as check_err:
|
||||
logger.error(
|
||||
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (首次回复) 时出错: {check_err}"
|
||||
)
|
||||
check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}"
|
||||
setattr(conversation_info, 'last_reply_rejection_reason', check_reason)
|
||||
setattr(conversation_info, 'last_rejected_reply_content', self.generated_reply)
|
||||
break
|
||||
|
||||
# 循环结束,处理最终结果
|
||||
if is_suitable:
|
||||
# 发送合适的回复
|
||||
self.generated_reply = final_reply_to_send
|
||||
send_success = await self._send_reply()
|
||||
|
||||
if send_success:
|
||||
# 发送成功后,标记处理过的消息
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
|
||||
# --- 核心逻辑修改:根据规划期间收到的“用户”新消息数决定下一步状态 ---
|
||||
if new_user_messages_during_planning > 0:
|
||||
logger.info(f"[私聊][{self.private_name}] 发送首次回复成功后,检测到规划期间有 {new_user_messages_during_planning} 条用户新消息,强制重置回复状态以进行新规划。")
|
||||
self.conversation_info.last_successful_reply_action = None # 强制重新规划
|
||||
else:
|
||||
# 只有在规划期间没有用户新消息时,才设置追问状态
|
||||
logger.info(f"[私聊][{self.private_name}] 发送首次回复成功,规划期间无用户新消息,允许下次进入追问状态。")
|
||||
self.conversation_info.last_successful_reply_action = "direct_reply"
|
||||
# --- 核心逻辑修改结束 ---
|
||||
|
||||
action_successful = True
|
||||
else:
|
||||
logger.error(f"[私聊][{self.private_name}]发送首次回复失败")
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"发送首次回复失败: {final_reply_to_send}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
|
||||
elif need_replan:
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,首次回复决定打回动作决策。打回原因: {check_reason}"
|
||||
)
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
|
||||
else:
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的首次回复。最终原因: {check_reason}"
|
||||
)
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后失败: {check_reason}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
|
||||
# 执行 Wait 操作
|
||||
logger.info(f"[私聊][{self.private_name}]由于无法生成合适首次回复,执行 'wait' 操作...")
|
||||
self.state = ConversationState.WAITING
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
await self.waiter.wait(self.conversation_info)
|
||||
wait_action_record = {
|
||||
"action": "wait",
|
||||
"plan_reason": "因 direct_reply 多次尝试失败而执行的后备等待",
|
||||
"status": "done",
|
||||
"time": datetime.datetime.now().strftime("%H:%M:%S"),
|
||||
"final_reason": None,
|
||||
}
|
||||
conversation_info.done_action.append(wait_action_record)
|
||||
action_successful = True
|
||||
|
||||
# --- 其他动作的处理逻辑保持不变,但确保在成功后调用 mark_messages_processed_up_to ---
|
||||
# --- 处理其他动作 (保持大部分不变,主要是确保状态重置) ---
|
||||
elif action == "rethink_goal":
|
||||
self.state = ConversationState.RETHINKING
|
||||
try:
|
||||
|
|
@ -580,11 +551,14 @@ class Conversation:
|
|||
action_successful = True
|
||||
except Exception as rethink_err:
|
||||
logger.error(f"[私聊][{self.private_name}]重新思考目标时出错: {rethink_err}")
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"}
|
||||
)
|
||||
# 无论成功失败,非回复动作都重置 last_successful_reply_action
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
conversation_info.last_reply_rejection_reason = None # 清除拒绝原因
|
||||
conversation_info.last_rejected_reply_content = None
|
||||
|
||||
|
||||
elif action == "listening":
|
||||
self.state = ConversationState.LISTENING
|
||||
|
|
@ -598,57 +572,82 @@ class Conversation:
|
|||
action_successful = True
|
||||
except Exception as listen_err:
|
||||
logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}")
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"倾听失败: {listen_err}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"倾听失败: {listen_err}"}
|
||||
)
|
||||
# 无论成功失败,非回复动作都重置
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
conversation_info.last_reply_rejection_reason = None
|
||||
conversation_info.last_rejected_reply_content = None
|
||||
|
||||
elif action == "say_goodbye":
|
||||
self.state = ConversationState.GENERATING
|
||||
logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...")
|
||||
try:
|
||||
# 1. 生成告别语
|
||||
self.generated_reply = await self.reply_generator.generate(
|
||||
observation_info, conversation_info, action_type="say_goodbye"
|
||||
)
|
||||
logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}")
|
||||
|
||||
# 2. 发送告别语
|
||||
if self.generated_reply:
|
||||
# --- [修改点 9] 告别前也检查中断 ---
|
||||
if self._check_interrupt_before_sending(message_ids_before_planning):
|
||||
logger.info(f"[私聊][{self.private_name}]发送告别语前发现过多新消息,取消发送,重新规划")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "发送告别语前发现过多新消息"}
|
||||
)
|
||||
self.should_continue = True # 不能结束,需要重规划
|
||||
self.conversation_info.last_successful_reply_action = None # 重置状态
|
||||
return
|
||||
|
||||
send_success = await self._send_reply()
|
||||
if send_success:
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
action_successful = True
|
||||
reply_sent = True # 标记发送成功
|
||||
logger.info(f"[私聊][{self.private_name}]告别语已发送。")
|
||||
# 发送告别语成功后,通常意味着对话结束
|
||||
self.should_continue = False
|
||||
logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。")
|
||||
else:
|
||||
logger.warning(f"[私聊][{self.private_name}]发送告别语失败。")
|
||||
action_successful = False
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "发送告别语失败"}
|
||||
)
|
||||
logger.warning(f"[私聊][{self.private_name}]发送告别语失败。")
|
||||
action_successful = False
|
||||
# 发送失败不应结束对话,可能需要重试或做其他事
|
||||
self.should_continue = True
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "发送告别语失败"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None # 重置状态
|
||||
|
||||
else:
|
||||
logger.warning(f"[私聊][{self.private_name}]未能生成告别语内容,无法发送。")
|
||||
action_successful = False
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "未能生成告别语内容"}
|
||||
)
|
||||
self.should_continue = False
|
||||
logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。")
|
||||
self.should_continue = True # 未能生成也不能结束
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": "未能生成告别语内容"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
|
||||
except Exception as goodbye_err:
|
||||
logger.error(f"[私聊][{self.private_name}]生成或发送告别语时出错: {goodbye_err}")
|
||||
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
|
||||
self.should_continue = False
|
||||
action_successful = False
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"}
|
||||
)
|
||||
self.should_continue = True # 出错也不能结束
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
|
||||
elif action == "end_conversation":
|
||||
self.should_continue = False
|
||||
logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...")
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
action_successful = True
|
||||
# 结束对话也重置状态
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
conversation_info.last_reply_rejection_reason = None
|
||||
conversation_info.last_rejected_reply_content = None
|
||||
|
||||
|
||||
elif action == "block_and_ignore":
|
||||
logger.info(f"[私聊][{self.private_name}]不想再理你了...")
|
||||
|
|
@ -658,8 +657,12 @@ class Conversation:
|
|||
f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}"
|
||||
)
|
||||
self.state = ConversationState.IGNORED
|
||||
await observation_info.mark_messages_processed_up_to(planning_marker_time)
|
||||
action_successful = True
|
||||
# 忽略也重置状态
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
conversation_info.last_reply_rejection_reason = None
|
||||
conversation_info.last_rejected_reply_content = None
|
||||
|
||||
|
||||
else: # 对应 'wait' 动作
|
||||
self.state = ConversationState.WAITING
|
||||
|
|
@ -673,58 +676,67 @@ class Conversation:
|
|||
action_successful = True
|
||||
except Exception as wait_err:
|
||||
logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}")
|
||||
if action_index < len(conversation_info.done_action):
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"等待失败: {wait_err}"}
|
||||
)
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
conversation_info.done_action[action_index].update(
|
||||
{"status": "recall", "final_reason": f"等待失败: {wait_err}"}
|
||||
)
|
||||
# 无论成功失败,非回复动作都重置
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
conversation_info.last_reply_rejection_reason = None
|
||||
conversation_info.last_rejected_reply_content = None
|
||||
|
||||
|
||||
# --- 更新 Action History 状态 ---
|
||||
if action_successful:
|
||||
if action_index < len(conversation_info.done_action):
|
||||
# 只有在明确不需要强制重新规划时,才在非回复动作后重置状态
|
||||
# 注意:这里的条件与回复动作后的逻辑略有不同,因为非回复动作本身就不会进入追问
|
||||
if action not in ["direct_reply", "send_new_message"]:
|
||||
self.conversation_info.last_successful_reply_action = None
|
||||
|
||||
conversation_info.done_action[action_index].update(
|
||||
{
|
||||
"status": "done",
|
||||
"time": datetime.datetime.now().strftime("%H:%M:%S"),
|
||||
}
|
||||
)
|
||||
else:
|
||||
logger.error(f"[私聊][{self.private_name}]尝试更新无效的 action_index: {action_index},当前 done_action 长度: {len(conversation_info.done_action)}")
|
||||
conversation_info.done_action[action_index].update(
|
||||
{
|
||||
"status": "done",
|
||||
"time": datetime.datetime.now().strftime("%H:%M:%S"),
|
||||
}
|
||||
)
|
||||
# **注意**: last_successful_reply_action 的更新逻辑已经移到各自的动作处理中
|
||||
logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'done'")
|
||||
else:
|
||||
# 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action 的 final_reason
|
||||
logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'recall' 或失败")
|
||||
|
||||
# --- [修改点 10] _send_reply 返回布尔值表示成功与否 ---
|
||||
async def _send_reply(self) -> bool:
|
||||
"""发送回复,并返回发送是否成功"""
|
||||
"""发送回复,并返回是否发送成功"""
|
||||
if not self.generated_reply:
|
||||
logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。")
|
||||
return False
|
||||
return False # 发送失败
|
||||
|
||||
try:
|
||||
reply_content = self.generated_reply
|
||||
|
||||
# 检查依赖项
|
||||
if not hasattr(self, "direct_sender") or not self.direct_sender:
|
||||
logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。")
|
||||
return False
|
||||
return False # 发送失败
|
||||
if not self.chat_stream:
|
||||
logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。")
|
||||
return False
|
||||
return False # 发送失败
|
||||
|
||||
# 发送消息
|
||||
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content)
|
||||
|
||||
self.state = ConversationState.ANALYZING
|
||||
return True
|
||||
# 发送成功后,可以考虑触发 observer 更新,但需谨慎避免竞争条件或重复处理
|
||||
# 暂时注释掉,依赖 observer 的自然更新周期
|
||||
# self.chat_observer.trigger_update()
|
||||
# await self.chat_observer.wait_for_update()
|
||||
|
||||
self.state = ConversationState.ANALYZING # 更新状态 (例如,可以改为 IDLE 或 WAITING)
|
||||
return True # 发送成功
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{self.private_name}]发送消息或更新状态时失败: {str(e)}")
|
||||
logger.error(f"[私聊][{self.private_name}]发送消息时失败: {str(e)}")
|
||||
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
|
||||
self.state = ConversationState.ANALYZING
|
||||
return False
|
||||
self.state = ConversationState.ANALYZING # 或者设置为 ERROR 状态?
|
||||
return False # 发送失败
|
||||
|
||||
|
||||
async def _send_timeout_message(self):
|
||||
"""发送超时结束消息"""
|
||||
"""发送超时结束消息 (保持不变)"""
|
||||
try:
|
||||
if not hasattr(self, 'observation_info') or not self.observation_info.chat_history:
|
||||
logger.warning(f"[私聊][{self.private_name}]无法获取聊天历史,无法发送超时消息。")
|
||||
|
|
@ -744,5 +756,4 @@ class Conversation:
|
|||
chat_stream=self.chat_stream, content="[自动消息] 对方长时间未响应,对话已超时。", reply_to_message=latest_message
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}")
|
||||
|
||||
logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}")
|
||||
|
|
@ -8,3 +8,5 @@ class ConversationInfo:
|
|||
self.knowledge_list = []
|
||||
self.memory_list = []
|
||||
self.last_successful_reply_action: Optional[str] = None
|
||||
self.last_reply_rejection_reason: Optional[str] = None # 用于存储上次回复被拒原因
|
||||
self.last_rejected_reply_content: Optional[str] = None # 用于存储上次被拒的回复内容
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# File: observation_info.py
|
||||
from typing import List, Optional, Dict, Any, Set
|
||||
from maim_message import UserInfo
|
||||
import time
|
||||
|
|
@ -102,10 +104,10 @@ class ObservationInfoHandler(NotificationHandler):
|
|||
self.observation_info.unprocessed_messages = [
|
||||
msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id
|
||||
]
|
||||
if len(self.observation_info.unprocessed_messages) < original_count:
|
||||
logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id})")
|
||||
# 更新未处理消息计数
|
||||
self.observation_info.new_messages_count = len(self.observation_info.unprocessed_messages)
|
||||
# --- [修改点 11] 更新 new_messages_count ---
|
||||
self.observation_info.new_messages_count = len(self.observation_info.unprocessed_messages)
|
||||
if self.observation_info.new_messages_count < original_count:
|
||||
logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id}), 当前未处理数: {self.observation_info.new_messages_count}")
|
||||
|
||||
|
||||
elif notification_type == NotificationType.USER_JOINED:
|
||||
|
|
@ -178,7 +180,7 @@ class ObservationInfo:
|
|||
self.last_message_id: Optional[str] = None
|
||||
self.last_message_content: str = ""
|
||||
self.last_message_sender: Optional[str] = None
|
||||
self.bot_id: Optional[str] = None # Consider initializing from config
|
||||
self.bot_id: Optional[str] = None # 需要在某个地方设置 bot_id,例如从 global_config 获取
|
||||
self.chat_history_count: int = 0
|
||||
self.new_messages_count: int = 0
|
||||
self.cold_chat_start_time: Optional[float] = None
|
||||
|
|
@ -194,266 +196,196 @@ class ObservationInfo:
|
|||
|
||||
self.handler: ObservationInfoHandler = ObservationInfoHandler(self, self.private_name)
|
||||
|
||||
def bind_to_chat_observer(self, chat_observer: 'ChatObserver'): # Use forward reference
|
||||
"""绑定到指定的chat_observer
|
||||
# --- 初始化 bot_id ---
|
||||
from ...config.config import global_config # 移动到 __init__ 内部以避免循环导入问题
|
||||
self.bot_id = str(global_config.BOT_QQ) if global_config.BOT_QQ else None
|
||||
|
||||
Args:
|
||||
chat_observer: 要绑定的 ChatObserver 实例
|
||||
"""
|
||||
def bind_to_chat_observer(self, chat_observer: ChatObserver):
|
||||
"""绑定到指定的chat_observer (保持不变)"""
|
||||
if self.chat_observer:
|
||||
logger.warning(f"[私聊][{self.private_name}]尝试重复绑定 ChatObserver")
|
||||
return
|
||||
|
||||
self.chat_observer = chat_observer
|
||||
try:
|
||||
if not self.handler: # 确保 handler 已经被创建
|
||||
if not self.handler:
|
||||
logger.error(f"[私聊][{self.private_name}] 尝试绑定时 handler 未初始化!")
|
||||
self.chat_observer = None # 重置,防止后续错误
|
||||
self.chat_observer = None
|
||||
return
|
||||
|
||||
# 注册关心的通知类型
|
||||
self.chat_observer.notification_manager.register_handler(
|
||||
target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler
|
||||
)
|
||||
self.chat_observer.notification_manager.register_handler(
|
||||
target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler
|
||||
)
|
||||
# --- 新增:注册其他必要的通知类型 ---
|
||||
# --- [修改点 12] 注册 MESSAGE_DELETED ---
|
||||
self.chat_observer.notification_manager.register_handler(
|
||||
target="observation_info", notification_type=NotificationType.ACTIVE_CHAT, handler=self.handler
|
||||
)
|
||||
self.chat_observer.notification_manager.register_handler(
|
||||
target="observation_info", notification_type=NotificationType.BOT_SPEAKING, handler=self.handler
|
||||
)
|
||||
self.chat_observer.notification_manager.register_handler(
|
||||
target="observation_info", notification_type=NotificationType.USER_SPEAKING, handler=self.handler
|
||||
)
|
||||
self.chat_observer.notification_manager.register_handler(
|
||||
target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler
|
||||
)
|
||||
self.chat_observer.notification_manager.register_handler(
|
||||
target="observation_info", notification_type=NotificationType.USER_JOINED, handler=self.handler
|
||||
)
|
||||
self.chat_observer.notification_manager.register_handler(
|
||||
target="observation_info", notification_type=NotificationType.USER_LEFT, handler=self.handler
|
||||
)
|
||||
self.chat_observer.notification_manager.register_handler(
|
||||
target="observation_info", notification_type=NotificationType.ERROR, handler=self.handler
|
||||
)
|
||||
# --- 注册结束 ---
|
||||
|
||||
target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler
|
||||
)
|
||||
logger.info(f"[私聊][{self.private_name}]成功绑定到 ChatObserver")
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{self.private_name}]绑定到 ChatObserver 时出错: {e}")
|
||||
self.chat_observer = None # 绑定失败,重置
|
||||
self.chat_observer = None
|
||||
|
||||
def unbind_from_chat_observer(self):
|
||||
"""解除与chat_observer的绑定"""
|
||||
"""解除与chat_observer的绑定 (保持不变)"""
|
||||
if (
|
||||
self.chat_observer and hasattr(self.chat_observer, "notification_manager") and self.handler
|
||||
): # 增加 handler 检查
|
||||
):
|
||||
try:
|
||||
# --- 注销所有注册过的通知类型 ---
|
||||
notification_types_to_unregister = [
|
||||
NotificationType.NEW_MESSAGE,
|
||||
NotificationType.COLD_CHAT,
|
||||
NotificationType.ACTIVE_CHAT,
|
||||
NotificationType.BOT_SPEAKING,
|
||||
NotificationType.USER_SPEAKING,
|
||||
NotificationType.MESSAGE_DELETED,
|
||||
NotificationType.USER_JOINED,
|
||||
NotificationType.USER_LEFT,
|
||||
NotificationType.ERROR,
|
||||
]
|
||||
for nt in notification_types_to_unregister:
|
||||
self.chat_observer.notification_manager.unregister_handler(
|
||||
target="observation_info", notification_type=nt, handler=self.handler
|
||||
)
|
||||
# --- 注销结束 ---
|
||||
self.chat_observer.notification_manager.unregister_handler(
|
||||
target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler
|
||||
)
|
||||
self.chat_observer.notification_manager.unregister_handler(
|
||||
target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler
|
||||
)
|
||||
# --- [修改点 13] 注销 MESSAGE_DELETED ---
|
||||
self.chat_observer.notification_manager.unregister_handler(
|
||||
target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler
|
||||
)
|
||||
logger.info(f"[私聊][{self.private_name}]成功从 ChatObserver 解绑")
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{self.private_name}]从 ChatObserver 解绑时出错: {e}")
|
||||
finally: # 确保 chat_observer 被重置
|
||||
finally:
|
||||
self.chat_observer = None
|
||||
else:
|
||||
logger.warning(f"[私聊][{self.private_name}]尝试解绑时 ChatObserver 不存在、无效或 handler 未设置")
|
||||
|
||||
# 修改:update_from_message 接收 UserInfo 对象
|
||||
async def update_from_message(self, message: Dict[str, Any], user_info: Optional[UserInfo]):
|
||||
"""从消息更新信息
|
||||
|
||||
Args:
|
||||
message: 消息数据字典
|
||||
user_info: 解析后的 UserInfo 对象 (可能为 None)
|
||||
"""
|
||||
"""从消息更新信息 (保持不变)"""
|
||||
message_time = message.get("time")
|
||||
message_id = message.get("message_id")
|
||||
processed_text = message.get("processed_plain_text", "")
|
||||
|
||||
# 检查消息是否已存在于未处理列表中 (避免重复添加)
|
||||
if any(msg.get("message_id") == message_id for msg in self.unprocessed_messages):
|
||||
# logger.debug(f"[私聊][{self.private_name}]消息 {message_id} 已存在于未处理列表,跳过")
|
||||
return
|
||||
|
||||
# 只有在新消息到达时才更新 last_message 相关信息
|
||||
if message_time and message_time > (self.last_message_time or 0):
|
||||
self.last_message_time = message_time
|
||||
self.last_message_id = message_id
|
||||
self.last_message_content = processed_text
|
||||
# 重置冷场计时器
|
||||
self.is_cold_chat = False # Corrected variable name
|
||||
self.is_cold_chat = False
|
||||
self.cold_chat_start_time = None
|
||||
self.cold_chat_duration = 0.0
|
||||
|
||||
if user_info:
|
||||
sender_id = str(user_info.user_id) # 确保是字符串
|
||||
sender_id = str(user_info.user_id)
|
||||
self.last_message_sender = sender_id
|
||||
# 更新发言时间
|
||||
# 假设 self.bot_id 已经正确初始化 (例如从 global_config)
|
||||
if self.bot_id and sender_id == str(self.bot_id):
|
||||
if sender_id == self.bot_id:
|
||||
self.last_bot_speak_time = message_time
|
||||
else:
|
||||
self.last_user_speak_time = message_time
|
||||
self.active_users.add(sender_id) # 用户发言则认为其活跃
|
||||
self.active_users.add(sender_id)
|
||||
else:
|
||||
logger.warning(
|
||||
f"[私聊][{self.private_name}]处理消息更新时缺少有效的 UserInfo 对象, message_id: {message_id}"
|
||||
)
|
||||
self.last_message_sender = None # 发送者未知
|
||||
self.last_message_sender = None
|
||||
|
||||
# 将原始消息字典添加到未处理列表
|
||||
self.unprocessed_messages.append(message)
|
||||
self.new_messages_count = len(self.unprocessed_messages) # 直接用列表长度
|
||||
# --- [修改点 14] 添加到未处理列表,并更新计数 ---
|
||||
# 检查消息是否已存在于未处理列表中,避免重复添加
|
||||
if not any(msg.get("message_id") == message_id for msg in self.unprocessed_messages):
|
||||
self.unprocessed_messages.append(message)
|
||||
self.new_messages_count = len(self.unprocessed_messages)
|
||||
logger.debug(f"[私聊][{self.private_name}]添加新未处理消息 ID: {message_id}, 当前未处理数: {self.new_messages_count}")
|
||||
self.update_changed()
|
||||
else:
|
||||
logger.warning(f"[私聊][{self.private_name}]尝试重复添加未处理消息 ID: {message_id}")
|
||||
|
||||
# logger.debug(f"[私聊][{self.private_name}]消息更新: last_time={self.last_message_time}, new_count={self.new_messages_count}")
|
||||
self.update_changed() # 标记状态已改变
|
||||
else:
|
||||
# 如果消息时间戳不是最新的,可能不需要处理,或者记录一个警告
|
||||
# logger.warning(f"[私聊][{self.private_name}]收到过时或无效时间戳的消息: ID={message_id}, time={message_time}")
|
||||
# 即使时间戳旧,也可能需要加入未处理列表(如果它是之前漏掉的)
|
||||
# 但为了避免复杂化,暂时按原逻辑处理:只处理时间更新的消息
|
||||
pass
|
||||
|
||||
|
||||
def update_changed(self):
|
||||
"""标记状态已改变,并重置标记"""
|
||||
# logger.debug(f"[私聊][{self.private_name}]状态标记为已改变 (changed=True)")
|
||||
"""标记状态已改变,并重置标记 (保持不变)"""
|
||||
self.changed = True
|
||||
|
||||
async def update_cold_chat_status(self, is_cold: bool, current_time: float):
|
||||
"""更新冷场状态
|
||||
|
||||
Args:
|
||||
is_cold: 是否处于冷场状态
|
||||
current_time: 当前时间戳
|
||||
"""
|
||||
if is_cold != self.is_cold_chat: # 仅在状态变化时更新 # Corrected variable name
|
||||
self.is_cold_chat = is_cold # Corrected variable name
|
||||
"""更新冷场状态 (保持不变)"""
|
||||
if is_cold != self.is_cold_chat:
|
||||
self.is_cold_chat = is_cold
|
||||
if is_cold:
|
||||
# 进入冷场状态
|
||||
self.cold_chat_start_time = (
|
||||
self.last_message_time or current_time
|
||||
) # 从最后消息时间开始算,或从当前时间开始
|
||||
)
|
||||
logger.info(f"[私聊][{self.private_name}]进入冷场状态,开始时间: {self.cold_chat_start_time}")
|
||||
else:
|
||||
# 结束冷场状态
|
||||
if self.cold_chat_start_time:
|
||||
self.cold_chat_duration = current_time - self.cold_chat_start_time
|
||||
logger.info(f"[私聊][{self.private_name}]结束冷场状态,持续时间: {self.cold_chat_duration:.2f} 秒")
|
||||
self.cold_chat_start_time = None # 重置开始时间
|
||||
self.update_changed() # 状态变化,标记改变
|
||||
self.cold_chat_start_time = None
|
||||
self.update_changed()
|
||||
|
||||
# 即使状态没变,如果是冷场状态,也更新持续时间
|
||||
if self.is_cold_chat and self.cold_chat_start_time: # Corrected variable name
|
||||
if self.is_cold_chat and self.cold_chat_start_time:
|
||||
self.cold_chat_duration = current_time - self.cold_chat_start_time
|
||||
|
||||
def get_active_duration(self) -> float:
|
||||
"""获取当前活跃时长 (距离最后一条消息的时间)
|
||||
|
||||
Returns:
|
||||
float: 最后一条消息到现在的时长(秒)
|
||||
"""
|
||||
"""获取当前活跃时长 (保持不变)"""
|
||||
if not self.last_message_time:
|
||||
return 0.0
|
||||
return time.time() - self.last_message_time
|
||||
|
||||
def get_user_response_time(self) -> Optional[float]:
|
||||
"""获取用户最后响应时间 (距离用户最后发言的时间)
|
||||
|
||||
Returns:
|
||||
Optional[float]: 用户最后发言到现在的时长(秒),如果没有用户发言则返回None
|
||||
"""
|
||||
"""获取用户最后响应时间 (保持不变)"""
|
||||
if not self.last_user_speak_time:
|
||||
return None
|
||||
return time.time() - self.last_user_speak_time
|
||||
|
||||
def get_bot_response_time(self) -> Optional[float]:
|
||||
"""获取机器人最后响应时间 (距离机器人最后发言的时间)
|
||||
|
||||
Returns:
|
||||
Optional[float]: 机器人最后发言到现在的时长(秒),如果没有机器人发言则返回None
|
||||
"""
|
||||
"""获取机器人最后响应时间 (保持不变)"""
|
||||
if not self.last_bot_speak_time:
|
||||
return None
|
||||
return time.time() - self.last_bot_speak_time
|
||||
|
||||
# --- 新增方法 ---
|
||||
async def mark_messages_processed_up_to(self, marker_timestamp: float):
|
||||
"""
|
||||
将指定时间戳之前(包括等于)的未处理消息移入历史记录。
|
||||
|
||||
Args:
|
||||
marker_timestamp: 时间戳标记。
|
||||
"""
|
||||
messages_to_process = [
|
||||
msg for msg in self.unprocessed_messages if msg.get("time", 0) <= marker_timestamp
|
||||
]
|
||||
|
||||
if not messages_to_process:
|
||||
# logger.debug(f"[私聊][{self.private_name}]没有在 {marker_timestamp} 之前的未处理消息。")
|
||||
# --- [修改点 15] 重命名并修改 clear_unprocessed_messages ---
|
||||
# async def clear_unprocessed_messages(self): <-- 旧方法注释掉或删除
|
||||
async def clear_processed_messages(self, message_ids_to_clear: Set[str]):
|
||||
"""将指定ID的未处理消息移入历史记录,并更新相关状态"""
|
||||
if not message_ids_to_clear:
|
||||
logger.debug(f"[私聊][{self.private_name}]没有需要清理的消息 ID。")
|
||||
return
|
||||
|
||||
# logger.debug(f"[私聊][{self.private_name}]处理 {len(messages_to_process)} 条直到 {marker_timestamp} 的未处理消息...")
|
||||
messages_to_move = []
|
||||
remaining_messages = []
|
||||
cleared_count = 0
|
||||
|
||||
# 将要处理的消息添加到历史记录
|
||||
max_history_len = 100 # 示例:最多保留100条历史记录
|
||||
self.chat_history.extend(messages_to_process)
|
||||
# 分离要清理和要保留的消息
|
||||
for msg in self.unprocessed_messages:
|
||||
if msg.get("message_id") in message_ids_to_clear:
|
||||
messages_to_move.append(msg)
|
||||
cleared_count += 1
|
||||
else:
|
||||
remaining_messages.append(msg)
|
||||
|
||||
if not messages_to_move:
|
||||
logger.debug(f"[私聊][{self.private_name}]未找到与 ID 列表匹配的未处理消息进行清理。")
|
||||
return
|
||||
|
||||
logger.debug(f"[私聊][{self.private_name}]准备清理 {cleared_count} 条已处理消息...")
|
||||
|
||||
# 将要移动的消息添加到历史记录
|
||||
max_history_len = 100
|
||||
self.chat_history.extend(messages_to_move)
|
||||
if len(self.chat_history) > max_history_len:
|
||||
self.chat_history = self.chat_history[-max_history_len:]
|
||||
|
||||
# 更新历史记录字符串 (只使用最近一部分生成,例如20条)
|
||||
history_slice_for_str = self.chat_history[-20:]
|
||||
# 更新历史记录字符串 (仅使用最近一部分生成)
|
||||
history_slice_for_str = self.chat_history[-20:] # 例如最近20条
|
||||
try:
|
||||
self.chat_history_str = await build_readable_messages(
|
||||
history_slice_for_str,
|
||||
replace_bot_name=True,
|
||||
merge_messages=False,
|
||||
timestamp_mode="relative",
|
||||
read_mark=0.0, # read_mark 可能需要根据逻辑调整
|
||||
read_mark=0.0,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{self.private_name}]构建聊天记录字符串时出错: {e}")
|
||||
self.chat_history_str = "[构建聊天记录出错]" # 提供错误提示
|
||||
self.chat_history_str = "[构建聊天记录出错]"
|
||||
|
||||
# 从未处理列表中移除已处理的消息
|
||||
processed_ids = {msg.get("message_id") for msg in messages_to_process}
|
||||
self.unprocessed_messages = [
|
||||
msg for msg in self.unprocessed_messages if msg.get("message_id") not in processed_ids
|
||||
]
|
||||
|
||||
# 更新未处理消息计数和历史记录总数
|
||||
# 更新未处理消息列表和计数
|
||||
self.unprocessed_messages = remaining_messages
|
||||
self.new_messages_count = len(self.unprocessed_messages)
|
||||
self.chat_history_count = len(self.chat_history)
|
||||
# logger.debug(f"[私聊][{self.private_name}]已处理 {len(messages_to_process)} 条消息,剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。")
|
||||
|
||||
self.update_changed() # 状态改变
|
||||
|
||||
# --- 移除或注释掉旧的 clear_unprocessed_messages 方法 ---
|
||||
# async def clear_unprocessed_messages(self):
|
||||
# """将未处理消息移入历史记录,并更新相关状态 (此方法将被 mark_messages_processed_up_to 替代)"""
|
||||
# # ... (旧代码) ...
|
||||
# logger.warning(f"[私聊][{self.private_name}] 调用了已弃用的 clear_unprocessed_messages 方法。请使用 mark_messages_processed_up_to。")
|
||||
# # 为了兼容性,可以暂时调用新方法处理所有消息,但不推荐
|
||||
# # await self.mark_messages_processed_up_to(time.time())
|
||||
# pass # 或者直接留空
|
||||
logger.info(f"[私聊][{self.private_name}]已清理 {cleared_count} 条消息,剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。")
|
||||
|
||||
self.update_changed() # 状态改变
|
||||
|
|
@ -2,9 +2,9 @@ import traceback
|
|||
import json
|
||||
import re
|
||||
from typing import Dict, Any, Optional, Tuple, List, Union
|
||||
from src.common.logger_manager import get_logger # 确认 logger 的导入路径
|
||||
from src.common.logger_manager import get_logger # 确认 logger 的导入路径
|
||||
from src.plugins.memory_system.Hippocampus import HippocampusManager
|
||||
from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder # 确认 prompt_builder 的导入路径
|
||||
from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder # 确认 prompt_builder 的导入路径
|
||||
|
||||
logger = get_logger("pfc_utils")
|
||||
|
||||
|
|
@ -47,33 +47,36 @@ async def retrieve_contextual_info(text: str, private_name: str) -> Tuple[str, s
|
|||
retrieved_memory_str = f"你回忆起:\n{related_memory_info.strip()}\n(以上是你的回忆,供参考)\n"
|
||||
memory_log_msg = f"自动检索到记忆: {related_memory_info.strip()[:100]}..."
|
||||
else:
|
||||
memory_log_msg = "自动检索记忆返回为空。"
|
||||
memory_log_msg = "自动检索记忆返回为空。"
|
||||
logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 记忆检索: {memory_log_msg}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索记忆时出错: {e}\n{traceback.format_exc()}")
|
||||
logger.error(
|
||||
f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索记忆时出错: {e}\n{traceback.format_exc()}"
|
||||
)
|
||||
retrieved_memory_str = "检索记忆时出错。\n"
|
||||
|
||||
# 2. 检索知识 (逻辑来自原 action_planner 和 reply_generator)
|
||||
try:
|
||||
# 使用导入的 prompt_builder 实例及其方法
|
||||
knowledge_result = await prompt_builder.get_prompt_info(
|
||||
message=text, threshold=0.38 # threshold 可以根据需要调整
|
||||
message=text,
|
||||
threshold=0.38, # threshold 可以根据需要调整
|
||||
)
|
||||
if knowledge_result:
|
||||
retrieved_knowledge_str = knowledge_result # 直接使用返回结果
|
||||
knowledge_log_msg = "自动检索到相关知识。"
|
||||
retrieved_knowledge_str = knowledge_result # 直接使用返回结果
|
||||
knowledge_log_msg = "自动检索到相关知识。"
|
||||
logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 知识检索: {knowledge_log_msg}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索知识时出错: {e}\n{traceback.format_exc()}")
|
||||
logger.error(
|
||||
f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索知识时出错: {e}\n{traceback.format_exc()}"
|
||||
)
|
||||
retrieved_knowledge_str = "检索知识时出错。\n"
|
||||
|
||||
return retrieved_memory_str, retrieved_knowledge_str
|
||||
|
||||
|
||||
|
||||
|
||||
def get_items_from_json(
|
||||
content: str,
|
||||
private_name: str,
|
||||
|
|
|
|||
|
|
@ -1,8 +1,11 @@
|
|||
from .pfc_utils import retrieve_contextual_info
|
||||
|
||||
# 可能用于旧知识库提取主题 (如果需要回退到旧方法)
|
||||
# import jieba # 如果报错说找不到 jieba,可能需要安装: pip install jieba
|
||||
# import re # 正则表达式库,通常 Python 自带
|
||||
from typing import Tuple, List, Dict, Any
|
||||
|
||||
# from src.common.logger import get_module_logger
|
||||
from src.common.logger_manager import get_logger
|
||||
from ..models.utils_model import LLMRequest
|
||||
from ...config.config import global_config
|
||||
|
|
@ -113,7 +116,6 @@ class ReplyGenerator:
|
|||
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
|
||||
self.reply_checker = ReplyChecker(stream_id, private_name)
|
||||
|
||||
|
||||
# 修改 generate 方法签名,增加 action_type 参数
|
||||
async def generate(
|
||||
self, observation_info: ObservationInfo, conversation_info: ConversationInfo, action_type: str
|
||||
|
|
@ -170,29 +172,35 @@ class ReplyGenerator:
|
|||
|
||||
# 构建 Persona 文本 (persona_text)
|
||||
persona_text = f"你的名字是{self.name},{self.personality_info}。"
|
||||
retrieval_context = chat_history_text # 使用前面构建好的 chat_history_text
|
||||
retrieval_context = chat_history_text # 使用前面构建好的 chat_history_text
|
||||
# 调用共享函数进行检索
|
||||
retrieved_memory_str, retrieved_knowledge_str = await retrieve_contextual_info(retrieval_context, self.private_name)
|
||||
logger.info(f"[私聊][{self.private_name}] (ReplyGenerator) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str and '无相关知识' not in retrieved_knowledge_str else '无'}")
|
||||
|
||||
retrieved_memory_str, retrieved_knowledge_str = await retrieve_contextual_info(
|
||||
retrieval_context, self.private_name
|
||||
)
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}] (ReplyGenerator) 统一检索完成。记忆: {'有' if '回忆起' in retrieved_memory_str else '无'} / 知识: {'有' if '出错' not in retrieved_knowledge_str and '无相关知识' not in retrieved_knowledge_str else '无'}"
|
||||
)
|
||||
|
||||
# --- 修改:构建上次回复失败原因和内容提示 ---
|
||||
last_rejection_info_str = ""
|
||||
# 检查 conversation_info 是否有上次拒绝的原因和内容,并且它们都不是 None
|
||||
last_reason = getattr(conversation_info, 'last_reply_rejection_reason', None)
|
||||
last_content = getattr(conversation_info, 'last_rejected_reply_content', None)
|
||||
last_reason = getattr(conversation_info, "last_reply_rejection_reason", None)
|
||||
last_content = getattr(conversation_info, "last_rejected_reply_content", None)
|
||||
|
||||
if last_reason and last_content:
|
||||
last_rejection_info_str = (
|
||||
f"\n------\n"
|
||||
f"【重要提示:你上一次尝试回复时失败了,以下是详细信息】\n"
|
||||
f"上次试图发送的消息内容: “{last_content}”\n" # <-- 显示上次内容
|
||||
f"上次试图发送的消息内容: “{last_content}”\n" # <-- 显示上次内容
|
||||
f"失败原因: “{last_reason}”\n"
|
||||
f"请根据【消息内容】和【失败原因】调整你的新回复,避免重复之前的错误。\n"
|
||||
f"------\n"
|
||||
)
|
||||
logger.info(f"[私聊][{self.private_name}]检测到上次回复失败信息,将加入 Prompt:\n"
|
||||
f" 内容: {last_content}\n"
|
||||
f" 原因: {last_reason}")
|
||||
logger.info(
|
||||
f"[私聊][{self.private_name}]检测到上次回复失败信息,将加入 Prompt:\n"
|
||||
f" 内容: {last_content}\n"
|
||||
f" 原因: {last_reason}"
|
||||
)
|
||||
|
||||
# --- 选择 Prompt ---
|
||||
if action_type == "send_new_message":
|
||||
|
|
@ -206,22 +214,24 @@ class ReplyGenerator:
|
|||
logger.info(f"[私聊][{self.private_name}]使用 PROMPT_DIRECT_REPLY (首次/非连续回复生成)")
|
||||
|
||||
# --- 格式化最终的 Prompt ---
|
||||
try: # <--- 增加 try-except 块处理可能的 format 错误
|
||||
try: # <--- 增加 try-except 块处理可能的 format 错误
|
||||
prompt = prompt_template.format(
|
||||
persona_text=persona_text,
|
||||
goals_str=goals_str,
|
||||
chat_history_text=chat_history_text,
|
||||
retrieved_memory_str=retrieved_memory_str if retrieved_memory_str else "无相关记忆。",
|
||||
retrieved_knowledge_str=retrieved_knowledge_str if retrieved_knowledge_str else "无相关知识。",
|
||||
last_rejection_info=last_rejection_info_str # <--- 新增传递上次拒绝原因
|
||||
last_rejection_info=last_rejection_info_str, # <--- 新增传递上次拒绝原因
|
||||
)
|
||||
except KeyError as e:
|
||||
logger.error(f"[私聊][{self.private_name}]格式化 Prompt 时出错,缺少键: {e}。请检查 Prompt 模板和传递的参数。")
|
||||
# 返回错误信息或默认回复
|
||||
return "抱歉,准备回复时出了点问题,请检查一下我的代码..."
|
||||
logger.error(
|
||||
f"[私聊][{self.private_name}]格式化 Prompt 时出错,缺少键: {e}。请检查 Prompt 模板和传递的参数。"
|
||||
)
|
||||
# 返回错误信息或默认回复
|
||||
return "抱歉,准备回复时出了点问题,请检查一下我的代码..."
|
||||
except Exception as fmt_err:
|
||||
logger.error(f"[私聊][{self.private_name}]格式化 Prompt 时发生未知错误: {fmt_err}")
|
||||
return "抱歉,准备回复时出了点内部错误,请检查一下我的代码..."
|
||||
logger.error(f"[私聊][{self.private_name}]格式化 Prompt 时发生未知错误: {fmt_err}")
|
||||
return "抱歉,准备回复时出了点内部错误,请检查一下我的代码..."
|
||||
|
||||
# --- 调用 LLM 生成 ---
|
||||
logger.debug(f"[私聊][{self.private_name}]发送到LLM的生成提示词:\n------\n{prompt}\n------")
|
||||
|
|
|
|||
Loading…
Reference in New Issue