🤖 自动格式化代码 [skip ci]

pull/937/head
github-actions[bot] 2025-05-06 06:36:27 +00:00
parent e4857bb898
commit 06aa821eaa
7 changed files with 372 additions and 289 deletions

View File

@ -668,8 +668,12 @@ class BotConfig:
def idle_conversation(parent: dict): def idle_conversation(parent: dict):
idle_conversation_config = parent["idle_conversation"] idle_conversation_config = parent["idle_conversation"]
if config.INNER_VERSION in SpecifierSet(">=1.6.2"): if config.INNER_VERSION in SpecifierSet(">=1.6.2"):
config.enable_idle_conversation = idle_conversation_config.get("enable_idle_conversation", config.enable_idle_conversation) config.enable_idle_conversation = idle_conversation_config.get(
config.idle_check_interval = idle_conversation_config.get("idle_check_interval", config.idle_check_interval) "enable_idle_conversation", config.enable_idle_conversation
)
config.idle_check_interval = idle_conversation_config.get(
"idle_check_interval", config.idle_check_interval
)
config.min_idle_time = idle_conversation_config.get("min_idle_time", config.min_idle_time) config.min_idle_time = idle_conversation_config.get("min_idle_time", config.min_idle_time)
config.max_idle_time = idle_conversation_config.get("max_idle_time", config.max_idle_time) config.max_idle_time = idle_conversation_config.get("max_idle_time", config.max_idle_time)

View File

@ -138,7 +138,6 @@ class ActionPlanner:
# 获取 ChatObserver 实例 (单例模式) # 获取 ChatObserver 实例 (单例模式)
self.chat_observer = ChatObserver.get_instance(stream_id, private_name) self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
async def plan( async def plan(
self, self,
observation_info: ObservationInfo, observation_info: ObservationInfo,
@ -242,7 +241,7 @@ class ActionPlanner:
except Exception as end_dec_err: except Exception as end_dec_err:
logger.error(f"[私聊][{self.private_name}] 处理结束对话决策时出错: {end_dec_err}") logger.error(f"[私聊][{self.private_name}] 处理结束对话决策时出错: {end_dec_err}")
logger.warning(f"[私聊][{self.private_name}] 结束决策出错,将按原计划执行 end_conversation") logger.warning(f"[私聊][{self.private_name}] 结束决策出错,将按原计划执行 end_conversation")
final_action = "end_conversation" # 保持原计划 final_action = "end_conversation" # 保持原计划
final_reason = initial_reason final_reason = initial_reason
# --- [移除] 不再需要在这里检查 wait 动作的约束 --- # --- [移除] 不再需要在这里检查 wait 动作的约束 ---
@ -253,25 +252,30 @@ class ActionPlanner:
# --- 5. 验证最终行动类型 --- # --- 5. 验证最终行动类型 ---
valid_actions = [ valid_actions = [
"direct_reply", "send_new_message", "wait", "listening", "direct_reply",
"rethink_goal", "end_conversation", "block_and_ignore", "say_goodbye" "send_new_message",
"wait",
"listening",
"rethink_goal",
"end_conversation",
"block_and_ignore",
"say_goodbye",
] ]
if final_action not in valid_actions: if final_action not in valid_actions:
logger.warning(f"[私聊][{self.private_name}] LLM 返回了未知的行动类型: '{final_action}',强制改为 wait") logger.warning(f"[私聊][{self.private_name}] LLM 返回了未知的行动类型: '{final_action}',强制改为 wait")
final_reason = f"(原始行动'{final_action}'无效已强制改为wait) {final_reason}" final_reason = f"(原始行动'{final_action}'无效已强制改为wait) {final_reason}"
final_action = "wait" # 遇到无效动作,默认等待 final_action = "wait" # 遇到无效动作,默认等待
plan_duration = time.time() - plan_start_time plan_duration = time.time() - plan_start_time
logger.success(f"[私聊][{self.private_name}] 最终规划行动: {final_action} (总耗时: {plan_duration:.3f} 秒)") logger.success(f"[私聊][{self.private_name}] 最终规划行动: {final_action} (总耗时: {plan_duration:.3f} 秒)")
logger.info(f"[私聊][{self.private_name}] 行动原因: {final_reason}") logger.info(f"[私聊][{self.private_name}] 行动原因: {final_reason}")
return final_action, final_reason return final_action, final_reason
# --- Helper methods for preparing prompt inputs --- # --- Helper methods for preparing prompt inputs ---
def _get_bot_last_speak_time_info(self, observation_info: ObservationInfo) -> str: def _get_bot_last_speak_time_info(self, observation_info: ObservationInfo) -> str:
"""获取机器人上次发言时间提示""" """获取机器人上次发言时间提示"""
time_info = "" time_info = ""
try: try:
if not observation_info or not observation_info.bot_id: if not observation_info or not observation_info.bot_id:
@ -297,7 +301,7 @@ class ActionPlanner:
def _get_timeout_context(self, conversation_info: ConversationInfo) -> str: def _get_timeout_context(self, conversation_info: ConversationInfo) -> str:
"""获取超时提示信息""" """获取超时提示信息"""
timeout_context = "" timeout_context = ""
try: try:
if hasattr(conversation_info, "goal_list") and conversation_info.goal_list: if hasattr(conversation_info, "goal_list") and conversation_info.goal_list:
@ -307,8 +311,12 @@ class ActionPlanner:
last_goal_text = last_goal_item.get("goal", "") last_goal_text = last_goal_item.get("goal", "")
elif isinstance(last_goal_item, str): elif isinstance(last_goal_item, str):
last_goal_text = last_goal_item last_goal_text = last_goal_item
if isinstance(last_goal_text, str) and "分钟," in last_goal_text and "思考接下来要做什么" in last_goal_text: if (
wait_time_str = last_goal_text.split("分钟,")[0].replace("你等待了","").strip() isinstance(last_goal_text, str)
and "分钟," in last_goal_text
and "思考接下来要做什么" in last_goal_text
):
wait_time_str = last_goal_text.split("分钟,")[0].replace("你等待了", "").strip()
timeout_context = f"重要提示:对方已经长时间(约 {wait_time_str} 分钟)没有回复你的消息了,请基于此情况规划下一步。\n" timeout_context = f"重要提示:对方已经长时间(约 {wait_time_str} 分钟)没有回复你的消息了,请基于此情况规划下一步。\n"
logger.debug(f"[私聊][{self.private_name}] 检测到超时目标: {last_goal_text}") logger.debug(f"[私聊][{self.private_name}] 检测到超时目标: {last_goal_text}")
except AttributeError as e: except AttributeError as e:
@ -319,7 +327,7 @@ class ActionPlanner:
def _build_goals_string(self, conversation_info: ConversationInfo) -> str: def _build_goals_string(self, conversation_info: ConversationInfo) -> str:
"""构建对话目标字符串""" """构建对话目标字符串"""
goals_str = "" goals_str = ""
try: try:
if hasattr(conversation_info, "goal_list") and conversation_info.goal_list: if hasattr(conversation_info, "goal_list") and conversation_info.goal_list:
@ -349,25 +357,37 @@ class ActionPlanner:
async def _build_chat_history_text(self, observation_info: ObservationInfo) -> str: async def _build_chat_history_text(self, observation_info: ObservationInfo) -> str:
"""构建聊天历史记录文本 (包含未处理消息)""" """构建聊天历史记录文本 (包含未处理消息)"""
chat_history_text = "" chat_history_text = ""
try: try:
if hasattr(observation_info, "chat_history_str") and observation_info.chat_history_str: if hasattr(observation_info, "chat_history_str") and observation_info.chat_history_str:
chat_history_text = observation_info.chat_history_str chat_history_text = observation_info.chat_history_str
elif hasattr(observation_info, "chat_history") and observation_info.chat_history: elif hasattr(observation_info, "chat_history") and observation_info.chat_history:
history_slice = observation_info.chat_history[-20:] history_slice = observation_info.chat_history[-20:]
chat_history_text = await build_readable_messages(history_slice, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0) chat_history_text = await build_readable_messages(
history_slice, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0
)
else: else:
chat_history_text = "还没有聊天记录。\n" chat_history_text = "还没有聊天记录。\n"
unread_count = getattr(observation_info, 'new_messages_count', 0) unread_count = getattr(observation_info, "new_messages_count", 0)
unread_messages = getattr(observation_info, 'unprocessed_messages', []) unread_messages = getattr(observation_info, "unprocessed_messages", [])
if unread_count > 0 and unread_messages: if unread_count > 0 and unread_messages:
bot_qq_str = str(global_config.BOT_QQ) bot_qq_str = str(global_config.BOT_QQ)
other_unread_messages = [msg for msg in unread_messages if msg.get("user_info", {}).get("user_id") != bot_qq_str] other_unread_messages = [
msg for msg in unread_messages if msg.get("user_info", {}).get("user_id") != bot_qq_str
]
other_unread_count = len(other_unread_messages) other_unread_count = len(other_unread_messages)
if other_unread_count > 0: if other_unread_count > 0:
new_messages_str = await build_readable_messages(other_unread_messages, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0) new_messages_str = await build_readable_messages(
chat_history_text += f"\n--- 以下是 {other_unread_count} 条你需要处理的新消息 ---\n{new_messages_str}\n------\n" other_unread_messages,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
chat_history_text += (
f"\n--- 以下是 {other_unread_count} 条你需要处理的新消息 ---\n{new_messages_str}\n------\n"
)
logger.debug(f"[私聊][{self.private_name}] 向 LLM 追加了 {other_unread_count} 条未读消息。") logger.debug(f"[私聊][{self.private_name}] 向 LLM 追加了 {other_unread_count} 条未读消息。")
except AttributeError as e: except AttributeError as e:
logger.warning(f"[私聊][{self.private_name}] 构建聊天记录文本时属性错误: {e}") logger.warning(f"[私聊][{self.private_name}] 构建聊天记录文本时属性错误: {e}")
@ -377,10 +397,9 @@ class ActionPlanner:
chat_history_text = "[处理聊天记录时出错]\n" chat_history_text = "[处理聊天记录时出错]\n"
return chat_history_text return chat_history_text
def _build_action_history_context(self, conversation_info: ConversationInfo) -> Tuple[str, str]: def _build_action_history_context(self, conversation_info: ConversationInfo) -> Tuple[str, str]:
"""构建行动历史概要和上一次行动详细情况""" """构建行动历史概要和上一次行动详细情况"""
action_history_summary = "你最近执行的行动历史:\n" action_history_summary = "你最近执行的行动历史:\n"
last_action_context = "关于你【上一次尝试】的行动:\n" last_action_context = "关于你【上一次尝试】的行动:\n"
action_history_list: List[Dict[str, Any]] = [] action_history_list: List[Dict[str, Any]] = []
@ -437,7 +456,14 @@ class ActionPlanner:
end_content, _ = await self.llm.generate_response_async(end_decision_prompt) end_content, _ = await self.llm.generate_response_async(end_decision_prompt)
llm_duration = time.time() - llm_start_time llm_duration = time.time() - llm_start_time
logger.debug(f"[私聊][{self.private_name}] LLM (结束决策) 耗时: {llm_duration:.3f} 秒, 原始返回: {end_content}") logger.debug(f"[私聊][{self.private_name}] LLM (结束决策) 耗时: {llm_duration:.3f} 秒, 原始返回: {end_content}")
end_success, end_result = get_items_from_json(end_content, self.private_name, "say_bye", "reason", default_values={"say_bye": "no", "reason": "结束决策LLM返回格式错误默认不告别"}, required_types={"say_bye": str, "reason": str}) end_success, end_result = get_items_from_json(
end_content,
self.private_name,
"say_bye",
"reason",
default_values={"say_bye": "no", "reason": "结束决策LLM返回格式错误默认不告别"},
required_types={"say_bye": str, "reason": str},
)
say_bye_decision = end_result.get("say_bye", "no").lower() say_bye_decision = end_result.get("say_bye", "no").lower()
end_decision_reason = end_result.get("reason", "未提供原因") end_decision_reason = end_result.get("reason", "未提供原因")
if end_success and say_bye_decision == "yes": if end_success and say_bye_decision == "yes":

View File

@ -26,11 +26,13 @@ from .reply_checker import ReplyChecker
# 导入富文本回溯,用于更好的错误展示 # 导入富文本回溯,用于更好的错误展示
from rich.traceback import install from rich.traceback import install
install(extra_lines=3) install(extra_lines=3)
# 获取当前模块的日志记录器 # 获取当前模块的日志记录器
logger = get_logger("pfc_conversation") logger = get_logger("pfc_conversation")
class Conversation: class Conversation:
""" """
对话类负责管理单个私聊对话的状态和核心逻辑流程 对话类负责管理单个私聊对话的状态和核心逻辑流程
@ -47,11 +49,11 @@ class Conversation:
""" """
self.stream_id: str = stream_id self.stream_id: str = stream_id
self.private_name: str = private_name self.private_name: str = private_name
self.state: ConversationState = ConversationState.INIT # 对话的初始状态 self.state: ConversationState = ConversationState.INIT # 对话的初始状态
self.should_continue: bool = False # 标记对话循环是否应该继续运行 self.should_continue: bool = False # 标记对话循环是否应该继续运行
self.ignore_until_timestamp: Optional[float] = None # 如果设置了,忽略此时间戳之前的活动 self.ignore_until_timestamp: Optional[float] = None # 如果设置了,忽略此时间戳之前的活动
self.generated_reply: str = "" # 存储最近生成的回复内容 self.generated_reply: str = "" # 存储最近生成的回复内容
self.chat_stream: Optional[ChatStream] = None # 关联的聊天流对象 self.chat_stream: Optional[ChatStream] = None # 关联的聊天流对象
# 初始化所有核心组件为 None将在 _initialize 中创建 # 初始化所有核心组件为 None将在 _initialize 中创建
self.action_planner: Optional[ActionPlanner] = None self.action_planner: Optional[ActionPlanner] = None
@ -64,11 +66,11 @@ class Conversation:
self.chat_observer: Optional[ChatObserver] = None self.chat_observer: Optional[ChatObserver] = None
self.observation_info: Optional[ObservationInfo] = None self.observation_info: Optional[ObservationInfo] = None
self.conversation_info: Optional[ConversationInfo] = None self.conversation_info: Optional[ConversationInfo] = None
self.reply_checker: Optional[ReplyChecker] = None # 回复检查器 self.reply_checker: Optional[ReplyChecker] = None # 回复检查器
# 内部状态标志 # 内部状态标志
self._initializing: bool = False # 标记是否正在初始化,防止并发问题 self._initializing: bool = False # 标记是否正在初始化,防止并发问题
self._initialized: bool = False # 标记是否已成功初始化 self._initialized: bool = False # 标记是否已成功初始化
# 缓存机器人自己的 QQ 号字符串,避免重复转换 # 缓存机器人自己的 QQ 号字符串,避免重复转换
self.bot_qq_str: Optional[str] = str(global_config.BOT_QQ) if global_config.BOT_QQ else None self.bot_qq_str: Optional[str] = str(global_config.BOT_QQ) if global_config.BOT_QQ else None
@ -87,7 +89,7 @@ class Conversation:
logger.warning(f"[私聊][{self.private_name}] 尝试重复初始化或正在初始化中。") logger.warning(f"[私聊][{self.private_name}] 尝试重复初始化或正在初始化中。")
return return
self._initializing = True # 标记开始初始化 self._initializing = True # 标记开始初始化
logger.info(f"[私聊][{self.private_name}] 开始初始化对话实例: {self.stream_id}") logger.info(f"[私聊][{self.private_name}] 开始初始化对话实例: {self.stream_id}")
try: try:
@ -112,7 +114,9 @@ class Conversation:
self.chat_stream = chat_manager.get_stream(self.stream_id) self.chat_stream = chat_manager.get_stream(self.stream_id)
if not self.chat_stream: if not self.chat_stream:
# 获取不到 ChatStream 是一个严重问题,因为无法发送消息 # 获取不到 ChatStream 是一个严重问题,因为无法发送消息
logger.error(f"[私聊][{self.private_name}] 初始化错误:无法从 chat_manager 获取 stream_id {self.stream_id} 的 ChatStream。") logger.error(
f"[私聊][{self.private_name}] 初始化错误:无法从 chat_manager 获取 stream_id {self.stream_id} 的 ChatStream。"
)
raise ValueError(f"无法获取 stream_id {self.stream_id} 的 ChatStream") raise ValueError(f"无法获取 stream_id {self.stream_id} 的 ChatStream")
# 初始化空闲对话启动器 # 初始化空闲对话启动器
@ -148,8 +152,8 @@ class Conversation:
# 6. 标记初始化成功并设置运行状态 # 6. 标记初始化成功并设置运行状态
self._initialized = True self._initialized = True
self.should_continue = True # 初始化成功,标记可以继续运行循环 self.should_continue = True # 初始化成功,标记可以继续运行循环
self.state = ConversationState.ANALYZING # 设置初始状态为分析 self.state = ConversationState.ANALYZING # 设置初始状态为分析
logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 初始化完成。") logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 初始化完成。")
@ -157,11 +161,11 @@ class Conversation:
# 捕获初始化过程中的任何异常 # 捕获初始化过程中的任何异常
logger.error(f"[私聊][{self.private_name}] 初始化对话实例失败: {e}") logger.error(f"[私聊][{self.private_name}] 初始化对话实例失败: {e}")
logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}")
self.should_continue = False # 初始化失败,标记不能继续 self.should_continue = False # 初始化失败,标记不能继续
self._initialized = False # 确保标记为未初始化 self._initialized = False # 确保标记为未初始化
# 尝试停止可能部分启动的组件 # 尝试停止可能部分启动的组件
await self.stop() await self.stop()
raise # 将异常重新抛出,通知调用者初始化失败 raise # 将异常重新抛出,通知调用者初始化失败
finally: finally:
# 无论成功与否,都要清除正在初始化的标记 # 无论成功与否,都要清除正在初始化的标记
self._initializing = False self._initializing = False
@ -178,7 +182,7 @@ class Conversation:
initial_messages = get_raw_msg_before_timestamp_with_chat( initial_messages = get_raw_msg_before_timestamp_with_chat(
chat_id=self.stream_id, chat_id=self.stream_id,
timestamp=time.time(), timestamp=time.time(),
limit=30, # limit 可以根据需要调整或配置 limit=30, # limit 可以根据需要调整或配置
) )
if initial_messages: if initial_messages:
@ -197,7 +201,9 @@ class Conversation:
try: try:
last_user_info = UserInfo.from_dict(last_user_info_dict) last_user_info = UserInfo.from_dict(last_user_info_dict)
# 存储发送者的 user_id 字符串 # 存储发送者的 user_id 字符串
self.observation_info.last_message_sender = str(last_user_info.user_id) if last_user_info else None self.observation_info.last_message_sender = (
str(last_user_info.user_id) if last_user_info else None
)
except Exception as e: except Exception as e:
logger.warning(f"[私聊][{self.private_name}] 解析最后一条消息的用户信息时出错: {e}") logger.warning(f"[私聊][{self.private_name}] 解析最后一条消息的用户信息时出错: {e}")
self.observation_info.last_message_sender = None self.observation_info.last_message_sender = None
@ -209,13 +215,13 @@ class Conversation:
self.observation_info.last_message_content = last_msg.get("processed_plain_text", "") self.observation_info.last_message_content = last_msg.get("processed_plain_text", "")
# 构建用于 Prompt 的历史记录字符串 (只使用最近的一部分) # 构建用于 Prompt 的历史记录字符串 (只使用最近的一部分)
history_slice_for_str = initial_messages[-20:] # 可配置 history_slice_for_str = initial_messages[-20:] # 可配置
self.observation_info.chat_history_str = await build_readable_messages( self.observation_info.chat_history_str = await build_readable_messages(
history_slice_for_str, history_slice_for_str,
replace_bot_name=True, replace_bot_name=True,
merge_messages=False, merge_messages=False,
timestamp_mode="relative", timestamp_mode="relative",
read_mark=0.0 # read_mark 可能需要根据实际情况调整 read_mark=0.0, # read_mark 可能需要根据实际情况调整
) )
# 更新 ChatObserver 和 IdleStarter 的时间戳 # 更新 ChatObserver 和 IdleStarter 的时间戳
@ -224,13 +230,17 @@ class Conversation:
self.chat_observer.last_message_time = self.observation_info.last_message_time self.chat_observer.last_message_time = self.observation_info.last_message_time
if self.idle_conversation_starter and self.observation_info.last_message_time: if self.idle_conversation_starter and self.observation_info.last_message_time:
# 更新空闲计时器的起始时间 # 更新空闲计时器的起始时间
await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time) await self.idle_conversation_starter.update_last_message_time(
self.observation_info.last_message_time
)
logger.info(f"[私聊][{self.private_name}] 成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}") logger.info(
f"[私聊][{self.private_name}] 成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}"
)
else: else:
# 如果没有历史记录 # 如果没有历史记录
logger.info(f"[私聊][{self.private_name}] 没有找到初始聊天记录。") logger.info(f"[私聊][{self.private_name}] 没有找到初始聊天记录。")
self.observation_info.chat_history_str = "还没有聊天记录。" # 设置默认提示 self.observation_info.chat_history_str = "还没有聊天记录。" # 设置默认提示
except Exception as load_err: except Exception as load_err:
# 捕获加载过程中的异常 # 捕获加载过程中的异常
@ -253,14 +263,16 @@ class Conversation:
# 在尝试初始化后,再次检查状态 # 在尝试初始化后,再次检查状态
if not self._initialized: if not self._initialized:
logger.error(f"[私聊][{self.private_name}] 初始化失败,无法启动规划循环。") logger.error(f"[私聊][{self.private_name}] 初始化失败,无法启动规划循环。")
return # 初始化失败,明确停止 return # 初始化失败,明确停止
except Exception as init_err: except Exception as init_err:
logger.error(f"[私聊][{self.private_name}] 初始化过程中发生未捕获错误: {init_err},无法启动。") logger.error(f"[私聊][{self.private_name}] 初始化过程中发生未捕获错误: {init_err},无法启动。")
return # 初始化异常,明确停止 return # 初始化异常,明确停止
# 再次检查 should_continue 标志,确保初始化成功且未被外部停止 # 再次检查 should_continue 标志,确保初始化成功且未被外部停止
if not self.should_continue: if not self.should_continue:
logger.warning(f"[私聊][{self.private_name}] 对话实例已被标记为不应继续 (可能由于初始化失败或已被停止),无法启动规划循环。") logger.warning(
f"[私聊][{self.private_name}] 对话实例已被标记为不应继续 (可能由于初始化失败或已被停止),无法启动规划循环。"
)
return return
logger.info(f"[私聊][{self.private_name}] 对话系统启动,准备创建规划循环任务...") logger.info(f"[私聊][{self.private_name}] 对话系统启动,准备创建规划循环任务...")
@ -283,7 +295,7 @@ class Conversation:
会停止后台任务解绑观察者等 会停止后台任务解绑观察者等
""" """
logger.info(f"[私聊][{self.private_name}] 正在停止对话实例: {self.stream_id}") logger.info(f"[私聊][{self.private_name}] 正在停止对话实例: {self.stream_id}")
self.should_continue = False # 设置标志,让主循环退出 self.should_continue = False # 设置标志,让主循环退出
# 停止空闲对话检测器 # 停止空闲对话检测器
if self.idle_conversation_starter: if self.idle_conversation_starter:
@ -310,11 +322,11 @@ class Conversation:
# 循环前再次确认初始化状态 # 循环前再次确认初始化状态
if not self._initialized: if not self._initialized:
logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下运行规划循环,退出。") logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下运行规划循环,退出。")
return # 明确退出 return # 明确退出
# 主循环,只要 should_continue 为 True 就一直运行 # 主循环,只要 should_continue 为 True 就一直运行
while self.should_continue: while self.should_continue:
loop_iter_start_time = time.time() # 记录本次循环开始时间 loop_iter_start_time = time.time() # 记录本次循环开始时间
logger.debug(f"[私聊][{self.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f})") logger.debug(f"[私聊][{self.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f})")
# --- 处理忽略状态 --- # --- 处理忽略状态 ---
@ -327,13 +339,13 @@ class Conversation:
# 计算需要睡眠的时间最多30秒或直到忽略结束 # 计算需要睡眠的时间最多30秒或直到忽略结束
sleep_duration = min(30, self.ignore_until_timestamp - loop_iter_start_time) sleep_duration = min(30, self.ignore_until_timestamp - loop_iter_start_time)
await asyncio.sleep(sleep_duration) await asyncio.sleep(sleep_duration)
continue # 跳过本次循环的后续步骤,直接进入下一次迭代检查 continue # 跳过本次循环的后续步骤,直接进入下一次迭代检查
elif self.ignore_until_timestamp and loop_iter_start_time >= self.ignore_until_timestamp: elif self.ignore_until_timestamp and loop_iter_start_time >= self.ignore_until_timestamp:
# 如果忽略时间已到 # 如果忽略时间已到
logger.info(f"[私聊][{self.private_name}] 忽略时间已到 {self.stream_id},准备结束对话。") logger.info(f"[私聊][{self.private_name}] 忽略时间已到 {self.stream_id},准备结束对话。")
self.ignore_until_timestamp = None # 清除忽略时间戳 self.ignore_until_timestamp = None # 清除忽略时间戳
await self.stop() # 调用 stop 方法来结束整个对话实例 await self.stop() # 调用 stop 方法来结束整个对话实例
continue # 跳过本次循环的后续步骤 continue # 跳过本次循环的后续步骤
else: else:
# 如果不在忽略状态,确保空闲检测器在运行 # 如果不在忽略状态,确保空闲检测器在运行
if self.idle_conversation_starter and not self.idle_conversation_starter._running: if self.idle_conversation_starter and not self.idle_conversation_starter._running:
@ -346,7 +358,7 @@ class Conversation:
if not all([self.action_planner, self.observation_info, self.conversation_info]): if not all([self.action_planner, self.observation_info, self.conversation_info]):
logger.error(f"[私聊][{self.private_name}] 核心组件未初始化无法继续规划循环。将等待5秒后重试...") logger.error(f"[私聊][{self.private_name}] 核心组件未初始化无法继续规划循环。将等待5秒后重试...")
await asyncio.sleep(5) await asyncio.sleep(5)
continue # 跳过本次迭代 continue # 跳过本次迭代
# 2. 记录规划开始时间并重置临时状态 # 2. 记录规划开始时间并重置临时状态
planning_start_time = time.time() planning_start_time = time.time()
@ -358,21 +370,21 @@ class Conversation:
logger.debug(f"[私聊][{self.private_name}] 调用 ActionPlanner.plan...") logger.debug(f"[私聊][{self.private_name}] 调用 ActionPlanner.plan...")
# 传入当前观察信息、对话信息和上次成功回复的动作类型 # 传入当前观察信息、对话信息和上次成功回复的动作类型
action, reason = await self.action_planner.plan( action, reason = await self.action_planner.plan(
self.observation_info, self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action
self.conversation_info,
self.conversation_info.last_successful_reply_action
) )
planning_duration = time.time() - planning_start_time planning_duration = time.time() - planning_start_time
logger.debug(f"[私聊][{self.private_name}] ActionPlanner.plan 完成 (耗时: {planning_duration:.3f} 秒),初步规划动作: {action}") logger.debug(
f"[私聊][{self.private_name}] ActionPlanner.plan 完成 (耗时: {planning_duration:.3f} 秒),初步规划动作: {action}"
)
# 4. 检查规划期间是否有新消息到达 # 4. 检查规划期间是否有新消息到达
current_unprocessed_messages = getattr(self.observation_info, 'unprocessed_messages', []) current_unprocessed_messages = getattr(self.observation_info, "unprocessed_messages", [])
new_messages_during_planning: List[Dict[str, Any]] = [] new_messages_during_planning: List[Dict[str, Any]] = []
other_new_messages_during_planning: List[Dict[str, Any]] = [] other_new_messages_during_planning: List[Dict[str, Any]] = []
# 遍历当前所有未处理的消息 # 遍历当前所有未处理的消息
for msg in current_unprocessed_messages: for msg in current_unprocessed_messages:
msg_time = msg.get('time') msg_time = msg.get("time")
sender_id = msg.get("user_info", {}).get("user_id") sender_id = msg.get("user_info", {}).get("user_id")
# 检查消息时间是否在本次规划开始之后 # 检查消息时间是否在本次规划开始之后
if msg_time and msg_time >= planning_start_time: if msg_time and msg_time >= planning_start_time:
@ -381,9 +393,11 @@ class Conversation:
if sender_id != self.bot_qq_str: if sender_id != self.bot_qq_str:
other_new_messages_during_planning.append(msg) other_new_messages_during_planning.append(msg)
new_msg_count = len(new_messages_during_planning) # 规划期间所有新消息数 new_msg_count = len(new_messages_during_planning) # 规划期间所有新消息数
other_new_msg_count = len(other_new_messages_during_planning) # 规划期间他人新消息数 other_new_msg_count = len(other_new_messages_during_planning) # 规划期间他人新消息数
logger.debug(f"[私聊][{self.private_name}] 规划期间收到新消息总数: {new_msg_count}, 来自他人: {other_new_msg_count}") logger.debug(
f"[私聊][{self.private_name}] 规划期间收到新消息总数: {new_msg_count}, 来自他人: {other_new_msg_count}"
)
# 5. 根据动作类型和新消息数量,判断是否需要中断当前规划 # 5. 根据动作类型和新消息数量,判断是否需要中断当前规划
should_interrupt: bool = False should_interrupt: bool = False
@ -410,9 +424,9 @@ class Conversation:
cancel_record = { cancel_record = {
"action": action, "action": action,
"plan_reason": reason, "plan_reason": reason,
"status": "cancelled_due_to_new_messages", # 标记取消原因 "status": "cancelled_due_to_new_messages", # 标记取消原因
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"final_reason": interrupt_reason "final_reason": interrupt_reason,
} }
# 安全地添加到 done_action 列表 # 安全地添加到 done_action 列表
if not hasattr(self.conversation_info, "done_action"): if not hasattr(self.conversation_info, "done_action"):
@ -423,8 +437,8 @@ class Conversation:
self.conversation_info.last_successful_reply_action = None self.conversation_info.last_successful_reply_action = None
# 将状态设置回分析,准备处理新消息并重新规划 # 将状态设置回分析,准备处理新消息并重新规划
self.state = ConversationState.ANALYZING self.state = ConversationState.ANALYZING
await asyncio.sleep(0.1) # 短暂等待避免CPU空转 await asyncio.sleep(0.1) # 短暂等待避免CPU空转
continue # 直接进入下一次循环迭代 continue # 直接进入下一次循环迭代
# 7. 如果未中断,存储规划期间的他人新消息数,并执行动作 # 7. 如果未中断,存储规划期间的他人新消息数,并执行动作
logger.debug(f"[私聊][{self.private_name}] 未中断,调用 _handle_action 执行动作 '{action}'...") logger.debug(f"[私聊][{self.private_name}] 未中断,调用 _handle_action 执行动作 '{action}'...")
@ -448,34 +462,38 @@ class Conversation:
goal_ended = True goal_ended = True
# 检查最后执行的动作是否是结束类型且成功完成 # 检查最后执行的动作是否是结束类型且成功完成
last_action_record = self.conversation_info.done_action[-1] if self.conversation_info.done_action else {} last_action_record = (
self.conversation_info.done_action[-1] if self.conversation_info.done_action else {}
)
action_ended: bool = ( action_ended: bool = (
last_action_record.get("action") in ["end_conversation", "say_goodbye"] and last_action_record.get("action") in ["end_conversation", "say_goodbye"]
last_action_record.get("status") == "done" and last_action_record.get("status") == "done"
) )
# 如果满足任一结束条件,则停止循环 # 如果满足任一结束条件,则停止循环
if goal_ended or action_ended: if goal_ended or action_ended:
logger.info(f"[私聊][{self.private_name}] 检测到结束条件 (目标结束: {goal_ended}, 动作结束: {action_ended}),停止循环。") logger.info(
await self.stop() # 调用 stop 来停止实例 f"[私聊][{self.private_name}] 检测到结束条件 (目标结束: {goal_ended}, 动作结束: {action_ended}),停止循环。"
continue # 跳过后续,虽然 stop 会设置 should_continue=False )
await self.stop() # 调用 stop 来停止实例
continue # 跳过后续,虽然 stop 会设置 should_continue=False
except asyncio.CancelledError: except asyncio.CancelledError:
# 处理任务被取消的情况 # 处理任务被取消的情况
logger.info(f"[私聊][{self.private_name}] PFC 主循环任务被取消。") logger.info(f"[私聊][{self.private_name}] PFC 主循环任务被取消。")
await self.stop() # 确保资源被清理 await self.stop() # 确保资源被清理
break # 明确退出循环 break # 明确退出循环
except Exception as loop_err: except Exception as loop_err:
# 捕获循环中的其他未预期错误 # 捕获循环中的其他未预期错误
logger.error(f"[私聊][{self.private_name}] PFC 主循环出错: {loop_err}") logger.error(f"[私聊][{self.private_name}] PFC 主循环出错: {loop_err}")
logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}")
self.state = ConversationState.ERROR # 设置错误状态 self.state = ConversationState.ERROR # 设置错误状态
# 可以在这里添加更复杂的错误恢复逻辑,或者简单等待后重试 # 可以在这里添加更复杂的错误恢复逻辑,或者简单等待后重试
await asyncio.sleep(5) # 等待一段时间,避免错误状态下快速空转 await asyncio.sleep(5) # 等待一段时间,避免错误状态下快速空转
# --- 控制循环频率 --- # --- 控制循环频率 ---
loop_duration = time.time() - loop_iter_start_time # 计算本次循环耗时 loop_duration = time.time() - loop_iter_start_time # 计算本次循环耗时
min_loop_interval = 0.1 # 设置最小循环间隔防止CPU占用过高 min_loop_interval = 0.1 # 设置最小循环间隔防止CPU占用过高
logger.debug(f"[私聊][{self.private_name}] 循环迭代耗时: {loop_duration:.3f} 秒。") logger.debug(f"[私聊][{self.private_name}] 循环迭代耗时: {loop_duration:.3f} 秒。")
if loop_duration < min_loop_interval: if loop_duration < min_loop_interval:
# 如果循环太快,则睡眠一段时间 # 如果循环太快,则睡眠一段时间
@ -484,15 +502,16 @@ class Conversation:
# 循环结束后的日志 # 循环结束后的日志
logger.info(f"[私聊][{self.private_name}] PFC 循环已退出 for stream_id: {self.stream_id}") logger.info(f"[私聊][{self.private_name}] PFC 循环已退出 for stream_id: {self.stream_id}")
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Optional[Message]: def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Optional[Message]:
"""将从数据库或其他来源获取的消息字典转换为内部使用的 Message 对象""" """将从数据库或其他来源获取的消息字典转换为内部使用的 Message 对象"""
try: try:
# 优先使用实例自身的 chat_stream如果不存在则尝试从管理器获取 # 优先使用实例自身的 chat_stream如果不存在则尝试从管理器获取
chat_stream_to_use = self.chat_stream or chat_manager.get_stream(self.stream_id) chat_stream_to_use = self.chat_stream or chat_manager.get_stream(self.stream_id)
if not chat_stream_to_use: if not chat_stream_to_use:
logger.error(f"[私聊][{self.private_name}] 无法确定 ChatStream for stream_id {self.stream_id},无法转换消息。") logger.error(
return None # 无法确定聊天流,返回 None f"[私聊][{self.private_name}] 无法确定 ChatStream for stream_id {self.stream_id},无法转换消息。"
)
return None # 无法确定聊天流,返回 None
# 解析用户信息字典 # 解析用户信息字典
user_info_dict = msg_dict.get("user_info", {}) user_info_dict = msg_dict.get("user_info", {})
@ -503,35 +522,34 @@ class Conversation:
user_info = UserInfo.from_dict(user_info_dict) user_info = UserInfo.from_dict(user_info_dict)
except Exception as e: except Exception as e:
# 解析失败记录警告 # 解析失败记录警告
logger.warning(f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}") logger.warning(
f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}"
)
if not user_info: if not user_info:
# 如果没有有效的 UserInfo记录警告并返回 None # 如果没有有效的 UserInfo记录警告并返回 None
logger.warning(f"[私聊][{self.private_name}] 消息缺少有效的 UserInfo无法转换。 msg_id: {msg_dict.get('message_id')}") logger.warning(
f"[私聊][{self.private_name}] 消息缺少有效的 UserInfo无法转换。 msg_id: {msg_dict.get('message_id')}"
)
return None return None
# 创建并返回 Message 对象 # 创建并返回 Message 对象
return Message( return Message(
message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 如果没有ID生成一个临时的 message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 如果没有ID生成一个临时的
chat_stream=chat_stream_to_use, chat_stream=chat_stream_to_use,
time=msg_dict.get("time", time.time()), # 如果没有时间戳,使用当前时间 time=msg_dict.get("time", time.time()), # 如果没有时间戳,使用当前时间
user_info=user_info, # 使用解析出的 UserInfo 对象 user_info=user_info, # 使用解析出的 UserInfo 对象
processed_plain_text=msg_dict.get("processed_plain_text", ""), # 获取处理后的纯文本 processed_plain_text=msg_dict.get("processed_plain_text", ""), # 获取处理后的纯文本
detailed_plain_text=msg_dict.get("detailed_plain_text", ""), # 获取详细纯文本 detailed_plain_text=msg_dict.get("detailed_plain_text", ""), # 获取详细纯文本
# 根据 Message 类的定义,可能还需要其他字段 # 根据 Message 类的定义,可能还需要其他字段
) )
except Exception as e: except Exception as e:
# 捕获转换过程中的任何异常 # 捕获转换过程中的任何异常
logger.error(f"[私聊][{self.private_name}] 转换消息时出错: {e}") logger.error(f"[私聊][{self.private_name}] 转换消息时出错: {e}")
logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}")
return None # 转换失败返回 None return None # 转换失败返回 None
async def _handle_action( async def _handle_action(
self, self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo
action: str,
reason: str,
observation_info: ObservationInfo,
conversation_info: ConversationInfo
): ):
""" """
处理由 ActionPlanner 规划出的具体行动 处理由 ActionPlanner 规划出的具体行动
@ -544,15 +562,15 @@ class Conversation:
return return
logger.info(f"[私聊][{self.private_name}] 开始处理动作: {action}, 原因: {reason}") logger.info(f"[私聊][{self.private_name}] 开始处理动作: {action}, 原因: {reason}")
action_start_time = time.time() # 记录动作开始时间,用于计算耗时 action_start_time = time.time() # 记录动作开始时间,用于计算耗时
# --- 准备动作历史记录 --- # --- 准备动作历史记录 ---
current_action_record = { current_action_record = {
"action": action, "action": action,
"plan_reason": reason, # 记录规划时的原因 "plan_reason": reason, # 记录规划时的原因
"status": "start", # 初始状态为“开始” "status": "start", # 初始状态为“开始”
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 记录开始时间 "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 记录开始时间
"final_reason": None, # 最终结果的原因,将在 finally 中设置 "final_reason": None, # 最终结果的原因,将在 finally 中设置
} }
# 安全地添加到历史记录列表 # 安全地添加到历史记录列表
if not hasattr(conversation_info, "done_action"): if not hasattr(conversation_info, "done_action"):
@ -562,10 +580,10 @@ class Conversation:
action_index = len(conversation_info.done_action) - 1 action_index = len(conversation_info.done_action) - 1
# --- 初始化动作执行状态变量 --- # --- 初始化动作执行状态变量 ---
action_successful: bool = False # 标记动作是否成功执行 action_successful: bool = False # 标记动作是否成功执行
final_status: str = "recall" # 动作最终状态,默认为 recall (表示未成功或需重试) final_status: str = "recall" # 动作最终状态,默认为 recall (表示未成功或需重试)
final_reason: str = "动作未成功执行" # 动作最终原因 final_reason: str = "动作未成功执行" # 动作最终原因
need_replan_from_checker: bool = False # 标记是否由 ReplyChecker 要求重新规划 need_replan_from_checker: bool = False # 标记是否由 ReplyChecker 要求重新规划
try: try:
# --- 根据不同的 action 类型执行相应的逻辑 --- # --- 根据不同的 action 类型执行相应的逻辑 ---
@ -573,44 +591,44 @@ class Conversation:
# 1. 处理需要生成并可能发送消息的动作 # 1. 处理需要生成并可能发送消息的动作
if action in ["direct_reply", "send_new_message"]: if action in ["direct_reply", "send_new_message"]:
# --- a. 生成回复 --- # --- a. 生成回复 ---
self.state = ConversationState.GENERATING # 更新对话状态 self.state = ConversationState.GENERATING # 更新对话状态
if not self.reply_generator: if not self.reply_generator:
# 检查依赖组件是否存在 # 检查依赖组件是否存在
raise RuntimeError("ReplyGenerator 未初始化") raise RuntimeError("ReplyGenerator 未初始化")
# 调用 ReplyGenerator 生成回复内容 # 调用 ReplyGenerator 生成回复内容
generated_content = await self.reply_generator.generate( generated_content = await self.reply_generator.generate(
observation_info, observation_info, conversation_info, action_type=action
conversation_info,
action_type=action
) )
logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'") # 日志中截断长内容 logger.info(
f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'"
) # 日志中截断长内容
# 检查生成内容是否有效 # 检查生成内容是否有效
if not generated_content or generated_content.startswith("抱歉"): if not generated_content or generated_content.startswith("抱歉"):
# 如果生成失败或返回错误提示 # 如果生成失败或返回错误提示
logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,标记失败。") logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,标记失败。")
final_reason = "生成内容无效" final_reason = "生成内容无效"
final_status = "recall" # 标记为 recall final_status = "recall" # 标记为 recall
# 重置追问状态,因为本次回复失败 # 重置追问状态,因为本次回复失败
conversation_info.last_successful_reply_action = None conversation_info.last_successful_reply_action = None
else: else:
# --- b. 检查回复 (如果生成成功) --- # --- b. 检查回复 (如果生成成功) ---
self.state = ConversationState.CHECKING # 更新状态为检查中 self.state = ConversationState.CHECKING # 更新状态为检查中
if not self.reply_checker: if not self.reply_checker:
raise RuntimeError("ReplyChecker 未初始化") raise RuntimeError("ReplyChecker 未初始化")
# 准备检查所需的上下文信息 # 准备检查所需的上下文信息
current_goal_str: str = "" # 当前对话目标字符串 current_goal_str: str = "" # 当前对话目标字符串
if conversation_info.goal_list: if conversation_info.goal_list:
# 通常检查最新的目标 # 通常检查最新的目标
goal_item = conversation_info.goal_list[-1] goal_item = conversation_info.goal_list[-1]
if isinstance(goal_item, dict): if isinstance(goal_item, dict):
current_goal_str = goal_item.get('goal', '') current_goal_str = goal_item.get("goal", "")
elif isinstance(goal_item, str): elif isinstance(goal_item, str):
current_goal_str = goal_item current_goal_str = goal_item
# 获取用于检查的聊天记录 (列表和字符串形式) # 获取用于检查的聊天记录 (列表和字符串形式)
chat_history_for_check: List[Dict[str, Any]] = getattr(observation_info, 'chat_history', []) chat_history_for_check: List[Dict[str, Any]] = getattr(observation_info, "chat_history", [])
chat_history_text_for_check: str = getattr(observation_info, 'chat_history_str', '') chat_history_text_for_check: str = getattr(observation_info, "chat_history_str", "")
# 当前重试次数 (如果未来加入重试逻辑,这里需要传递实际次数) # 当前重试次数 (如果未来加入重试逻辑,这里需要传递实际次数)
retry_count: int = 0 retry_count: int = 0
@ -619,11 +637,13 @@ class Conversation:
is_suitable, check_reason, need_replan_from_checker = await self.reply_checker.check( is_suitable, check_reason, need_replan_from_checker = await self.reply_checker.check(
reply=generated_content, reply=generated_content,
goal=current_goal_str, goal=current_goal_str,
chat_history=chat_history_for_check, # 传递列表形式的历史记录 chat_history=chat_history_for_check, # 传递列表形式的历史记录
chat_history_text=chat_history_text_for_check, # 传递文本形式的历史记录 chat_history_text=chat_history_text_for_check, # 传递文本形式的历史记录
retry_count=retry_count retry_count=retry_count,
)
logger.info(
f"[私聊][{self.private_name}] ReplyChecker 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重规划={need_replan_from_checker}"
) )
logger.info(f"[私聊][{self.private_name}] ReplyChecker 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重规划={need_replan_from_checker}")
# --- c. 处理检查结果 --- # --- c. 处理检查结果 ---
if not is_suitable or need_replan_from_checker: if not is_suitable or need_replan_from_checker:
@ -633,7 +653,7 @@ class Conversation:
conversation_info.last_rejected_reply_content = generated_content conversation_info.last_rejected_reply_content = generated_content
# 设置最终状态和原因 # 设置最终状态和原因
final_reason = f"回复检查不通过: {check_reason}" final_reason = f"回复检查不通过: {check_reason}"
final_status = "recall" # 标记为 recall final_status = "recall" # 标记为 recall
# 重置追问状态 # 重置追问状态
conversation_info.last_successful_reply_action = None conversation_info.last_successful_reply_action = None
logger.warning(f"[私聊][{self.private_name}] 动作 '{action}' 因回复检查失败而被拒绝。") logger.warning(f"[私聊][{self.private_name}] 动作 '{action}' 因回复检查失败而被拒绝。")
@ -645,57 +665,72 @@ class Conversation:
conversation_info.last_rejected_reply_content = None conversation_info.last_rejected_reply_content = None
# --- d. 发送回复 --- # --- d. 发送回复 ---
self.generated_reply = generated_content # 存储待发送内容 self.generated_reply = generated_content # 存储待发送内容
timestamp_before_sending = time.time() # 记录发送前时间戳 timestamp_before_sending = time.time() # 记录发送前时间戳
logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 回复检查通过,记录发送前时间戳: {timestamp_before_sending:.2f}") logger.debug(
self.state = ConversationState.SENDING # 更新状态为发送中 f"[私聊][{self.private_name}] 动作 '{action}': 回复检查通过,记录发送前时间戳: {timestamp_before_sending:.2f}"
)
self.state = ConversationState.SENDING # 更新状态为发送中
# 调用内部发送方法 # 调用内部发送方法
send_success = await self._send_reply() send_success = await self._send_reply()
send_end_time = time.time() # 记录发送结束时间 send_end_time = time.time() # 记录发送结束时间
if send_success: if send_success:
# 如果发送成功 # 如果发送成功
action_successful = True # 标记动作成功 action_successful = True # 标记动作成功
logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 成功发送回复.") logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 成功发送回复.")
# 更新空闲计时器 # 更新空闲计时器
if self.idle_conversation_starter: if self.idle_conversation_starter:
await self.idle_conversation_starter.update_last_message_time(send_end_time) await self.idle_conversation_starter.update_last_message_time(send_end_time)
# --- e. 清理已处理消息 --- # --- e. 清理已处理消息 ---
current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) current_unprocessed_messages = getattr(observation_info, "unprocessed_messages", [])
message_ids_to_clear: Set[str] = set() message_ids_to_clear: Set[str] = set()
# 遍历所有未处理消息 # 遍历所有未处理消息
for msg in current_unprocessed_messages: for msg in current_unprocessed_messages:
msg_time = msg.get('time') msg_time = msg.get("time")
msg_id = msg.get('message_id') msg_id = msg.get("message_id")
sender_id = msg.get("user_info", {}).get("user_id") sender_id = msg.get("user_info", {}).get("user_id")
# 规则:只清理【发送前】收到的、【来自他人】的消息 # 规则:只清理【发送前】收到的、【来自他人】的消息
if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: if (
msg_id
and msg_time
and sender_id != self.bot_qq_str
and msg_time < timestamp_before_sending
):
message_ids_to_clear.add(msg_id) message_ids_to_clear.add(msg_id)
# 如果有需要清理的消息,调用清理方法 # 如果有需要清理的消息,调用清理方法
if message_ids_to_clear: if message_ids_to_clear:
logger.debug(f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}") logger.debug(
f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}"
)
await observation_info.clear_processed_messages(message_ids_to_clear) await observation_info.clear_processed_messages(message_ids_to_clear)
else: else:
logger.debug(f"[私聊][{self.private_name}] 没有需要清理的发送前(他人)消息。") logger.debug(f"[私聊][{self.private_name}] 没有需要清理的发送前(他人)消息。")
# --- f. 决定下一轮规划类型 --- # --- f. 决定下一轮规划类型 ---
# 从 conversation_info 获取【规划期间】收到的【他人】新消息数量 # 从 conversation_info 获取【规划期间】收到的【他人】新消息数量
other_new_msg_count_during_planning = getattr(conversation_info, 'other_new_messages_during_planning_count', 0) other_new_msg_count_during_planning = getattr(
conversation_info, "other_new_messages_during_planning_count", 0
)
# 规则:如果规划期间收到他人新消息 (0 < count <= 2),则下一轮强制初始回复 # 规则:如果规划期间收到他人新消息 (0 < count <= 2),则下一轮强制初始回复
if other_new_msg_count_during_planning > 0: if other_new_msg_count_during_planning > 0:
logger.info(f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。") logger.info(
conversation_info.last_successful_reply_action = None # 强制初始回复 f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。"
)
conversation_info.last_successful_reply_action = None # 强制初始回复
else: else:
# 规则:如果规划期间【没有】收到他人新消息,则允许追问 # 规则:如果规划期间【没有】收到他人新消息,则允许追问
logger.info(f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。") logger.info(
conversation_info.last_successful_reply_action = action # 允许追问 f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。"
)
conversation_info.last_successful_reply_action = action # 允许追问
else: else:
# 如果发送失败 # 如果发送失败
logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送回复失败。") logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送回复失败。")
final_status = "recall" # 发送失败,标记为 recall final_status = "recall" # 发送失败,标记为 recall
final_reason = "发送回复时失败" final_reason = "发送回复时失败"
# 重置追问状态 # 重置追问状态
conversation_info.last_successful_reply_action = None conversation_info.last_successful_reply_action = None
@ -707,9 +742,7 @@ class Conversation:
raise RuntimeError("ReplyGenerator 未初始化") raise RuntimeError("ReplyGenerator 未初始化")
# 生成告别语 # 生成告别语
generated_content = await self.reply_generator.generate( generated_content = await self.reply_generator.generate(
observation_info, observation_info, conversation_info, action_type=action
conversation_info,
action_type=action
) )
logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'") logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'")
@ -725,25 +758,32 @@ class Conversation:
# 发送告别语 # 发送告别语
self.generated_reply = generated_content self.generated_reply = generated_content
timestamp_before_sending = time.time() timestamp_before_sending = time.time()
logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}") logger.debug(
f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}"
)
self.state = ConversationState.SENDING self.state = ConversationState.SENDING
send_success = await self._send_reply() send_success = await self._send_reply()
send_end_time = time.time() send_end_time = time.time()
if send_success: if send_success:
action_successful = True # 标记成功 action_successful = True # 标记成功
logger.info(f"[私聊][{self.private_name}] 成功发送告别语,即将停止对话实例。") logger.info(f"[私聊][{self.private_name}] 成功发送告别语,即将停止对话实例。")
# 更新空闲计时器 # 更新空闲计时器
if self.idle_conversation_starter: if self.idle_conversation_starter:
await self.idle_conversation_starter.update_last_message_time(send_end_time) await self.idle_conversation_starter.update_last_message_time(send_end_time)
# 清理发送前的消息 (虽然通常是最后一条,但保持逻辑一致) # 清理发送前的消息 (虽然通常是最后一条,但保持逻辑一致)
current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) current_unprocessed_messages = getattr(observation_info, "unprocessed_messages", [])
message_ids_to_clear: Set[str] = set() message_ids_to_clear: Set[str] = set()
for msg in current_unprocessed_messages: for msg in current_unprocessed_messages:
msg_time = msg.get('time') msg_time = msg.get("time")
msg_id = msg.get('message_id') msg_id = msg.get("message_id")
sender_id = msg.get("user_info", {}).get("user_id") sender_id = msg.get("user_info", {}).get("user_id")
if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: if (
msg_id
and msg_time
and sender_id != self.bot_qq_str
and msg_time < timestamp_before_sending
):
message_ids_to_clear.add(msg_id) message_ids_to_clear.add(msg_id)
if message_ids_to_clear: if message_ids_to_clear:
await observation_info.clear_processed_messages(message_ids_to_clear) await observation_info.clear_processed_messages(message_ids_to_clear)
@ -764,7 +804,7 @@ class Conversation:
raise RuntimeError("GoalAnalyzer 未初始化") raise RuntimeError("GoalAnalyzer 未初始化")
# 调用 GoalAnalyzer 分析并更新目标 # 调用 GoalAnalyzer 分析并更新目标
await self.goal_analyzer.analyze_goal(conversation_info, observation_info) await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
action_successful = True # 标记成功 action_successful = True # 标记成功
# 4. 处理倾听动作 # 4. 处理倾听动作
elif action == "listening": elif action == "listening":
@ -774,22 +814,24 @@ class Conversation:
logger.info(f"[私聊][{self.private_name}] 动作 'listening': 进入倾听状态...") logger.info(f"[私聊][{self.private_name}] 动作 'listening': 进入倾听状态...")
# 调用 Waiter 的倾听等待方法,内部会处理超时 # 调用 Waiter 的倾听等待方法,内部会处理超时
await self.waiter.wait_listening(conversation_info) await self.waiter.wait_listening(conversation_info)
action_successful = True # listening 动作本身执行即视为成功,后续由新消息或超时驱动 action_successful = True # listening 动作本身执行即视为成功,后续由新消息或超时驱动
# 5. 处理结束对话动作 # 5. 处理结束对话动作
elif action == "end_conversation": elif action == "end_conversation":
logger.info(f"[私聊][{self.private_name}] 动作 'end_conversation': 收到最终结束指令,停止对话...") logger.info(f"[私聊][{self.private_name}] 动作 'end_conversation': 收到最终结束指令,停止对话...")
action_successful = True # 标记成功 action_successful = True # 标记成功
self.should_continue = False # 设置标志以退出循环 self.should_continue = False # 设置标志以退出循环
# 6. 处理屏蔽忽略动作 # 6. 处理屏蔽忽略动作
elif action == "block_and_ignore": elif action == "block_and_ignore":
logger.info(f"[私聊][{self.private_name}] 动作 'block_and_ignore': 不想再理你了...") logger.info(f"[私聊][{self.private_name}] 动作 'block_and_ignore': 不想再理你了...")
ignore_duration_seconds = 10 * 60 # 忽略 10 分钟,可配置 ignore_duration_seconds = 10 * 60 # 忽略 10 分钟,可配置
self.ignore_until_timestamp = time.time() + ignore_duration_seconds self.ignore_until_timestamp = time.time() + ignore_duration_seconds
logger.info(f"[私聊][{self.private_name}] 将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}") logger.info(
self.state = ConversationState.IGNORED # 设置忽略状态 f"[私聊][{self.private_name}] 将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}"
action_successful = True # 标记成功 )
self.state = ConversationState.IGNORED # 设置忽略状态
action_successful = True # 标记成功
# 7. 处理等待动作 # 7. 处理等待动作
elif action == "wait": elif action == "wait":
@ -799,14 +841,14 @@ class Conversation:
logger.info(f"[私聊][{self.private_name}] 动作 'wait': 进入等待状态...") logger.info(f"[私聊][{self.private_name}] 动作 'wait': 进入等待状态...")
# 调用 Waiter 的常规等待方法,内部处理超时 # 调用 Waiter 的常规等待方法,内部处理超时
timeout_occurred = await self.waiter.wait(self.conversation_info) timeout_occurred = await self.waiter.wait(self.conversation_info)
action_successful = True # wait 动作本身执行即视为成功 action_successful = True # wait 动作本身执行即视为成功
# wait 动作完成后不需要清理消息,等待新消息或超时触发重新规划 # wait 动作完成后不需要清理消息,等待新消息或超时触发重新规划
logger.debug(f"[私聊][{self.private_name}] Wait 动作完成,无需在此清理消息。") logger.debug(f"[私聊][{self.private_name}] Wait 动作完成,无需在此清理消息。")
# 8. 处理未知的动作类型 # 8. 处理未知的动作类型
else: else:
logger.warning(f"[私聊][{self.private_name}] 未知的动作类型: {action}") logger.warning(f"[私聊][{self.private_name}] 未知的动作类型: {action}")
final_status = "recall" # 未知动作标记为 recall final_status = "recall" # 未知动作标记为 recall
final_reason = f"未知的动作类型: {action}" final_reason = f"未知的动作类型: {action}"
# --- 重置非回复动作的追问状态 --- # --- 重置非回复动作的追问状态 ---
@ -824,14 +866,14 @@ class Conversation:
final_reason = "动作处理被取消" final_reason = "动作处理被取消"
# 取消时也重置追问状态 # 取消时也重置追问状态
conversation_info.last_successful_reply_action = None conversation_info.last_successful_reply_action = None
raise # 重新抛出 CancelledError让上层知道任务被取消 raise # 重新抛出 CancelledError让上层知道任务被取消
except Exception as handle_err: except Exception as handle_err:
# 捕获处理动作过程中的其他所有异常 # 捕获处理动作过程中的其他所有异常
logger.error(f"[私聊][{self.private_name}] 处理动作 '{action}' 时出错: {handle_err}") logger.error(f"[私聊][{self.private_name}] 处理动作 '{action}' 时出错: {handle_err}")
logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}")
final_status = "error" # 标记为错误状态 final_status = "error" # 标记为错误状态
final_reason = f"处理动作时出错: {handle_err}" final_reason = f"处理动作时出错: {handle_err}"
self.state = ConversationState.ERROR # 设置对话状态为错误 self.state = ConversationState.ERROR # 设置对话状态为错误
# 出错时重置追问状态 # 出错时重置追问状态
conversation_info.last_successful_reply_action = None conversation_info.last_successful_reply_action = None
@ -850,7 +892,11 @@ class Conversation:
if action == "wait": if action == "wait":
# 检查是否是因为超时结束的(需要 waiter 返回值,或者检查 goal_list # 检查是否是因为超时结束的(需要 waiter 返回值,或者检查 goal_list
# 这里简化处理,直接使用通用成功原因 # 这里简化处理,直接使用通用成功原因
timeout_occurred = any("分钟," in g.get("goal","") for g in conversation_info.goal_list if isinstance(g, dict)) if conversation_info.goal_list else False timeout_occurred = (
any("分钟," in g.get("goal", "") for g in conversation_info.goal_list if isinstance(g, dict))
if conversation_info.goal_list
else False
)
final_reason = "等待完成" + (" (超时)" if timeout_occurred else " (收到新消息或中断)") final_reason = "等待完成" + (" (超时)" if timeout_occurred else " (收到新消息或中断)")
elif action == "listening": elif action == "listening":
final_reason = "进入倾听状态" final_reason = "进入倾听状态"
@ -868,25 +914,26 @@ class Conversation:
if not final_reason or final_reason == "动作未成功执行": if not final_reason or final_reason == "动作未成功执行":
# 排除已经被 checker 设置的原因 # 排除已经被 checker 设置的原因
if not conversation_info.last_reply_rejection_reason: if not conversation_info.last_reply_rejection_reason:
final_reason = "动作执行失败或被取消" # 提供一个更通用的失败原因 final_reason = "动作执行失败或被取消" # 提供一个更通用的失败原因
# 更新历史记录字典 # 更新历史记录字典
if conversation_info.done_action and action_index < len(conversation_info.done_action): if conversation_info.done_action and action_index < len(conversation_info.done_action):
# 使用 update 方法更新字典,更安全 # 使用 update 方法更新字典,更安全
conversation_info.done_action[action_index].update( conversation_info.done_action[action_index].update(
{ {
"status": final_status, # 最终状态 "status": final_status, # 最终状态
"time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 完成时间 "time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 完成时间
"final_reason": final_reason, # 最终原因 "final_reason": final_reason, # 最终原因
"duration_ms": int((time.time() - action_start_time) * 1000) # 记录耗时(毫秒) "duration_ms": int((time.time() - action_start_time) * 1000), # 记录耗时(毫秒)
} }
) )
logger.debug(f"[私聊][{self.private_name}] 动作 '{action}' 最终状态: {final_status}, 原因: {final_reason}") logger.debug(
f"[私聊][{self.private_name}] 动作 '{action}' 最终状态: {final_status}, 原因: {final_reason}"
)
else: else:
# 如果索引无效或列表为空,记录错误 # 如果索引无效或列表为空,记录错误
logger.error(f"[私聊][{self.private_name}] 无法更新动作历史记录,索引 {action_index} 无效或列表为空。") logger.error(f"[私聊][{self.private_name}] 无法更新动作历史记录,索引 {action_index} 无效或列表为空。")
async def _send_reply(self) -> bool: async def _send_reply(self) -> bool:
"""发送 `self.generated_reply` 中的内容到聊天流""" """发送 `self.generated_reply` 中的内容到聊天流"""
# 检查是否有内容可发送 # 检查是否有内容可发送
@ -907,17 +954,17 @@ class Conversation:
await self.direct_sender.send_message( await self.direct_sender.send_message(
chat_stream=self.chat_stream, chat_stream=self.chat_stream,
content=reply_content, content=reply_content,
reply_to_message=None # 私聊通常不需要引用回复 reply_to_message=None, # 私聊通常不需要引用回复
) )
# 发送成功后,将状态设置回分析,准备下一轮规划 # 发送成功后,将状态设置回分析,准备下一轮规划
self.state = ConversationState.ANALYZING self.state = ConversationState.ANALYZING
return True # 返回成功 return True # 返回成功
except Exception as e: except Exception as e:
# 捕获发送过程中的异常 # 捕获发送过程中的异常
logger.error(f"[私聊][{self.private_name}] 发送消息时失败: {str(e)}") logger.error(f"[私聊][{self.private_name}] 发送消息时失败: {str(e)}")
logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}")
self.state = ConversationState.ERROR # 发送失败标记错误状态 self.state = ConversationState.ERROR # 发送失败标记错误状态
return False # 返回失败 return False # 返回失败
async def _send_timeout_message(self): async def _send_timeout_message(self):
"""在等待超时后发送一条结束消息""" """在等待超时后发送一条结束消息"""
@ -930,13 +977,11 @@ class Conversation:
timeout_content = "我们好像很久没说话了,先这样吧~" timeout_content = "我们好像很久没说话了,先这样吧~"
# 发送超时消息 # 发送超时消息
await self.direct_sender.send_message( await self.direct_sender.send_message(
chat_stream=self.chat_stream, chat_stream=self.chat_stream, content=timeout_content, reply_to_message=None
content=timeout_content,
reply_to_message=None
) )
logger.info(f"[私聊][{self.private_name}] 已发送超时结束消息。") logger.info(f"[私聊][{self.private_name}] 已发送超时结束消息。")
# 发送超时消息后,通常意味着对话结束,调用 stop # 发送超时消息后,通常意味着对话结束,调用 stop
await self.stop() await self.stop()
except Exception as e: except Exception as e:
# 捕获发送超时消息的异常 # 捕获发送超时消息的异常
logger.error(f"[私聊][{self.private_name}] 发送超时消息失败: {str(e)}") logger.error(f"[私聊][{self.private_name}] 发送超时消息失败: {str(e)}")

View File

@ -99,7 +99,8 @@ class IdleConversationStarter:
# 重新随机化下一次触发的时间阈值 # 重新随机化下一次触发的时间阈值
self.actual_idle_threshold = random.randint(global_config.min_idle_time, global_config.max_idle_time) self.actual_idle_threshold = random.randint(global_config.min_idle_time, global_config.max_idle_time)
logger.debug( logger.debug(
f"[私聊][{self.private_name}]更新最后消息时间: {self.last_message_time},新阈值: {self.actual_idle_threshold}") f"[私聊][{self.private_name}]更新最后消息时间: {self.last_message_time},新阈值: {self.actual_idle_threshold}"
)
def reload_config(self) -> None: def reload_config(self) -> None:
"""重新加载配置 """重新加载配置
@ -108,13 +109,15 @@ class IdleConversationStarter:
""" """
try: try:
logger.debug( logger.debug(
f"[私聊][{self.private_name}]重新加载主动对话配置: 启用={global_config.enable_idle_conversation}, 检查间隔={global_config.idle_check_interval}秒, 最短间隔={global_config.min_idle_time}秒, 最长间隔={global_config.max_idle_time}") f"[私聊][{self.private_name}]重新加载主动对话配置: 启用={global_config.enable_idle_conversation}, 检查间隔={global_config.idle_check_interval}秒, 最短间隔={global_config.min_idle_time}秒, 最长间隔={global_config.max_idle_time}"
)
# 重新计算实际阈值 # 重新计算实际阈值
async def update_threshold(): async def update_threshold():
async with self._lock: async with self._lock:
self.actual_idle_threshold = random.randint(global_config.min_idle_time, self.actual_idle_threshold = random.randint(
global_config.max_idle_time) global_config.min_idle_time, global_config.max_idle_time
)
logger.debug(f"[私聊][{self.private_name}]更新空闲检测阈值为: {self.actual_idle_threshold}") logger.debug(f"[私聊][{self.private_name}]更新空闲检测阈值为: {self.actual_idle_threshold}")
# 创建一个任务来异步更新阈值 # 创建一个任务来异步更新阈值
@ -202,7 +205,7 @@ class IdleConversationStarter:
try: try:
content, _ = await asyncio.wait_for( content, _ = await asyncio.wait_for(
self.llm.generate_response_async(prompt), self.llm.generate_response_async(prompt),
timeout=30 # 30秒超时 timeout=30, # 30秒超时
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.error(f"[私聊][{self.private_name}]生成主动对话内容超时") logger.error(f"[私聊][{self.private_name}]生成主动对话内容超时")
@ -213,7 +216,7 @@ class IdleConversationStarter:
# 清理结果 # 清理结果
content = content.strip() content = content.strip()
content = content.strip('"\'') content = content.strip("\"'")
if not content: if not content:
logger.error(f"[私聊][{self.private_name}]生成的主动对话内容为空") logger.error(f"[私聊][{self.private_name}]生成的主动对话内容为空")
@ -254,17 +257,13 @@ class IdleConversationStarter:
# 发送消息 # 发送消息
try: try:
await self.message_sender.send_message( await self.message_sender.send_message(chat_stream=chat_stream, content=content, reply_to_message=None)
chat_stream=chat_stream,
content=content,
reply_to_message=None
)
# 更新空闲会话启动器的最后消息时间 # 更新空闲会话启动器的最后消息时间
await self.update_last_message_time() await self.update_last_message_time()
# 如果新对话实例有一个聊天观察者,请触发更新 # 如果新对话实例有一个聊天观察者,请触发更新
if new_conversation and hasattr(new_conversation, 'chat_observer'): if new_conversation and hasattr(new_conversation, "chat_observer"):
logger.info(f"[私聊][{self.private_name}]触发聊天观察者更新") logger.info(f"[私聊][{self.private_name}]触发聊天观察者更新")
try: try:
new_conversation.chat_observer.trigger_update() new_conversation.chat_observer.trigger_update()
@ -279,7 +278,7 @@ class IdleConversationStarter:
# 顶级异常处理,确保任何未捕获的异常都不会导致整个进程崩溃 # 顶级异常处理,确保任何未捕获的异常都不会导致整个进程崩溃
logger.error(f"[私聊][{self.private_name}]主动发起对话过程中发生未预期的错误: {str(e)}") logger.error(f"[私聊][{self.private_name}]主动发起对话过程中发生未预期的错误: {str(e)}")
async def _get_chat_stream(self, conversation: Optional['Conversation'] = None) -> Optional[ChatStream]: async def _get_chat_stream(self, conversation: Optional["Conversation"] = None) -> Optional[ChatStream]:
"""获取可用的聊天流 """获取可用的聊天流
尝试多种方式获取聊天流 尝试多种方式获取聊天流
@ -296,7 +295,7 @@ class IdleConversationStarter:
chat_stream = None chat_stream = None
# 1. 尝试从对话实例获取 # 1. 尝试从对话实例获取
if conversation and hasattr(conversation, 'should_continue'): if conversation and hasattr(conversation, "should_continue"):
# 等待一小段时间,确保初始化完成 # 等待一小段时间,确保初始化完成
retry_count = 0 retry_count = 0
max_retries = 10 max_retries = 10
@ -309,12 +308,13 @@ class IdleConversationStarter:
logger.warning(f"[私聊][{self.private_name}]新对话实例初始化可能未完成,但仍将尝试获取聊天流") logger.warning(f"[私聊][{self.private_name}]新对话实例初始化可能未完成,但仍将尝试获取聊天流")
# 尝试使用对话实例的聊天流 # 尝试使用对话实例的聊天流
if hasattr(conversation, 'chat_stream') and conversation.chat_stream: if hasattr(conversation, "chat_stream") and conversation.chat_stream:
logger.info(f"[私聊][{self.private_name}]使用新对话实例的聊天流") logger.info(f"[私聊][{self.private_name}]使用新对话实例的聊天流")
return conversation.chat_stream return conversation.chat_stream
# 2. 尝试从聊天管理器获取 # 2. 尝试从聊天管理器获取
from src.plugins.chat.chat_stream import chat_manager from src.plugins.chat.chat_stream import chat_manager
try: try:
logger.info(f"[私聊][{self.private_name}]尝试从chat_manager获取聊天流") logger.info(f"[私聊][{self.private_name}]尝试从chat_manager获取聊天流")
chat_stream = chat_manager.get_stream(self.stream_id) chat_stream = chat_manager.get_stream(self.stream_id)
@ -327,13 +327,9 @@ class IdleConversationStarter:
try: try:
logger.warning(f"[私聊][{self.private_name}]无法获取现有聊天流,创建新的聊天流") logger.warning(f"[私聊][{self.private_name}]无法获取现有聊天流,创建新的聊天流")
# 创建用户信息对象 # 创建用户信息对象
user_info = UserInfo( user_info = UserInfo(user_id=global_config.BOT_QQ, user_nickname=global_config.BOT_NICKNAME, platform="qq")
user_id=global_config.BOT_QQ,
user_nickname=global_config.BOT_NICKNAME,
platform="qq"
)
# 创建聊天流 # 创建聊天流
return ChatStream(self.stream_id, "qq", user_info) return ChatStream(self.stream_id, "qq", user_info)
except Exception as e: except Exception as e:
logger.error(f"[私聊][{self.private_name}]创建新聊天流失败: {str(e)}") logger.error(f"[私聊][{self.private_name}]创建新聊天流失败: {str(e)}")
return None return None

View File

@ -25,15 +25,15 @@ class ObservationInfoHandler(NotificationHandler):
"""处理来自 ChatObserver 的通知""" """处理来自 ChatObserver 的通知"""
notification_type = notification.type notification_type = notification.type
data = notification.data data = notification.data
timestamp = notification.timestamp # 获取通知时间戳 timestamp = notification.timestamp # 获取通知时间戳
try: try:
if notification_type == NotificationType.NEW_MESSAGE: if notification_type == NotificationType.NEW_MESSAGE:
# 处理新消息通知 # 处理新消息通知
message_dict = data # data 本身就是消息字典 message_dict = data # data 本身就是消息字典
if not isinstance(message_dict, dict): if not isinstance(message_dict, dict):
logger.warning(f"[私聊][{self.private_name}] 收到的 NEW_MESSAGE 数据不是字典: {data}") logger.warning(f"[私聊][{self.private_name}] 收到的 NEW_MESSAGE 数据不是字典: {data}")
return return
# 解析 UserInfo # 解析 UserInfo
user_info_dict = message_dict.get("user_info") user_info_dict = message_dict.get("user_info")
@ -42,9 +42,13 @@ class ObservationInfoHandler(NotificationHandler):
try: try:
user_info = UserInfo.from_dict(user_info_dict) user_info = UserInfo.from_dict(user_info_dict)
except Exception as e: except Exception as e:
logger.error(f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}") logger.error(
f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}"
)
elif user_info_dict is not None: elif user_info_dict is not None:
logger.warning(f"[私聊][{self.private_name}] 收到的 user_info 不是预期的字典类型: {type(user_info_dict)}") logger.warning(
f"[私聊][{self.private_name}] 收到的 user_info 不是预期的字典类型: {type(user_info_dict)}"
)
# 更新 ObservationInfo # 更新 ObservationInfo
await self.observation_info.update_from_message(message_dict, user_info) await self.observation_info.update_from_message(message_dict, user_info)
@ -52,7 +56,7 @@ class ObservationInfoHandler(NotificationHandler):
elif notification_type == NotificationType.COLD_CHAT: elif notification_type == NotificationType.COLD_CHAT:
# 处理冷场通知 # 处理冷场通知
is_cold = data.get("is_cold", False) is_cold = data.get("is_cold", False)
await self.observation_info.update_cold_chat_status(is_cold, timestamp) # 使用通知时间戳 await self.observation_info.update_cold_chat_status(is_cold, timestamp) # 使用通知时间戳
elif notification_type == NotificationType.MESSAGE_DELETED: elif notification_type == NotificationType.MESSAGE_DELETED:
# 处理消息删除通知 # 处理消息删除通知
@ -71,12 +75,12 @@ class ObservationInfoHandler(NotificationHandler):
elif notification_type == NotificationType.BOT_SPEAKING: elif notification_type == NotificationType.BOT_SPEAKING:
# 机器人开始说话 (例如,如果需要显示"正在输入...") # 机器人开始说话 (例如,如果需要显示"正在输入...")
# self.observation_info.is_typing = True # self.observation_info.is_typing = True
pass # 暂时不处理 pass # 暂时不处理
elif notification_type == NotificationType.USER_SPEAKING: elif notification_type == NotificationType.USER_SPEAKING:
# 用户开始说话 # 用户开始说话
# self.observation_info.is_typing = True # self.observation_info.is_typing = True
pass # 暂时不处理 pass # 暂时不处理
elif notification_type == NotificationType.USER_JOINED: elif notification_type == NotificationType.USER_JOINED:
user_id = data.get("user_id") user_id = data.get("user_id")
@ -108,32 +112,32 @@ class ObservationInfo:
self.private_name: str = private_name self.private_name: str = private_name
# 聊天记录相关 # 聊天记录相关
self.chat_history: List[Dict[str, Any]] = [] # 存储已处理的消息历史 self.chat_history: List[Dict[str, Any]] = [] # 存储已处理的消息历史
self.chat_history_str: str = "还没有聊天记录。" # 用于生成 Prompt 的历史记录字符串 self.chat_history_str: str = "还没有聊天记录。" # 用于生成 Prompt 的历史记录字符串
self.chat_history_count: int = 0 self.chat_history_count: int = 0
# 未处理消息相关 (核心修改点) # 未处理消息相关 (核心修改点)
self.unprocessed_messages: List[Dict[str, Any]] = [] # 存储尚未被机器人回复的消息 self.unprocessed_messages: List[Dict[str, Any]] = [] # 存储尚未被机器人回复的消息
self.new_messages_count: int = 0 # unprocessed_messages 的数量 self.new_messages_count: int = 0 # unprocessed_messages 的数量
# 状态信息 # 状态信息
self.active_users: Set[str] = set() # 当前活跃用户 (私聊场景可能只有对方) self.active_users: Set[str] = set() # 当前活跃用户 (私聊场景可能只有对方)
self.last_bot_speak_time: Optional[float] = None self.last_bot_speak_time: Optional[float] = None
self.last_user_speak_time: Optional[float] = None # 指对方用户的发言时间 self.last_user_speak_time: Optional[float] = None # 指对方用户的发言时间
self.last_message_time: Optional[float] = None # 指所有消息(包括自己)的最新时间 self.last_message_time: Optional[float] = None # 指所有消息(包括自己)的最新时间
self.last_message_id: Optional[str] = None self.last_message_id: Optional[str] = None
self.last_message_content: str = "" self.last_message_content: str = ""
self.last_message_sender: Optional[str] = None # user_id of the last message sender self.last_message_sender: Optional[str] = None # user_id of the last message sender
self.bot_id: Optional[str] = None # 机器人自己的 ID self.bot_id: Optional[str] = None # 机器人自己的 ID
# 冷场状态 # 冷场状态
self.cold_chat_start_time: Optional[float] = None self.cold_chat_start_time: Optional[float] = None
self.cold_chat_duration: float = 0.0 self.cold_chat_duration: float = 0.0
self.is_cold_chat: bool = False # 当前是否处于冷场状态 self.is_cold_chat: bool = False # 当前是否处于冷场状态
# 其他状态 # 其他状态
self.is_typing: bool = False # 是否正在输入 (未来可能用到) self.is_typing: bool = False # 是否正在输入 (未来可能用到)
self.changed: bool = False # 状态是否有变化 (用于优化) self.changed: bool = False # 状态是否有变化 (用于优化)
# 关联对象 # 关联对象
self.chat_observer: Optional[ChatObserver] = None self.chat_observer: Optional[ChatObserver] = None
@ -142,14 +146,14 @@ class ObservationInfo:
# 初始化 bot_id # 初始化 bot_id
try: try:
from ...config.config import global_config from ...config.config import global_config
self.bot_id = str(global_config.BOT_QQ) if global_config.BOT_QQ else None self.bot_id = str(global_config.BOT_QQ) if global_config.BOT_QQ else None
if not self.bot_id: if not self.bot_id:
logger.error(f"[私聊][{self.private_name}] 未能从配置中获取 BOT_QQ ID") logger.error(f"[私聊][{self.private_name}] 未能从配置中获取 BOT_QQ ID")
except ImportError: except ImportError:
logger.error(f"[私聊][{self.private_name}] 无法导入 global_config 获取 BOT_QQ ID") logger.error(f"[私聊][{self.private_name}] 无法导入 global_config 获取 BOT_QQ ID")
except Exception as e: except Exception as e:
logger.error(f"[私聊][{self.private_name}] 获取 BOT_QQ ID 时出错: {e}") logger.error(f"[私聊][{self.private_name}] 获取 BOT_QQ ID 时出错: {e}")
def bind_to_chat_observer(self, chat_observer: ChatObserver): def bind_to_chat_observer(self, chat_observer: ChatObserver):
"""绑定到指定的 ChatObserver 并注册通知处理器""" """绑定到指定的 ChatObserver 并注册通知处理器"""
@ -157,8 +161,8 @@ class ObservationInfo:
logger.warning(f"[私聊][{self.private_name}] 尝试重复绑定 ChatObserver") logger.warning(f"[私聊][{self.private_name}] 尝试重复绑定 ChatObserver")
return return
if not self.handler: if not self.handler:
logger.error(f"[私聊][{self.private_name}] ObservationInfoHandler 未初始化,无法绑定!") logger.error(f"[私聊][{self.private_name}] ObservationInfoHandler 未初始化,无法绑定!")
return return
self.chat_observer = chat_observer self.chat_observer = chat_observer
try: try:
@ -175,12 +179,11 @@ class ObservationInfo:
logger.info(f"[私聊][{self.private_name}] ObservationInfo 成功绑定到 ChatObserver") logger.info(f"[私聊][{self.private_name}] ObservationInfo 成功绑定到 ChatObserver")
except AttributeError: except AttributeError:
logger.error(f"[私聊][{self.private_name}] 绑定的 ChatObserver 对象缺少 notification_manager 属性!") logger.error(f"[私聊][{self.private_name}] 绑定的 ChatObserver 对象缺少 notification_manager 属性!")
self.chat_observer = None # 绑定失败 self.chat_observer = None # 绑定失败
except Exception as e: except Exception as e:
logger.error(f"[私聊][{self.private_name}] 绑定到 ChatObserver 时出错: {e}") logger.error(f"[私聊][{self.private_name}] 绑定到 ChatObserver 时出错: {e}")
self.chat_observer = None # 绑定失败 self.chat_observer = None # 绑定失败
def unbind_from_chat_observer(self): def unbind_from_chat_observer(self):
"""解除与 ChatObserver 的绑定""" """解除与 ChatObserver 的绑定"""
@ -189,18 +192,19 @@ class ObservationInfo:
notification_manager = self.chat_observer.notification_manager notification_manager = self.chat_observer.notification_manager
notification_manager.unregister_handler("observation_info", NotificationType.NEW_MESSAGE, self.handler) notification_manager.unregister_handler("observation_info", NotificationType.NEW_MESSAGE, self.handler)
notification_manager.unregister_handler("observation_info", NotificationType.COLD_CHAT, self.handler) notification_manager.unregister_handler("observation_info", NotificationType.COLD_CHAT, self.handler)
notification_manager.unregister_handler("observation_info", NotificationType.MESSAGE_DELETED, self.handler) notification_manager.unregister_handler(
"observation_info", NotificationType.MESSAGE_DELETED, self.handler
)
# ... 注销其他已注册的类型 ... # ... 注销其他已注册的类型 ...
logger.info(f"[私聊][{self.private_name}] ObservationInfo 成功从 ChatObserver 解绑") logger.info(f"[私聊][{self.private_name}] ObservationInfo 成功从 ChatObserver 解绑")
except Exception as e: except Exception as e:
logger.error(f"[私聊][{self.private_name}] 从 ChatObserver 解绑时出错: {e}") logger.error(f"[私聊][{self.private_name}] 从 ChatObserver 解绑时出错: {e}")
finally: finally:
self.chat_observer = None # 无论成功与否都清除引用 self.chat_observer = None # 无论成功与否都清除引用
else: else:
logger.warning(f"[私聊][{self.private_name}] 尝试解绑时 ChatObserver 无效或 handler 未设置") logger.warning(f"[私聊][{self.private_name}] 尝试解绑时 ChatObserver 无效或 handler 未设置")
async def update_from_message(self, message: Dict[str, Any], user_info: Optional[UserInfo]): async def update_from_message(self, message: Dict[str, Any], user_info: Optional[UserInfo]):
"""根据收到的新消息更新 ObservationInfo 的状态""" """根据收到的新消息更新 ObservationInfo 的状态"""
message_time = message.get("time") message_time = message.get("time")
@ -209,8 +213,8 @@ class ObservationInfo:
sender_id_str: Optional[str] = str(user_info.user_id) if user_info else None sender_id_str: Optional[str] = str(user_info.user_id) if user_info else None
if not message_time or not message_id: if not message_time or not message_id:
logger.warning(f"[私聊][{self.private_name}] 收到的消息缺少 time 或 message_id: {message}") logger.warning(f"[私聊][{self.private_name}] 收到的消息缺少 time 或 message_id: {message}")
return return
# 更新最后消息时间(所有消息) # 更新最后消息时间(所有消息)
if message_time > (self.last_message_time or 0): if message_time > (self.last_message_time or 0):
@ -225,7 +229,7 @@ class ObservationInfo:
self.last_bot_speak_time = message_time self.last_bot_speak_time = message_time
else: else:
self.last_user_speak_time = message_time self.last_user_speak_time = message_time
self.active_users.add(sender_id_str) # 添加到活跃用户 self.active_users.add(sender_id_str) # 添加到活跃用户
else: else:
logger.warning(f"[私聊][{self.private_name}] 处理消息更新时缺少有效的 UserInfo, message_id: {message_id}") logger.warning(f"[私聊][{self.private_name}] 处理消息更新时缺少有效的 UserInfo, message_id: {message_id}")
@ -237,14 +241,15 @@ class ObservationInfo:
# --- [核心修改] 将新消息添加到未处理列表 --- # --- [核心修改] 将新消息添加到未处理列表 ---
# 检查消息是否已存在于未处理列表中,避免重复添加 # 检查消息是否已存在于未处理列表中,避免重复添加
if not any(msg.get("message_id") == message_id for msg in self.unprocessed_messages): if not any(msg.get("message_id") == message_id for msg in self.unprocessed_messages):
# 创建消息的副本以避免修改原始数据(如果需要) # 创建消息的副本以避免修改原始数据(如果需要)
self.unprocessed_messages.append(message.copy()) self.unprocessed_messages.append(message.copy())
self.new_messages_count = len(self.unprocessed_messages) self.new_messages_count = len(self.unprocessed_messages)
logger.debug(f"[私聊][{self.private_name}] 添加新未处理消息 ID: {message_id}, 发送者: {sender_id_str}, 当前未处理数: {self.new_messages_count}") logger.debug(
self.update_changed() f"[私聊][{self.private_name}] 添加新未处理消息 ID: {message_id}, 发送者: {sender_id_str}, 当前未处理数: {self.new_messages_count}"
)
self.update_changed()
else: else:
logger.warning(f"[私聊][{self.private_name}] 尝试重复添加未处理消息 ID: {message_id}") logger.warning(f"[私聊][{self.private_name}] 尝试重复添加未处理消息 ID: {message_id}")
async def remove_unprocessed_message(self, message_id_to_delete: str): async def remove_unprocessed_message(self, message_id_to_delete: str):
"""从 unprocessed_messages 列表中移除指定 ID 的消息""" """从 unprocessed_messages 列表中移除指定 ID 的消息"""
@ -256,11 +261,12 @@ class ObservationInfo:
if new_count < original_count: if new_count < original_count:
self.new_messages_count = new_count self.new_messages_count = new_count
logger.info(f"[私聊][{self.private_name}] 移除了未处理的消息 (ID: {message_id_to_delete}), 当前未处理数: {self.new_messages_count}") logger.info(
f"[私聊][{self.private_name}] 移除了未处理的消息 (ID: {message_id_to_delete}), 当前未处理数: {self.new_messages_count}"
)
self.update_changed() self.update_changed()
else: else:
logger.warning(f"[私聊][{self.private_name}] 尝试移除不存在的未处理消息 ID: {message_id_to_delete}") logger.warning(f"[私聊][{self.private_name}] 尝试移除不存在的未处理消息 ID: {message_id_to_delete}")
async def update_cold_chat_status(self, is_cold: bool, current_time: float): async def update_cold_chat_status(self, is_cold: bool, current_time: float):
"""更新冷场状态""" """更新冷场状态"""
@ -274,20 +280,18 @@ class ObservationInfo:
if self.cold_chat_start_time: if self.cold_chat_start_time:
self.cold_chat_duration = current_time - self.cold_chat_start_time self.cold_chat_duration = current_time - self.cold_chat_start_time
logger.info(f"[私聊][{self.private_name}] 结束冷场状态,持续时间: {self.cold_chat_duration:.2f}") logger.info(f"[私聊][{self.private_name}] 结束冷场状态,持续时间: {self.cold_chat_duration:.2f}")
self.cold_chat_start_time = None # 结束冷场,重置开始时间 self.cold_chat_start_time = None # 结束冷场,重置开始时间
self.update_changed() self.update_changed()
# 持续更新冷场时长 # 持续更新冷场时长
if self.is_cold_chat and self.cold_chat_start_time: if self.is_cold_chat and self.cold_chat_start_time:
self.cold_chat_duration = current_time - self.cold_chat_start_time self.cold_chat_duration = current_time - self.cold_chat_start_time
def update_changed(self): def update_changed(self):
"""标记状态已改变""" """标记状态已改变"""
self.changed = True self.changed = True
# 这个标记通常在处理完改变后由外部逻辑重置为 False # 这个标记通常在处理完改变后由外部逻辑重置为 False
# --- [修改点 15] 重命名并修改 clear_unprocessed_messages --- # --- [修改点 15] 重命名并修改 clear_unprocessed_messages ---
async def clear_processed_messages(self, message_ids_to_clear: Set[str]): async def clear_processed_messages(self, message_ids_to_clear: Set[str]):
"""将指定 ID 的未处理消息移入历史记录,并更新相关状态""" """将指定 ID 的未处理消息移入历史记录,并更新相关状态"""
@ -309,7 +313,9 @@ class ObservationInfo:
remaining_messages.append(msg) remaining_messages.append(msg)
if not messages_to_move: if not messages_to_move:
logger.debug(f"[私聊][{self.private_name}] 未找到与 ID 列表 {message_ids_to_clear} 匹配的未处理消息进行清理。") logger.debug(
f"[私聊][{self.private_name}] 未找到与 ID 列表 {message_ids_to_clear} 匹配的未处理消息进行清理。"
)
return return
logger.debug(f"[私聊][{self.private_name}] 准备清理 {cleared_count} 条已处理消息...") logger.debug(f"[私聊][{self.private_name}] 准备清理 {cleared_count} 条已处理消息...")
@ -319,19 +325,19 @@ class ObservationInfo:
self.chat_history.extend(messages_to_move) self.chat_history.extend(messages_to_move)
# 限制历史记录长度 (可选) # 限制历史记录长度 (可选)
max_history_len = 100 # 例如保留最近 100 条 max_history_len = 100 # 例如保留最近 100 条
if len(self.chat_history) > max_history_len: if len(self.chat_history) > max_history_len:
self.chat_history = self.chat_history[-max_history_len:] self.chat_history = self.chat_history[-max_history_len:]
# 更新历史记录字符串 (仅使用最近一部分生成,提高效率) # 更新历史记录字符串 (仅使用最近一部分生成,提高效率)
history_slice_for_str = self.chat_history[-20:] # 例如最近 20 条 history_slice_for_str = self.chat_history[-20:] # 例如最近 20 条
try: try:
self.chat_history_str = await build_readable_messages( self.chat_history_str = await build_readable_messages(
history_slice_for_str, history_slice_for_str,
replace_bot_name=True, replace_bot_name=True,
merge_messages=False, merge_messages=False,
timestamp_mode="relative", timestamp_mode="relative",
read_mark=0.0, # read_mark 可能需要调整或移除 read_mark=0.0, # read_mark 可能需要调整或移除
) )
except Exception as e: except Exception as e:
logger.error(f"[私聊][{self.private_name}] 构建聊天记录字符串时出错: {e}") logger.error(f"[私聊][{self.private_name}] 构建聊天记录字符串时出错: {e}")
@ -342,16 +348,17 @@ class ObservationInfo:
self.new_messages_count = len(self.unprocessed_messages) self.new_messages_count = len(self.unprocessed_messages)
self.chat_history_count = len(self.chat_history) self.chat_history_count = len(self.chat_history)
logger.info(f"[私聊][{self.private_name}] 已清理 {cleared_count} 条消息 (IDs: {message_ids_to_clear}),剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。") logger.info(
f"[私聊][{self.private_name}] 已清理 {cleared_count} 条消息 (IDs: {message_ids_to_clear}),剩余未处理 {self.new_messages_count} 条,当前历史记录 {self.chat_history_count} 条。"
self.update_changed() # 状态改变 )
self.update_changed() # 状态改变
# --- Helper methods (可以根据需要添加) --- # --- Helper methods (可以根据需要添加) ---
def get_active_duration(self) -> float: def get_active_duration(self) -> float:
"""获取当前活跃时长(距离最后一条消息的时间)""" """获取当前活跃时长(距离最后一条消息的时间)"""
if not self.last_message_time: if not self.last_message_time:
return float('inf') # 或返回 0.0,取决于定义 return float("inf") # 或返回 0.0,取决于定义
return time.time() - self.last_message_time return time.time() - self.last_message_time
def get_user_response_time(self) -> Optional[float]: def get_user_response_time(self) -> Optional[float]:
@ -365,4 +372,3 @@ class ObservationInfo:
if not self.last_bot_speak_time: if not self.last_bot_speak_time:
return None return None
return time.time() - self.last_bot_speak_time return time.time() - self.last_bot_speak_time

View File

@ -1,5 +1,5 @@
import time import time
import asyncio # 引入 asyncio import asyncio # 引入 asyncio
import traceback import traceback
from typing import Dict, Optional from typing import Dict, Optional
@ -17,7 +17,7 @@ class PFCManager:
# 会话实例管理 # 会话实例管理
_instances: Dict[str, Conversation] = {} _instances: Dict[str, Conversation] = {}
_initializing: Dict[str, bool] = {} # 用于防止并发初始化同一个 stream_id _initializing: Dict[str, bool] = {} # 用于防止并发初始化同一个 stream_id
@classmethod @classmethod
def get_instance(cls) -> "PFCManager": def get_instance(cls) -> "PFCManager":
@ -33,24 +33,26 @@ class PFCManager:
if self._initializing.get(stream_id, False): if self._initializing.get(stream_id, False):
logger.debug(f"[私聊][{private_name}] 会话实例正在初始化中,请稍候: {stream_id}") logger.debug(f"[私聊][{private_name}] 会话实例正在初始化中,请稍候: {stream_id}")
# 可以选择等待一小段时间或直接返回 None # 可以选择等待一小段时间或直接返回 None
await asyncio.sleep(0.5) # 短暂等待,让初始化有机会完成 await asyncio.sleep(0.5) # 短暂等待,让初始化有机会完成
# 再次检查实例是否存在 # 再次检查实例是否存在
if stream_id in self._instances and self._instances[stream_id]._initialized: if stream_id in self._instances and self._instances[stream_id]._initialized:
logger.debug(f"[私聊][{private_name}] 初始化已完成,返回现有实例: {stream_id}") logger.debug(f"[私聊][{private_name}] 初始化已完成,返回现有实例: {stream_id}")
return self._instances[stream_id] return self._instances[stream_id]
else: else:
logger.warning(f"[私聊][{private_name}] 等待后实例仍未初始化完成或不存在。") logger.warning(f"[私聊][{private_name}] 等待后实例仍未初始化完成或不存在。")
return None # 避免返回未完成的实例 return None # 避免返回未完成的实例
# 检查是否已有活动实例 # 检查是否已有活动实例
if stream_id in self._instances: if stream_id in self._instances:
instance = self._instances[stream_id] instance = self._instances[stream_id]
# 检查忽略状态 # 检查忽略状态
if (hasattr(instance, "ignore_until_timestamp") and if (
instance.ignore_until_timestamp and hasattr(instance, "ignore_until_timestamp")
time.time() < instance.ignore_until_timestamp): and instance.ignore_until_timestamp
and time.time() < instance.ignore_until_timestamp
):
logger.debug(f"[私聊][{private_name}] 会话实例当前处于忽略状态: {stream_id}") logger.debug(f"[私聊][{private_name}] 会话实例当前处于忽略状态: {stream_id}")
return None # 处于忽略状态,不返回实例 return None # 处于忽略状态,不返回实例
# 检查是否已初始化且应继续运行 # 检查是否已初始化且应继续运行
if instance._initialized and instance.should_continue: if instance._initialized and instance.should_continue:
@ -66,16 +68,15 @@ class PFCManager:
if stream_id in self._initializing: if stream_id in self._initializing:
del self._initializing[stream_id] del self._initializing[stream_id]
# --- 创建并初始化新实例 --- # --- 创建并初始化新实例 ---
conversation_instance: Optional[Conversation] = None conversation_instance: Optional[Conversation] = None
try: try:
logger.info(f"[私聊][{private_name}] 创建新的对话实例: {stream_id}") logger.info(f"[私聊][{private_name}] 创建新的对话实例: {stream_id}")
self._initializing[stream_id] = True # 标记开始初始化 self._initializing[stream_id] = True # 标记开始初始化
# 创建实例 # 创建实例
conversation_instance = Conversation(stream_id, private_name) conversation_instance = Conversation(stream_id, private_name)
self._instances[stream_id] = conversation_instance # 立即存入字典 self._instances[stream_id] = conversation_instance # 立即存入字典
# **启动实例初始化** # **启动实例初始化**
# _initialize_conversation 会调用 conversation._initialize() # _initialize_conversation 会调用 conversation._initialize()
@ -84,7 +85,7 @@ class PFCManager:
# --- 关键修复:在初始化成功后调用 start() --- # --- 关键修复:在初始化成功后调用 start() ---
if conversation_instance._initialized and conversation_instance.should_continue: if conversation_instance._initialized and conversation_instance.should_continue:
logger.info(f"[私聊][{private_name}] 初始化成功,调用 conversation.start() 启动主循环...") logger.info(f"[私聊][{private_name}] 初始化成功,调用 conversation.start() 启动主循环...")
await conversation_instance.start() # 确保调用 start 方法 await conversation_instance.start() # 确保调用 start 方法
else: else:
# 如果 _initialize_conversation 内部初始化失败 # 如果 _initialize_conversation 内部初始化失败
logger.error(f"[私聊][{private_name}] 初始化未成功完成,无法启动实例 {stream_id}") logger.error(f"[私聊][{private_name}] 初始化未成功完成,无法启动实例 {stream_id}")
@ -92,7 +93,7 @@ class PFCManager:
await self._cleanup_conversation(conversation_instance) await self._cleanup_conversation(conversation_instance)
if stream_id in self._instances: if stream_id in self._instances:
del self._instances[stream_id] del self._instances[stream_id]
conversation_instance = None # 返回 None 表示失败 conversation_instance = None # 返回 None 表示失败
except Exception as e: except Exception as e:
logger.error(f"[私聊][{private_name}] 创建或启动会话实例时发生严重错误: {stream_id}, 错误: {e}") logger.error(f"[私聊][{private_name}] 创建或启动会话实例时发生严重错误: {stream_id}, 错误: {e}")
@ -102,7 +103,7 @@ class PFCManager:
await self._cleanup_conversation(conversation_instance) await self._cleanup_conversation(conversation_instance)
if stream_id in self._instances: if stream_id in self._instances:
del self._instances[stream_id] del self._instances[stream_id]
conversation_instance = None # 返回 None conversation_instance = None # 返回 None
finally: finally:
# 确保初始化标记被清除 # 确保初始化标记被清除
@ -117,22 +118,27 @@ class PFCManager:
private_name = conversation.private_name private_name = conversation.private_name
try: try:
logger.info(f"[私聊][{private_name}] 管理器开始调用 conversation._initialize(): {stream_id}") logger.info(f"[私聊][{private_name}] 管理器开始调用 conversation._initialize(): {stream_id}")
await conversation._initialize() # 调用实例自身的初始化方法 await conversation._initialize() # 调用实例自身的初始化方法
# 注意:初始化成功与否由 conversation._initialized 和 conversation.should_continue 标志决定 # 注意:初始化成功与否由 conversation._initialized 和 conversation.should_continue 标志决定
if conversation._initialized: if conversation._initialized:
logger.info(f"[私聊][{private_name}] conversation._initialize() 调用完成,实例标记为已初始化: {stream_id}") logger.info(
f"[私聊][{private_name}] conversation._initialize() 调用完成,实例标记为已初始化: {stream_id}"
)
else: else:
logger.warning(f"[私聊][{private_name}] conversation._initialize() 调用完成,但实例未成功标记为已初始化: {stream_id}") logger.warning(
f"[私聊][{private_name}] conversation._initialize() 调用完成,但实例未成功标记为已初始化: {stream_id}"
)
except Exception as e: except Exception as e:
# _initialize 内部应该处理自己的异常,但这里也捕获以防万一 # _initialize 内部应该处理自己的异常,但这里也捕获以防万一
logger.error(f"[私聊][{private_name}] 调用 conversation._initialize() 时发生未捕获错误: {stream_id}, 错误: {e}") logger.error(
f"[私聊][{private_name}] 调用 conversation._initialize() 时发生未捕获错误: {stream_id}, 错误: {e}"
)
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
# 确保实例状态反映失败 # 确保实例状态反映失败
conversation._initialized = False conversation._initialized = False
conversation.should_continue = False conversation.should_continue = False
async def _cleanup_conversation(self, conversation: Conversation): async def _cleanup_conversation(self, conversation: Conversation):
"""清理会话实例的资源""" """清理会话实例的资源"""
if not conversation: if not conversation:
@ -142,14 +148,14 @@ class PFCManager:
logger.info(f"[私聊][{private_name}] 开始清理会话实例资源: {stream_id}") logger.info(f"[私聊][{private_name}] 开始清理会话实例资源: {stream_id}")
try: try:
# 调用 conversation 的 stop 方法来停止其内部组件 # 调用 conversation 的 stop 方法来停止其内部组件
if hasattr(conversation, 'stop') and callable(conversation.stop): if hasattr(conversation, "stop") and callable(conversation.stop):
await conversation.stop() # stop 方法应处理内部组件的停止 await conversation.stop() # stop 方法应处理内部组件的停止
else: else:
logger.warning(f"[私聊][{private_name}] Conversation 对象缺少 stop 方法,可能无法完全清理资源。") logger.warning(f"[私聊][{private_name}] Conversation 对象缺少 stop 方法,可能无法完全清理资源。")
# 尝试手动停止已知组件 (作为后备) # 尝试手动停止已知组件 (作为后备)
if hasattr(conversation, 'idle_conversation_starter') and conversation.idle_conversation_starter: if hasattr(conversation, "idle_conversation_starter") and conversation.idle_conversation_starter:
conversation.idle_conversation_starter.stop() conversation.idle_conversation_starter.stop()
if hasattr(conversation, 'observation_info') and conversation.observation_info: if hasattr(conversation, "observation_info") and conversation.observation_info:
conversation.observation_info.unbind_from_chat_observer() conversation.observation_info.unbind_from_chat_observer()
# ChatObserver 是单例,不在此处停止 # ChatObserver 是单例,不在此处停止
@ -163,12 +169,14 @@ class PFCManager:
instance = self._instances.get(stream_id) instance = self._instances.get(stream_id)
if instance and instance._initialized and instance.should_continue: if instance and instance._initialized and instance.should_continue:
# 检查忽略状态 # 检查忽略状态
if (hasattr(instance, "ignore_until_timestamp") and if (
instance.ignore_until_timestamp and hasattr(instance, "ignore_until_timestamp")
time.time() < instance.ignore_until_timestamp): and instance.ignore_until_timestamp
return None # 忽略期间不返回 and time.time() < instance.ignore_until_timestamp
):
return None # 忽略期间不返回
return instance return instance
return None # 不存在或无效则返回 None return None # 不存在或无效则返回 None
async def remove_conversation(self, stream_id: str): async def remove_conversation(self, stream_id: str):
"""移除并清理会话实例""" """移除并清理会话实例"""
@ -188,4 +196,3 @@ class PFCManager:
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
else: else:
logger.warning(f"[管理器] 尝试移除不存在的会话实例: {stream_id}") logger.warning(f"[管理器] 尝试移除不存在的会话实例: {stream_id}")

View File

@ -18,8 +18,7 @@ class ConversationState(Enum):
ENDED = "结束" ENDED = "结束"
JUDGING = "判断" JUDGING = "判断"
IGNORED = "屏蔽" IGNORED = "屏蔽"
ERROR = "错误" # <--- 添加 ERROR 状态 ERROR = "错误" # <--- 添加 ERROR 状态
ActionType = Literal["direct_reply", "fetch_knowledge", "wait"] ActionType = Literal["direct_reply", "fetch_knowledge", "wait"]