pull/937/head
Bakadax 2025-05-06 10:27:51 +08:00
parent 3775e3781b
commit e74ee67376
4 changed files with 305 additions and 384 deletions

View File

@ -1,9 +1,12 @@
# -*- coding: utf-8 -*-
# File: conversation.py
import time import time
import asyncio import asyncio
import datetime import datetime
import traceback import traceback
from typing import Dict, Any, Optional, Set, List from typing import Dict, Any, Optional, Set, List
# ... (其他 imports 保持不变) ...
from src.common.logger_manager import get_logger from src.common.logger_manager import get_logger
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
from maim_message import UserInfo from maim_message import UserInfo
@ -11,7 +14,7 @@ from src.plugins.chat.chat_stream import chat_manager, ChatStream
from ..chat.message import Message # 假设 Message 类在这里 from ..chat.message import Message # 假设 Message 类在这里
from ...config.config import global_config # 导入全局配置 from ...config.config import global_config # 导入全局配置
from .pfc_types import ConversationState from .pfc_types import ConversationState # 导入更新后的 pfc_types
from .pfc import GoalAnalyzer # 假设 GoalAnalyzer 在 pfc.py from .pfc import GoalAnalyzer # 假设 GoalAnalyzer 在 pfc.py
from .chat_observer import ChatObserver from .chat_observer import ChatObserver
from .message_sender import DirectMessageSender from .message_sender import DirectMessageSender
@ -22,6 +25,7 @@ from .reply_generator import ReplyGenerator
from .idle_conversation_starter import IdleConversationStarter from .idle_conversation_starter import IdleConversationStarter
from .pfc_KnowledgeFetcher import KnowledgeFetcher # 假设 KnowledgeFetcher 在这里 from .pfc_KnowledgeFetcher import KnowledgeFetcher # 假设 KnowledgeFetcher 在这里
from .waiter import Waiter from .waiter import Waiter
from .reply_checker import ReplyChecker # <--- 确保 ReplyChecker 被导入
from rich.traceback import install from rich.traceback import install
install(extra_lines=3) install(extra_lines=3)
@ -31,195 +35,103 @@ logger = get_logger("pfc_conversation")
class Conversation: class Conversation:
"""对话类,负责管理单个对话的状态和行为""" """对话类,负责管理单个对话的状态和行为"""
# --- __init__, _initialize, _load_initial_history, start, stop, _plan_and_action_loop, _convert_to_message (保持不变) ---
def __init__(self, stream_id: str, private_name: str): def __init__(self, stream_id: str, private_name: str):
"""初始化对话实例""" """初始化对话实例"""
self.stream_id = stream_id self.stream_id = stream_id; self.private_name = private_name; self.state = ConversationState.INIT; self.should_continue = False; self.ignore_until_timestamp: Optional[float] = None; self.generated_reply = ""; self.chat_stream: Optional[ChatStream] = None
self.private_name = private_name self.action_planner: Optional[ActionPlanner] = None; self.goal_analyzer: Optional[GoalAnalyzer] = None; self.reply_generator: Optional[ReplyGenerator] = None; self.knowledge_fetcher: Optional[KnowledgeFetcher] = None; self.waiter: Optional[Waiter] = None; self.direct_sender: Optional[DirectMessageSender] = None; self.idle_conversation_starter: Optional[IdleConversationStarter] = None; self.chat_observer: Optional[ChatObserver] = None; self.observation_info: Optional[ObservationInfo] = None; self.conversation_info: Optional[ConversationInfo] = None; self.reply_checker: Optional[ReplyChecker] = None
self.state = ConversationState.INIT self._initializing = False; self._initialized = False
self.should_continue = False
self.ignore_until_timestamp: Optional[float] = None
self.generated_reply = ""
self.chat_stream: Optional[ChatStream] = None
# 初始化组件为 None
self.action_planner: Optional[ActionPlanner] = None
self.goal_analyzer: Optional[GoalAnalyzer] = None
self.reply_generator: Optional[ReplyGenerator] = None
self.knowledge_fetcher: Optional[KnowledgeFetcher] = None
self.waiter: Optional[Waiter] = None
self.direct_sender: Optional[DirectMessageSender] = None
self.idle_conversation_starter: Optional[IdleConversationStarter] = None
self.chat_observer: Optional[ChatObserver] = None
self.observation_info: Optional[ObservationInfo] = None
self.conversation_info: Optional[ConversationInfo] = None # 使用 ConversationInfo
self._initializing = False
self._initialized = False
# 在初始化时获取机器人QQ号字符串避免重复转换
self.bot_qq_str = str(global_config.BOT_QQ) if global_config.BOT_QQ else None self.bot_qq_str = str(global_config.BOT_QQ) if global_config.BOT_QQ else None
if not self.bot_qq_str: if not self.bot_qq_str: logger.error(f"[私聊][{self.private_name}] 严重错误:未能从配置中获取 BOT_QQ IDPFC 可能无法正常工作。")
logger.error(f"[私聊][{self.private_name}] 严重错误:未能从配置中获取 BOT_QQ IDPFC 可能无法正常工作。")
async def _initialize(self): async def _initialize(self):
"""异步初始化对话实例及其所有组件""" """异步初始化对话实例及其所有组件"""
if self._initialized or self._initializing: logger.warning(f"[私聊][{self.private_name}] 尝试重复初始化或正在初始化中。"); return
if self._initialized or self._initializing: self._initializing = True; logger.info(f"[私聊][{self.private_name}] 开始初始化对话实例: {self.stream_id}")
logger.warning(f"[私聊][{self.private_name}] 尝试重复初始化或正在初始化中。")
return
self._initializing = True
logger.info(f"[私聊][{self.private_name}] 开始初始化对话实例: {self.stream_id}")
try: try:
self.action_planner = ActionPlanner(self.stream_id, self.private_name) self.action_planner = ActionPlanner(self.stream_id, self.private_name); self.goal_analyzer = GoalAnalyzer(self.stream_id, self.private_name); self.reply_generator = ReplyGenerator(self.stream_id, self.private_name); self.knowledge_fetcher = KnowledgeFetcher(self.private_name); self.waiter = Waiter(self.stream_id, self.private_name); self.direct_sender = DirectMessageSender(self.private_name); self.reply_checker = ReplyChecker(self.stream_id, self.private_name)
self.goal_analyzer = GoalAnalyzer(self.stream_id, self.private_name) self.chat_stream = chat_manager.get_stream(self.stream_id);
self.reply_generator = ReplyGenerator(self.stream_id, self.private_name) if not self.chat_stream: raise ValueError(f"无法获取 stream_id {self.stream_id} 的 ChatStream")
self.knowledge_fetcher = KnowledgeFetcher(self.private_name)
self.waiter = Waiter(self.stream_id, self.private_name)
self.direct_sender = DirectMessageSender(self.private_name)
self.chat_stream = chat_manager.get_stream(self.stream_id)
if not self.chat_stream:
raise ValueError(f"无法获取 stream_id {self.stream_id} 的 ChatStream")
self.idle_conversation_starter = IdleConversationStarter(self.stream_id, self.private_name) self.idle_conversation_starter = IdleConversationStarter(self.stream_id, self.private_name)
self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name) self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name); self.observation_info = ObservationInfo(self.private_name)
self.observation_info = ObservationInfo(self.private_name) if not self.observation_info.bot_id: logger.warning(f"[私聊][{self.private_name}] ObservationInfo 未能自动获取 bot_id尝试手动设置。"); self.observation_info.bot_id = self.bot_qq_str
if not self.observation_info.bot_id:
logger.warning(f"[私聊][{self.private_name}] ObservationInfo 未能自动获取 bot_id尝试手动设置。")
self.observation_info.bot_id = self.bot_qq_str
self.conversation_info = ConversationInfo() self.conversation_info = ConversationInfo()
self.observation_info.bind_to_chat_observer(self.chat_observer) self.observation_info.bind_to_chat_observer(self.chat_observer); await self._load_initial_history(); self.chat_observer.start()
await self._load_initial_history() if self.idle_conversation_starter: self.idle_conversation_starter.start(); logger.info(f"[私聊][{self.private_name}] 空闲对话检测器已启动")
self.chat_observer.start() self._initialized = True; self.should_continue = True; self.state = ConversationState.ANALYZING; logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 初始化完成。")
if self.idle_conversation_starter: except Exception as e: logger.error(f"[私聊][{self.private_name}] 初始化对话实例失败: {e}\n{traceback.format_exc()}"); self.should_continue = False; self._initialized = False; await self.stop(); raise
self.idle_conversation_starter.start() finally: self._initializing = False
logger.info(f"[私聊][{self.private_name}] 空闲对话检测器已启动")
self._initialized = True
self.should_continue = True
self.state = ConversationState.ANALYZING
logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 初始化完成。")
except Exception as e:
logger.error(f"[私聊][{self.private_name}] 初始化对话实例失败: {e}")
logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}")
self.should_continue = False
await self.stop()
raise
finally:
self._initializing = False
async def _load_initial_history(self): async def _load_initial_history(self):
"""加载初始聊天记录""" """加载初始聊天记录"""
if not self.observation_info: return if not self.observation_info: return
try: try:
logger.info(f"[私聊][{self.private_name}] 为 {self.stream_id} 加载初始聊天记录...") logger.info(f"[私聊][{self.private_name}] 为 {self.stream_id} 加载初始聊天记录...")
initial_messages = get_raw_msg_before_timestamp_with_chat( initial_messages = get_raw_msg_before_timestamp_with_chat(chat_id=self.stream_id, timestamp=time.time(), limit=30,)
chat_id=self.stream_id, timestamp=time.time(), limit=30,
)
if initial_messages: if initial_messages:
self.observation_info.chat_history = initial_messages self.observation_info.chat_history = initial_messages; self.observation_info.chat_history_count = len(initial_messages)
self.observation_info.chat_history_count = len(initial_messages) last_msg = initial_messages[-1]; self.observation_info.last_message_time = last_msg.get("time"); self.observation_info.last_message_id = last_msg.get("message_id")
last_msg = initial_messages[-1]
self.observation_info.last_message_time = last_msg.get("time")
self.observation_info.last_message_id = last_msg.get("message_id")
last_user_info_dict = last_msg.get("user_info", {}) last_user_info_dict = last_msg.get("user_info", {})
if isinstance(last_user_info_dict, dict): if isinstance(last_user_info_dict, dict):
try: try: last_user_info = UserInfo.from_dict(last_user_info_dict); self.observation_info.last_message_sender = str(last_user_info.user_id) if last_user_info else None
last_user_info = UserInfo.from_dict(last_user_info_dict) except Exception as e: logger.warning(f"[私聊][{self.private_name}] 解析最后一条消息的用户信息时出错: {e}"); self.observation_info.last_message_sender = None
self.observation_info.last_message_sender = str(last_user_info.user_id) if last_user_info else None
except Exception as e:
logger.warning(f"[私聊][{self.private_name}] 解析最后一条消息的用户信息时出错: {e}")
self.observation_info.last_message_sender = None
else: self.observation_info.last_message_sender = None else: self.observation_info.last_message_sender = None
self.observation_info.last_message_content = last_msg.get("processed_plain_text", "") self.observation_info.last_message_content = last_msg.get("processed_plain_text", "")
history_slice_for_str = initial_messages[-20:] history_slice_for_str = initial_messages[-20:]
self.observation_info.chat_history_str = await build_readable_messages( self.observation_info.chat_history_str = await build_readable_messages(history_slice_for_str, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0)
history_slice_for_str, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0
)
if self.chat_observer: self.chat_observer.last_message_time = self.observation_info.last_message_time if self.chat_observer: self.chat_observer.last_message_time = self.observation_info.last_message_time
if self.idle_conversation_starter and self.observation_info.last_message_time: if self.idle_conversation_starter and self.observation_info.last_message_time: await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time)
await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time)
logger.info(f"[私聊][{self.private_name}] 成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}") logger.info(f"[私聊][{self.private_name}] 成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}")
else: else: logger.info(f"[私聊][{self.private_name}] 没有找到初始聊天记录。"); self.observation_info.chat_history_str = "还没有聊天记录。"
logger.info(f"[私聊][{self.private_name}] 没有找到初始聊天记录。")
self.observation_info.chat_history_str = "还没有聊天记录。"
except Exception as load_err: except Exception as load_err:
logger.error(f"[私聊][{self.private_name}] 加载初始聊天记录时出错: {load_err}") logger.error(f"[私聊][{self.private_name}] 加载初始聊天记录时出错: {load_err}");
if self.observation_info: self.observation_info.chat_history_str = "[加载聊天记录出错]" if self.observation_info: self.observation_info.chat_history_str = "[加载聊天记录出错]"
async def start(self): async def start(self):
"""开始对话流程""" """开始对话流程 (增强检查)"""
if not self._initialized: if not self._initialized:
logger.error(f"[私聊][{self.private_name}] 对话实例未初始化,无法启动。") logger.warning(f"[私聊][{self.private_name}] 对话实例未初始化,尝试初始化...")
try: try:
await self._initialize() await self._initialize();
if not self._initialized: return if not self._initialized: logger.error(f"[私聊][{self.private_name}] 初始化失败,无法启动规划循环。"); return
except Exception: return except Exception as init_err: logger.error(f"[私聊][{self.private_name}] 初始化过程中发生未捕获错误: {init_err},无法启动。"); return
if not self.should_continue: if not self.should_continue: logger.warning(f"[私聊][{self.private_name}] 对话实例已被标记为不应继续 (可能由于初始化失败或已被停止),无法启动规划循环。"); return
logger.warning(f"[私聊][{self.private_name}] 对话实例已被标记为不应继续,无法启动。") logger.info(f"[私聊][{self.private_name}] 对话系统启动,准备创建规划循环任务...")
return try: logger.debug(f"[私聊][{self.private_name}] 正在创建 _plan_and_action_loop 任务..."); loop_task = asyncio.create_task(self._plan_and_action_loop()); logger.info(f"[私聊][{self.private_name}] 规划循环任务已创建。")
logger.info(f"[私聊][{self.private_name}] 对话系统启动,开始规划循环...") except Exception as task_err: logger.error(f"[私聊][{self.private_name}] 创建规划循环任务时出错: {task_err}"); await self.stop()
asyncio.create_task(self._plan_and_action_loop())
async def stop(self): async def stop(self):
"""停止对话实例并清理资源""" """停止对话实例并清理资源"""
logger.info(f"[私聊][{self.private_name}] 正在停止对话实例: {self.stream_id}"); self.should_continue = False
logger.info(f"[私聊][{self.private_name}] 正在停止对话实例: {self.stream_id}")
self.should_continue = False
if self.idle_conversation_starter: self.idle_conversation_starter.stop() if self.idle_conversation_starter: self.idle_conversation_starter.stop()
if self.observation_info and self.chat_observer: self.observation_info.unbind_from_chat_observer() if self.observation_info and self.chat_observer: self.observation_info.unbind_from_chat_observer()
self._initialized = False self._initialized = False; logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 已停止。")
logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 已停止。")
async def _plan_and_action_loop(self): async def _plan_and_action_loop(self):
"""思考步PFC核心循环模块 - 实现精细化中断逻辑""" """思考步PFC核心循环模块 - 实现精细化中断逻辑"""
logger.info(f"[私聊][{self.private_name}] 进入 _plan_and_action_loop 循环。")
if not self._initialized: if not self._initialized: logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下运行规划循环。"); return
logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下运行规划循环。")
return
while self.should_continue: while self.should_continue:
current_loop_start_time = time.time() loop_iter_start_time = time.time(); logger.debug(f"[私聊][{self.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f})")
# --- 忽略逻辑 --- if self.ignore_until_timestamp and loop_iter_start_time < self.ignore_until_timestamp:
if self.ignore_until_timestamp and current_loop_start_time < self.ignore_until_timestamp: if self.idle_conversation_starter and self.idle_conversation_starter._running: self.idle_conversation_starter.stop(); logger.debug(f"[私聊][{self.private_name}] 对话被暂时忽略,暂停空闲对话检测")
if self.idle_conversation_starter and self.idle_conversation_starter._running: sleep_duration = min(30, self.ignore_until_timestamp - loop_iter_start_time); await asyncio.sleep(sleep_duration); continue
self.idle_conversation_starter.stop(); logger.debug(f"[私聊][{self.private_name}] 对话被暂时忽略,暂停空闲对话检测") elif self.ignore_until_timestamp and loop_iter_start_time >= self.ignore_until_timestamp: logger.info(f"[私聊][{self.private_name}] 忽略时间已到 {self.stream_id},准备结束对话。"); self.ignore_until_timestamp = None; await self.stop(); continue
sleep_duration = min(30, self.ignore_until_timestamp - current_loop_start_time)
await asyncio.sleep(sleep_duration)
continue
elif self.ignore_until_timestamp and current_loop_start_time >= self.ignore_until_timestamp:
logger.info(f"[私聊][{self.private_name}] 忽略时间已到 {self.stream_id},准备结束对话。")
self.ignore_until_timestamp = None; await self.stop(); continue
else: else:
if self.idle_conversation_starter and not self.idle_conversation_starter._running: if self.idle_conversation_starter and not self.idle_conversation_starter._running: self.idle_conversation_starter.start(); logger.debug(f"[私聊][{self.private_name}] 恢复空闲对话检测")
self.idle_conversation_starter.start(); logger.debug(f"[私聊][{self.private_name}] 恢复空闲对话检测")
# --- 核心规划与行动逻辑 ---
try: try:
if not all([self.action_planner, self.observation_info, self.conversation_info]): if not all([self.action_planner, self.observation_info, self.conversation_info]): logger.error(f"[私聊][{self.private_name}] 核心组件未初始化无法继续规划循环。将等待5秒后重试..."); await asyncio.sleep(5); continue
logger.error(f"[私聊][{self.private_name}] 核心组件未初始化,无法继续规划循环。"); await asyncio.sleep(5); continue planning_start_time = time.time(); logger.debug(f"[私聊][{self.private_name}] --- 开始规划 ({planning_start_time:.2f}) ---"); self.conversation_info.other_new_messages_during_planning_count = 0
# --- 1. 记录规划开始时间 --- logger.debug(f"[私聊][{self.private_name}] 调用 ActionPlanner.plan..."); action, reason = await self.action_planner.plan(self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action)
planning_start_time = time.time() planning_duration = time.time() - planning_start_time; logger.debug(f"[私聊][{self.private_name}] ActionPlanner.plan 完成 (耗时: {planning_duration:.3f} 秒),初步规划动作: {action}")
logger.debug(f"[私聊][{self.private_name}] --- 开始新一轮规划 ({planning_start_time:.2f}) ---") current_unprocessed_messages = getattr(self.observation_info, 'unprocessed_messages', []); new_messages_during_planning: List[Dict[str, Any]] = []; other_new_messages_during_planning: List[Dict[str, Any]] = []
self.conversation_info.other_new_messages_during_planning_count = 0
# --- 2. 调用 Action Planner ---
action, reason = await self.action_planner.plan(self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action)
planning_duration = time.time() - planning_start_time
logger.debug(f"[私聊][{self.private_name}] 规划耗时: {planning_duration:.3f} 秒,初步规划动作: {action}")
# --- 3. 检查规划期间的新消息 ---
current_unprocessed_messages = getattr(self.observation_info, 'unprocessed_messages', [])
new_messages_during_planning: List[Dict[str, Any]] = []
other_new_messages_during_planning: List[Dict[str, Any]] = []
for msg in current_unprocessed_messages: for msg in current_unprocessed_messages:
msg_time = msg.get('time') msg_time = msg.get('time'); sender_id = msg.get("user_info", {}).get("user_id")
sender_id = msg.get("user_info", {}).get("user_id")
if msg_time and msg_time >= planning_start_time: if msg_time and msg_time >= planning_start_time:
new_messages_during_planning.append(msg) new_messages_during_planning.append(msg);
if sender_id != self.bot_qq_str: other_new_messages_during_planning.append(msg) if sender_id != self.bot_qq_str:
new_msg_count = len(new_messages_during_planning); other_new_msg_count = len(other_new_messages_during_planning) other_new_messages_during_planning.append(msg)
logger.debug(f"[私聊][{self.private_name}] 规划期间收到新消息总数: {new_msg_count}, 来自他人: {other_new_msg_count}") new_msg_count = len(new_messages_during_planning); other_new_msg_count = len(other_new_messages_during_planning); logger.debug(f"[私聊][{self.private_name}] 规划期间收到新消息总数: {new_msg_count}, 来自他人: {other_new_msg_count}")
# --- 4. 执行中断检查 ---
should_interrupt = False; interrupt_reason = "" should_interrupt = False; interrupt_reason = ""
if action in ["wait", "listening"]: if action in ["wait", "listening"]:
if new_msg_count > 0: should_interrupt = True; interrupt_reason = f"规划 {action} 期间收到 {new_msg_count} 条新消息"; logger.info(f"[私聊][{self.private_name}] 中断 '{action}',原因: {interrupt_reason}") if new_msg_count > 0: should_interrupt = True; interrupt_reason = f"规划 {action} 期间收到 {new_msg_count} 条新消息"; logger.info(f"[私聊][{self.private_name}] 中断 '{action}',原因: {interrupt_reason}")
@ -230,13 +142,9 @@ class Conversation:
logger.info(f"[私聊][{self.private_name}] 执行中断,重新规划...") logger.info(f"[私聊][{self.private_name}] 执行中断,重新规划...")
cancel_record = {"action": action, "plan_reason": reason, "status": "cancelled_due_to_new_messages", "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "final_reason": interrupt_reason} cancel_record = {"action": action, "plan_reason": reason, "status": "cancelled_due_to_new_messages", "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "final_reason": interrupt_reason}
if not hasattr(self.conversation_info, "done_action"): self.conversation_info.done_action = [] if not hasattr(self.conversation_info, "done_action"): self.conversation_info.done_action = []
self.conversation_info.done_action.append(cancel_record) self.conversation_info.done_action.append(cancel_record); self.conversation_info.last_successful_reply_action = None; self.state = ConversationState.ANALYZING; await asyncio.sleep(0.1); continue
self.conversation_info.last_successful_reply_action = None; self.state = ConversationState.ANALYZING; await asyncio.sleep(0.1); continue logger.debug(f"[私聊][{self.private_name}] 未中断,调用 _handle_action 执行动作 '{action}'..."); self.conversation_info.other_new_messages_during_planning_count = other_new_msg_count
# --- 5. 如果未中断,存储状态并执行动作 --- await self._handle_action(action, reason, self.observation_info, self.conversation_info); logger.debug(f"[私聊][{self.private_name}] _handle_action 完成。")
logger.debug(f"[私聊][{self.private_name}] 未中断,继续执行动作 '{action}'")
self.conversation_info.other_new_messages_during_planning_count = other_new_msg_count
await self._handle_action(action, reason, self.observation_info, self.conversation_info)
# --- 6. 检查是否需要结束对话 ---
goal_ended = False goal_ended = False
if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list: if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list:
last_goal_item = self.conversation_info.goal_list[-1]; current_goal = None last_goal_item = self.conversation_info.goal_list[-1]; current_goal = None
@ -245,26 +153,23 @@ class Conversation:
if isinstance(current_goal, str) and current_goal == "结束对话": goal_ended = True if isinstance(current_goal, str) and current_goal == "结束对话": goal_ended = True
last_action_record = self.conversation_info.done_action[-1] if self.conversation_info.done_action else {} last_action_record = self.conversation_info.done_action[-1] if self.conversation_info.done_action else {}
action_ended = last_action_record.get("action") in ["end_conversation", "say_goodbye"] and last_action_record.get("status") == "done" action_ended = last_action_record.get("action") in ["end_conversation", "say_goodbye"] and last_action_record.get("status") == "done"
if goal_ended or action_ended: if goal_ended or action_ended: logger.info(f"[私聊][{self.private_name}] 检测到结束条件 (目标结束: {goal_ended}, 动作结束: {action_ended}),停止循环。"); await self.stop(); continue
logger.info(f"[私聊][{self.private_name}] 检测到结束条件 (目标结束: {goal_ended}, 动作结束: {action_ended}),停止循环。"); await self.stop(); continue
except asyncio.CancelledError: logger.info(f"[私聊][{self.private_name}] PFC 主循环被取消。"); await self.stop(); break except asyncio.CancelledError: logger.info(f"[私聊][{self.private_name}] PFC 主循环被取消。"); await self.stop(); break
except Exception as loop_err: logger.error(f"[私聊][{self.private_name}] PFC 主循环出错: {loop_err}\n{traceback.format_exc()}"); self.state = ConversationState.ERROR; await asyncio.sleep(5) except Exception as loop_err: logger.error(f"[私聊][{self.private_name}] PFC 主循环出错: {loop_err}\n{traceback.format_exc()}"); self.state = ConversationState.ERROR; await asyncio.sleep(5) # 使用修正后的 ERROR 状态
# 控制循环频率 loop_duration = time.time() - loop_iter_start_time; min_loop_interval = 0.1; logger.debug(f"[私聊][{self.private_name}] 循环迭代耗时: {loop_duration:.3f} 秒。")
loop_duration = time.time() - current_loop_start_time; min_loop_interval = 0.1
if loop_duration < min_loop_interval: await asyncio.sleep(min_loop_interval - loop_duration) if loop_duration < min_loop_interval: await asyncio.sleep(min_loop_interval - loop_duration)
logger.info(f"[私聊][{self.private_name}] PFC 循环结束 for stream_id: {self.stream_id}") logger.info(f"[私聊][{self.private_name}] PFC 循环已退出 for stream_id: {self.stream_id}")
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Optional[Message]: def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Optional[Message]:
"""将消息字典转换为Message对象""" """将消息字典转换为Message对象 (保持不变)"""
# (代码同 v6_debug 版本)
try: try:
chat_stream_to_use = self.chat_stream or chat_manager.get_stream(self.stream_id) chat_stream_to_use = self.chat_stream or chat_manager.get_stream(self.stream_id)
if not chat_stream_to_use: logger.error(f"[私聊][{self.private_name}] 无法确定 ChatStream for stream_id {self.stream_id},无法转换消息。"); return None if not chat_stream_to_use: logger.error(f"[私聊][{self.private_name}] 无法确定 ChatStream for stream_id {self.stream_id},无法转换消息。"); return None
user_info_dict = msg_dict.get("user_info", {}); user_info: Optional[UserInfo] = None user_info_dict = msg_dict.get("user_info", {}); user_info: Optional[UserInfo] = None
if isinstance(user_info_dict, dict): if isinstance(user_info_dict, dict):
try: user_info = UserInfo.from_dict(user_info_dict) try: user_info = UserInfo.from_dict(user_info_dict)
except Exception as e: logger.warning(f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}") except Exception as e: logger.warning(f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}")
if not user_info: logger.warning(f"[私聊][{self.private_name}] 消息缺少有效的 UserInfo无法转换。 msg_id: {msg_dict.get('message_id')}"); return None if not user_info: logger.warning(f"[私聊][{self.private_name}] 消息缺少有效的 UserInfo无法转换。 msg_id: {msg_dict.get('message_id')}"); return None
return Message(message_id=msg_dict.get("message_id", f"gen_{time.time()}"), chat_stream=chat_stream_to_use, time=msg_dict.get("time", time.time()), user_info=user_info, processed_plain_text=msg_dict.get("processed_plain_text", ""), detailed_plain_text=msg_dict.get("detailed_plain_text", "")) return Message(message_id=msg_dict.get("message_id", f"gen_{time.time()}"), chat_stream=chat_stream_to_use, time=msg_dict.get("time", time.time()), user_info=user_info, processed_plain_text=msg_dict.get("processed_plain_text", ""), detailed_plain_text=msg_dict.get("detailed_plain_text", ""))
except Exception as e: logger.error(f"[私聊][{self.private_name}] 转换消息时出错: {e}\n{traceback.format_exc()}"); return None except Exception as e: logger.error(f"[私聊][{self.private_name}] 转换消息时出错: {e}\n{traceback.format_exc()}"); return None
@ -277,10 +182,10 @@ class Conversation:
observation_info: ObservationInfo, observation_info: ObservationInfo,
conversation_info: ConversationInfo conversation_info: ConversationInfo
): ):
"""处理规划的行动 - 实现精细化后续状态设置""" """处理规划的行动 - 重新加入 ReplyChecker"""
if not self._initialized: if not self._initialized:
logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下处理动作 '{action}'") logger.error(f"[私聊][{self.private_name}] 尝试在未初始化状态下处理动作 '{action}'")
return return
logger.info(f"[私聊][{self.private_name}] 开始处理动作: {action}, 原因: {reason}") logger.info(f"[私聊][{self.private_name}] 开始处理动作: {action}, 原因: {reason}")
action_start_time = time.time() action_start_time = time.time()
@ -294,13 +199,14 @@ class Conversation:
conversation_info.done_action.append(current_action_record) conversation_info.done_action.append(current_action_record)
action_index = len(conversation_info.done_action) - 1 action_index = len(conversation_info.done_action) - 1
action_successful = False # 初始化动作成功状态 action_successful = False
final_status = "recall" # 默认失败状态 final_status = "recall"
final_reason = "动作未成功执行" # 默认失败原因 final_reason = "动作未成功执行"
need_replan_from_checker = False
try: try:
# --- 根据不同的 action 执行 --- # --- 根据不同的 action 执行 ---
if action in ["direct_reply", "send_new_message", "say_goodbye"]: if action in ["direct_reply", "send_new_message"]:
# --- 生成回复逻辑 --- # --- 生成回复逻辑 ---
self.state = ConversationState.GENERATING self.state = ConversationState.GENERATING
if not self.reply_generator: raise RuntimeError("ReplyGenerator 未初始化") if not self.reply_generator: raise RuntimeError("ReplyGenerator 未初始化")
@ -308,87 +214,96 @@ class Conversation:
logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'") logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'")
if not generated_content or generated_content.startswith("抱歉"): if not generated_content or generated_content.startswith("抱歉"):
logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,取消发送。") logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,标记失败。")
final_reason = "生成内容无效" final_reason = "生成内容无效"; final_status = "recall"
if action == "say_goodbye": final_status = "done"; self.should_continue = False; logger.info(f"[私聊][{self.private_name}] 告别语生成失败,仍按计划结束对话。") conversation_info.last_successful_reply_action = None
else: final_status = "recall"; conversation_info.last_successful_reply_action = None
else: else:
# --- 发送回复逻辑 --- # --- [核心修复] 调用 ReplyChecker ---
self.generated_reply = generated_content self.state = ConversationState.CHECKING
timestamp_before_sending = time.time() if not self.reply_checker: raise RuntimeError("ReplyChecker 未初始化")
logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}")
self.state = ConversationState.SENDING
send_success = await self._send_reply()
send_end_time = time.time()
if send_success: current_goal_str = ""
action_successful = True # <--- 标记动作成功 if conversation_info.goal_list:
# final_status 和 final_reason 在 finally 块中根据 action_successful 设置 goal_item = conversation_info.goal_list[-1]
logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 成功发送回复.") if isinstance(goal_item, dict): current_goal_str = goal_item.get('goal', '')
if self.idle_conversation_starter: await self.idle_conversation_starter.update_last_message_time(send_end_time) elif isinstance(goal_item, str): current_goal_str = goal_item
chat_history_for_check = getattr(observation_info, 'chat_history', [])
# chat_history_str_for_check = getattr(observation_info, 'chat_history_str', '') # <--- 获取文本形式的历史记录
# --- 修正:直接使用 observation_info 中的 chat_history_str ---
chat_history_text_for_check = getattr(observation_info, 'chat_history_str', '')
retry_count = 0
# --- 清理已处理消息 --- logger.debug(f"[私聊][{self.private_name}] 调用 ReplyChecker 检查回复...")
current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) is_suitable, check_reason, need_replan_from_checker = await self.reply_checker.check(
message_ids_to_clear: Set[str] = set() reply=generated_content,
for msg in current_unprocessed_messages: goal=current_goal_str,
msg_time = msg.get('time'); msg_id = msg.get('message_id'); sender_id = msg.get("user_info", {}).get("user_id") chat_history=chat_history_for_check, # 传递列表形式的历史记录
if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: message_ids_to_clear.add(msg_id) chat_history_text=chat_history_text_for_check, # <--- 修正参数名
if message_ids_to_clear: logger.debug(f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}"); await observation_info.clear_processed_messages(message_ids_to_clear) retry_count=retry_count
else: logger.debug(f"[私聊][{self.private_name}] 没有需要清理的发送前(他人)消息。") )
logger.info(f"[私聊][{self.private_name}] ReplyChecker 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重规划={need_replan_from_checker}")
# --- 决定下一轮规划类型 --- if not is_suitable or need_replan_from_checker:
other_new_msg_count_during_planning = getattr(conversation_info, 'other_new_messages_during_planning_count', 0) conversation_info.last_reply_rejection_reason = check_reason
if other_new_msg_count_during_planning > 0: conversation_info.last_rejected_reply_content = generated_content
logger.info(f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。") final_reason = f"回复检查不通过: {check_reason}"; final_status = "recall"
conversation_info.last_successful_reply_action = None
else:
logger.info(f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。")
conversation_info.last_successful_reply_action = action
# 清除上次拒绝信息
conversation_info.last_reply_rejection_reason = None; conversation_info.last_rejected_reply_content = None
if action == "say_goodbye": self.should_continue = False; logger.info(f"[私聊][{self.private_name}] 成功发送告别语,即将停止对话实例。")
else:
# 发送失败
logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送回复失败。")
final_status = "recall"; final_reason = "发送回复时失败" # 发送失败直接设置状态
conversation_info.last_successful_reply_action = None conversation_info.last_successful_reply_action = None
if action == "say_goodbye": self.should_continue = True logger.warning(f"[私聊][{self.private_name}] 动作 '{action}' 因回复检查失败而被拒绝。")
else: # 检查通过
conversation_info.last_reply_rejection_reason = None; conversation_info.last_rejected_reply_content = None
# --- 发送回复逻辑 ---
self.generated_reply = generated_content; timestamp_before_sending = time.time()
logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 回复检查通过,记录发送前时间戳: {timestamp_before_sending:.2f}"); self.state = ConversationState.SENDING
send_success = await self._send_reply(); send_end_time = time.time()
if send_success:
action_successful = True; logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 成功发送回复。")
if self.idle_conversation_starter: await self.idle_conversation_starter.update_last_message_time(send_end_time)
current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []); message_ids_to_clear: Set[str] = set()
for msg in current_unprocessed_messages:
msg_time = msg.get('time'); msg_id = msg.get('message_id'); sender_id = msg.get("user_info", {}).get("user_id")
if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: message_ids_to_clear.add(msg_id)
if message_ids_to_clear: logger.debug(f"[私聊][{self.private_name}] 准备清理 {len(message_ids_to_clear)} 条发送前(他人)消息: {message_ids_to_clear}"); await observation_info.clear_processed_messages(message_ids_to_clear)
else: logger.debug(f"[私聊][{self.private_name}] 没有需要清理的发送前(他人)消息。")
other_new_msg_count_during_planning = getattr(conversation_info, 'other_new_messages_during_planning_count', 0)
if other_new_msg_count_during_planning > 0: logger.info(f"[私聊][{self.private_name}] 因规划期间收到 {other_new_msg_count_during_planning} 条他人新消息,下一轮强制使用【初始回复】逻辑。"); conversation_info.last_successful_reply_action = None
else: logger.info(f"[私聊][{self.private_name}] 规划期间无他人新消息,下一轮【允许】使用追问逻辑 (基于 '{action}')。"); conversation_info.last_successful_reply_action = action
else: # 发送失败
logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送回复失败。"); final_status = "recall"; final_reason = "发送回复时失败"; conversation_info.last_successful_reply_action = None
# --- 其他动作处理 --- elif action == "say_goodbye":
self.state = ConversationState.GENERATING
if not self.reply_generator: raise RuntimeError("ReplyGenerator 未初始化")
generated_content = await self.reply_generator.generate(observation_info, conversation_info, action_type=action); logger.info(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容: '{generated_content[:100]}...'")
if not generated_content or generated_content.startswith("抱歉"):
logger.warning(f"[私聊][{self.private_name}] 动作 '{action}': 生成内容为空或为错误提示,取消发送。"); final_reason = "生成内容无效"; final_status = "done"; self.should_continue = False; logger.info(f"[私聊][{self.private_name}] 告别语生成失败,仍按计划结束对话。")
else:
self.generated_reply = generated_content; timestamp_before_sending = time.time(); logger.debug(f"[私聊][{self.private_name}] 动作 '{action}': 记录发送前时间戳: {timestamp_before_sending:.2f}"); self.state = ConversationState.SENDING
send_success = await self._send_reply(); send_end_time = time.time()
if send_success:
action_successful = True; logger.info(f"[私聊][{self.private_name}] 成功发送告别语,即将停止对话实例。")
if self.idle_conversation_starter: await self.idle_conversation_starter.update_last_message_time(send_end_time)
current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []); message_ids_to_clear: Set[str] = set()
for msg in current_unprocessed_messages:
msg_time = msg.get('time'); msg_id = msg.get('message_id'); sender_id = msg.get("user_info", {}).get("user_id")
if msg_id and msg_time and sender_id != self.bot_qq_str and msg_time < timestamp_before_sending: message_ids_to_clear.add(msg_id)
if message_ids_to_clear: await observation_info.clear_processed_messages(message_ids_to_clear)
self.should_continue = False
else: logger.error(f"[私聊][{self.private_name}] 动作 '{action}': 发送告别语失败。"); final_status = "recall"; final_reason = "发送告别语失败"; self.should_continue = True
# --- 其他动作处理 (保持不变) ---
elif action == "rethink_goal": elif action == "rethink_goal":
self.state = ConversationState.RETHINKING self.state = ConversationState.RETHINKING;
if not self.goal_analyzer: raise RuntimeError("GoalAnalyzer 未初始化") if not self.goal_analyzer:
await self.goal_analyzer.analyze_goal(conversation_info, observation_info) raise RuntimeError("GoalAnalyzer 未初始化"); await self.goal_analyzer.analyze_goal(conversation_info, observation_info); action_successful = True
action_successful = True # <--- 标记动作成功
elif action == "listening": elif action == "listening":
self.state = ConversationState.LISTENING self.state = ConversationState.LISTENING;
if not self.waiter: raise RuntimeError("Waiter 未初始化") if not self.waiter: raise RuntimeError("Waiter 未初始化"); logger.info(f"[私聊][{self.private_name}] 动作 'listening': 进入倾听状态..."); await self.waiter.wait_listening(conversation_info); action_successful = True
logger.info(f"[私聊][{self.private_name}] 动作 'listening': 进入倾听状态...") elif action == "end_conversation": logger.info(f"[私聊][{self.private_name}] 动作 'end_conversation': 收到最终结束指令,停止对话..."); action_successful = True; self.should_continue = False
await self.waiter.wait_listening(conversation_info) elif action == "block_and_ignore": logger.info(f"[私聊][{self.private_name}] 动作 'block_and_ignore': 不想再理你了..."); ignore_duration_seconds = 10 * 60; self.ignore_until_timestamp = time.time() + ignore_duration_seconds; logger.info(f"[私聊][{self.private_name}] 将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}"); self.state = ConversationState.IGNORED; action_successful = True
action_successful = True # <--- 标记动作成功
elif action == "end_conversation":
logger.info(f"[私聊][{self.private_name}] 动作 'end_conversation': 收到最终结束指令,停止对话...")
action_successful = True # <--- 标记动作成功
self.should_continue = False
elif action == "block_and_ignore":
logger.info(f"[私聊][{self.private_name}] 动作 'block_and_ignore': 不想再理你了...")
ignore_duration_seconds = 10 * 60
self.ignore_until_timestamp = time.time() + ignore_duration_seconds
logger.info(f"[私聊][{self.private_name}] 将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}")
self.state = ConversationState.IGNORED
action_successful = True # <--- 标记动作成功
elif action == "wait": elif action == "wait":
self.state = ConversationState.WAITING self.state = ConversationState.WAITING;
if not self.waiter: raise RuntimeError("Waiter 未初始化") if not self.waiter: raise RuntimeError("Waiter 未初始化"); logger.info(f"[私聊][{self.private_name}] 动作 'wait': 进入等待状态..."); timeout_occurred = await self.waiter.wait(self.conversation_info); action_successful = True; logger.debug(f"[私聊][{self.private_name}] Wait 动作完成,无需在此清理消息。")
logger.info(f"[私聊][{self.private_name}] 动作 'wait': 进入等待状态...") else: logger.warning(f"[私聊][{self.private_name}] 未知的动作类型: {action}"); final_status = "recall"; final_reason = f"未知的动作类型: {action}"
timeout_occurred = await self.waiter.wait(self.conversation_info)
action_successful = True # <--- 标记动作成功
# wait 的 reason 在 finally 中设置
logger.debug(f"[私聊][{self.private_name}] Wait 动作完成,无需在此清理消息。")
else:
logger.warning(f"[私聊][{self.private_name}] 未知的动作类型: {action}")
final_status = "recall"; final_reason = f"未知的动作类型: {action}" # 未知动作直接失败
# --- 重置非回复动作的追问状态 --- # --- 重置非回复动作的追问状态 ---
if action not in ["direct_reply", "send_new_message", "say_goodbye"]: if action not in ["direct_reply", "send_new_message", "say_goodbye"]:
@ -405,68 +320,42 @@ class Conversation:
logger.error(f"[私聊][{self.private_name}] 处理动作 '{action}' 时出错: {handle_err}") logger.error(f"[私聊][{self.private_name}] 处理动作 '{action}' 时出错: {handle_err}")
logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}") logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}")
final_status = "error"; final_reason = f"处理动作时出错: {handle_err}" final_status = "error"; final_reason = f"处理动作时出错: {handle_err}"
self.state = ConversationState.ERROR self.state = ConversationState.ERROR # <--- 使用修正后的 ERROR 状态
conversation_info.last_successful_reply_action = None conversation_info.last_successful_reply_action = None
finally: finally:
# --- 重置临时计数值 --- # --- 重置临时计数值 ---
conversation_info.other_new_messages_during_planning_count = 0 conversation_info.other_new_messages_during_planning_count = 0
# --- 更新 Action History 状态 (优化) --- # --- 更新 Action History 状态 (优化) ---
# 如果状态仍然是默认的 recall但 action_successful 为 True则更新为 done
if final_status == "recall" and action_successful: if final_status == "recall" and action_successful:
final_status = "done" final_status = "done"
# 设置成功的 reason (可以根据动作类型细化) if action == "wait": timeout_occurred = any("分钟," in g.get("goal","") for g in conversation_info.goal_list if isinstance(g, dict)) if conversation_info.goal_list else False; final_reason = "等待完成" + (" (超时)" if timeout_occurred else " (收到新消息或中断)")
if action == "wait": elif action == "listening": final_reason = "进入倾听状态"
# 检查是否是因为超时结束的(需要 waiter 返回值,或者检查 goal_list elif action in ["rethink_goal", "end_conversation", "block_and_ignore"]: final_reason = f"成功执行 {action}"
timeout_occurred = any("分钟," in g.get("goal","") for g in conversation_info.goal_list if isinstance(g, dict)) if conversation_info.goal_list else False elif action in ["direct_reply", "send_new_message", "say_goodbye"]: final_reason = "成功发送"
final_reason = "等待完成" + (" (超时)" if timeout_occurred else " (收到新消息或中断)") else: final_reason = "动作成功完成"
elif action == "listening": elif final_status == "recall" and not action_successful and not final_reason.startswith("回复检查不通过"):
final_reason = "进入倾听状态" if not final_reason or final_reason == "动作未成功执行": final_reason = "动作执行失败或被取消"
elif action in ["rethink_goal", "end_conversation", "block_and_ignore"]:
final_reason = f"成功执行 {action}" # 通用成功原因
else: # 默认为发送成功
final_reason = "成功发送"
# 更新历史记录 # 更新历史记录
if conversation_info.done_action and action_index < len(conversation_info.done_action): if conversation_info.done_action and action_index < len(conversation_info.done_action):
conversation_info.done_action[action_index].update( conversation_info.done_action[action_index].update({"status": final_status, "time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "final_reason": final_reason, "duration_ms": int((time.time() - action_start_time) * 1000)})
{
"status": final_status,
"time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"final_reason": final_reason,
"duration_ms": int((time.time() - action_start_time) * 1000)
}
)
logger.debug(f"[私聊][{self.private_name}] 动作 '{action}' 最终状态: {final_status}, 原因: {final_reason}") logger.debug(f"[私聊][{self.private_name}] 动作 '{action}' 最终状态: {final_status}, 原因: {final_reason}")
else: else: logger.error(f"[私聊][{self.private_name}] 无法更新动作历史记录,索引 {action_index} 无效或列表为空。")
logger.error(f"[私聊][{self.private_name}] 无法更新动作历史记录,索引 {action_index} 无效或列表为空。")
async def _send_reply(self) -> bool: async def _send_reply(self) -> bool:
"""发送生成的回复""" """发送生成的回复 (保持不变)"""
# (代码同 v6_debug 版本)
if not self.generated_reply: logger.warning(f"[私聊][{self.private_name}] 没有生成回复内容,无法发送。"); return False if not self.generated_reply: logger.warning(f"[私聊][{self.private_name}] 没有生成回复内容,无法发送。"); return False
if not self.direct_sender: logger.error(f"[私聊][{self.private_name}] DirectMessageSender 未初始化,无法发送。"); return False if not self.direct_sender: logger.error(f"[私聊][{self.private_name}] DirectMessageSender 未初始化,无法发送。"); return False
if not self.chat_stream: logger.error(f"[私聊][{self.private_name}] ChatStream 未初始化,无法发送。"); return False if not self.chat_stream: logger.error(f"[私聊][{self.private_name}] ChatStream 未初始化,无法发送。"); return False
try: try: reply_content = self.generated_reply; await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content, reply_to_message=None); self.state = ConversationState.ANALYZING; return True
reply_content = self.generated_reply except Exception as e: logger.error(f"[私聊][{self.private_name}] 发送消息时失败: {str(e)}\n{traceback.format_exc()}"); self.state = ConversationState.ERROR; return False # 使用修正后的 ERROR 状态
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content, reply_to_message=None)
self.state = ConversationState.ANALYZING
return True
except Exception as e:
logger.error(f"[私聊][{self.private_name}] 发送消息时失败: {str(e)}\n{traceback.format_exc()}")
self.state = ConversationState.ERROR
return False
# _send_timeout_message 方法可以保持不变
async def _send_timeout_message(self): async def _send_timeout_message(self):
"""发送超时结束消息""" """发送超时结束消息"""
# (代码同 v6_debug 版本)
if not self.direct_sender or not self.chat_stream: logger.warning(f"[私聊][{self.private_name}] 发送器或聊天流未初始化,无法发送超时消息。"); return if not self.direct_sender or not self.chat_stream: logger.warning(f"[私聊][{self.private_name}] 发送器或聊天流未初始化,无法发送超时消息。"); return
try: try: timeout_content = "我们好像很久没说话了,先这样吧~"; await self.direct_sender.send_message(chat_stream=self.chat_stream, content=timeout_content, reply_to_message=None); logger.info(f"[私聊][{self.private_name}] 已发送超时结束消息。"); await self.stop()
timeout_content = "我们好像很久没说话了,先这样吧~"
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=timeout_content, reply_to_message=None)
logger.info(f"[私聊][{self.private_name}] 已发送超时结束消息。")
await self.stop()
except Exception as e: logger.error(f"[私聊][{self.private_name}] 发送超时消息失败: {str(e)}") except Exception as e: logger.error(f"[私聊][{self.private_name}] 发送超时消息失败: {str(e)}")

View File

@ -16,7 +16,7 @@ if TYPE_CHECKING:
from ..chat.message import Message from ..chat.message import Message
from .conversation import Conversation from .conversation import Conversation
logger = get_module_logger("idle_conversation") logger = get_module_logger("pfc_idle")
class IdleConversationStarter: class IdleConversationStarter:
"""长时间无对话主动发起对话的组件 """长时间无对话主动发起对话的组件

View File

@ -1,8 +1,10 @@
import time import time
import asyncio # 引入 asyncio
import traceback
from typing import Dict, Optional from typing import Dict, Optional
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
from .conversation import Conversation from .conversation import Conversation
import traceback
logger = get_module_logger("pfc_manager") logger = get_module_logger("pfc_manager")
@ -15,141 +17,169 @@ class PFCManager:
# 会话实例管理 # 会话实例管理
_instances: Dict[str, Conversation] = {} _instances: Dict[str, Conversation] = {}
_initializing: Dict[str, bool] = {} _initializing: Dict[str, bool] = {} # 用于防止并发初始化同一个 stream_id
@classmethod @classmethod
def get_instance(cls) -> "PFCManager": def get_instance(cls) -> "PFCManager":
"""获取管理器单例 """获取管理器单例"""
Returns:
PFCManager: 管理器实例
"""
if cls._instance is None: if cls._instance is None:
cls._instance = PFCManager() cls._instance = PFCManager()
return cls._instance return cls._instance
async def get_or_create_conversation(self, stream_id: str, private_name: str) -> Optional[Conversation]: async def get_or_create_conversation(self, stream_id: str, private_name: str) -> Optional[Conversation]:
"""获取或创建对话实例 """获取或创建对话实例,并确保其启动"""
Args: # 检查是否正在初始化 (防止并发问题)
stream_id: 聊天流ID if self._initializing.get(stream_id, False):
private_name: 私聊名称 logger.debug(f"[私聊][{private_name}] 会话实例正在初始化中,请稍候: {stream_id}")
# 可以选择等待一小段时间或直接返回 None
await asyncio.sleep(0.5) # 短暂等待,让初始化有机会完成
# 再次检查实例是否存在
if stream_id in self._instances and self._instances[stream_id]._initialized:
logger.debug(f"[私聊][{private_name}] 初始化已完成,返回现有实例: {stream_id}")
return self._instances[stream_id]
else:
logger.warning(f"[私聊][{private_name}] 等待后实例仍未初始化完成或不存在。")
return None # 避免返回未完成的实例
Returns: # 检查是否已有活动实例
Optional[Conversation]: 对话实例创建失败则返回None
"""
# 检查是否已经有实例
if stream_id in self._initializing and self._initializing[stream_id]:
logger.debug(f"[私聊][{private_name}]会话实例正在初始化中: {stream_id}")
return None
if stream_id in self._instances and self._instances[stream_id].should_continue:
logger.debug(f"[私聊][{private_name}]使用现有会话实例: {stream_id}")
return self._instances[stream_id]
if stream_id in self._instances: if stream_id in self._instances:
instance = self._instances[stream_id] instance = self._instances[stream_id]
if ( # 检查忽略状态
hasattr(instance, "ignore_until_timestamp") if (hasattr(instance, "ignore_until_timestamp") and
and instance.ignore_until_timestamp instance.ignore_until_timestamp and
and time.time() < instance.ignore_until_timestamp time.time() < instance.ignore_until_timestamp):
): logger.debug(f"[私聊][{private_name}] 会话实例当前处于忽略状态: {stream_id}")
logger.debug(f"[私聊][{private_name}]会话实例当前处于忽略状态: {stream_id}") return None # 处于忽略状态,不返回实例
# 返回 None 阻止交互。或者可以返回实例但标记它被忽略了喵?
# 还是返回 None 吧喵。
return None
# 检查 should_continue 状态 # 检查是否已初始化且应继续运行
if instance.should_continue: if instance._initialized and instance.should_continue:
logger.debug(f"[私聊][{private_name}]使用现有会话实例: {stream_id}") logger.debug(f"[私聊][{private_name}] 使用现有活动会话实例: {stream_id}")
return instance return instance
else: else:
# 清理旧实例资源 # 如果实例存在但未初始化或不应继续,清理旧实例
logger.warning(f"[私聊][{private_name}] 发现无效或已停止的旧实例,清理并重新创建: {stream_id}")
await self._cleanup_conversation(instance) await self._cleanup_conversation(instance)
del self._instances[stream_id] # 从字典中移除,确保下面能创建新的
if stream_id in self._instances: del self._instances[stream_id]
if stream_id in self._initializing: del self._initializing[stream_id]
# 创建新实例
# --- 创建并初始化新实例 ---
conversation_instance: Optional[Conversation] = None
try: try:
# 创建新实例 logger.info(f"[私聊][{private_name}] 创建新的对话实例: {stream_id}")
logger.info(f"[私聊][{private_name}]创建新的对话实例: {stream_id}") self._initializing[stream_id] = True # 标记开始初始化
self._initializing[stream_id] = True
# 创建实例 # 创建实例
conversation_instance = Conversation(stream_id, private_name) conversation_instance = Conversation(stream_id, private_name)
self._instances[stream_id] = conversation_instance self._instances[stream_id] = conversation_instance # 立即存入字典
# 启动实例初始化 # **启动实例初始化**
# _initialize_conversation 会调用 conversation._initialize()
await self._initialize_conversation(conversation_instance) await self._initialize_conversation(conversation_instance)
# --- 关键修复:在初始化成功后调用 start() ---
if conversation_instance._initialized and conversation_instance.should_continue:
logger.info(f"[私聊][{private_name}] 初始化成功,调用 conversation.start() 启动主循环...")
await conversation_instance.start() # 确保调用 start 方法
else:
# 如果 _initialize_conversation 内部初始化失败
logger.error(f"[私聊][{private_name}] 初始化未成功完成,无法启动实例 {stream_id}")
# 清理可能部分创建的实例
await self._cleanup_conversation(conversation_instance)
if stream_id in self._instances: del self._instances[stream_id]
conversation_instance = None # 返回 None 表示失败
except Exception as e: except Exception as e:
logger.error(f"[私聊][{private_name}]创建会话实例失败: {stream_id}, 错误: {e}") logger.error(f"[私聊][{private_name}] 创建或启动会话实例时发生严重错误: {stream_id}, 错误: {e}")
return None logger.error(traceback.format_exc())
# 确保清理
if conversation_instance:
await self._cleanup_conversation(conversation_instance)
if stream_id in self._instances: del self._instances[stream_id]
conversation_instance = None # 返回 None
finally:
# 确保初始化标记被清除
if stream_id in self._initializing:
self._initializing[stream_id] = False
return conversation_instance return conversation_instance
async def _initialize_conversation(self, conversation: Conversation): async def _initialize_conversation(self, conversation: Conversation):
"""初始化会话实例 """(内部方法) 初始化会话实例的核心逻辑"""
Args:
conversation: 要初始化的会话实例
"""
stream_id = conversation.stream_id stream_id = conversation.stream_id
private_name = conversation.private_name private_name = conversation.private_name
try: try:
logger.info(f"[私聊][{private_name}]开始初始化会话实例: {stream_id}") logger.info(f"[私聊][{private_name}] 管理器开始调用 conversation._initialize(): {stream_id}")
# 启动初始化流程 await conversation._initialize() # 调用实例自身的初始化方法
await conversation._initialize() # 注意:初始化成功与否由 conversation._initialized 和 conversation.should_continue 标志决定
if conversation._initialized:
# 标记初始化完成 logger.info(f"[私聊][{private_name}] conversation._initialize() 调用完成,实例标记为已初始化: {stream_id}")
self._initializing[stream_id] = False else:
logger.warning(f"[私聊][{private_name}] conversation._initialize() 调用完成,但实例未成功标记为已初始化: {stream_id}")
logger.info(f"[私聊][{private_name}]会话实例 {stream_id} 初始化完成")
except Exception as e: except Exception as e:
logger.error(f"[私聊][{private_name}]管理器初始化会话实例失败: {stream_id}, 错误: {e}") # _initialize 内部应该处理自己的异常,但这里也捕获以防万一
logger.error(f"[私聊][{private_name}]{traceback.format_exc()}") logger.error(f"[私聊][{private_name}] 调用 conversation._initialize() 时发生未捕获错误: {stream_id}, 错误: {e}")
# 清理失败的初始化 logger.error(traceback.format_exc())
# 确保实例状态反映失败
conversation._initialized = False
conversation.should_continue = False
async def _cleanup_conversation(self, conversation: Conversation): async def _cleanup_conversation(self, conversation: Conversation):
"""清理会话实例的资源 """清理会话实例的资源"""
if not conversation: return
Args: stream_id = conversation.stream_id
conversation: 要清理的会话实例 private_name = conversation.private_name
""" logger.info(f"[私聊][{private_name}] 开始清理会话实例资源: {stream_id}")
try: try:
# 调用conversation的停止方法确保所有组件都被正确关闭 # 调用 conversation 的 stop 方法来停止其内部组件
if hasattr(conversation, 'stop') and callable(conversation.stop): if hasattr(conversation, 'stop') and callable(conversation.stop):
await conversation.stop() await conversation.stop() # stop 方法应处理内部组件的停止
else:
logger.warning(f"[私聊][{private_name}] Conversation 对象缺少 stop 方法,可能无法完全清理资源。")
# 尝试手动停止已知组件 (作为后备)
if hasattr(conversation, 'idle_conversation_starter') and conversation.idle_conversation_starter:
conversation.idle_conversation_starter.stop()
if hasattr(conversation, 'observation_info') and conversation.observation_info:
conversation.observation_info.unbind_from_chat_observer()
# ChatObserver 是单例,不在此处停止
# 特别确保空闲对话检测器被关闭 logger.info(f"[私聊][{private_name}] 会话实例 {stream_id} 资源已清理")
if hasattr(conversation, 'idle_conversation_starter'):
conversation.idle_conversation_starter.stop()
logger.info(f"[私聊][{conversation.private_name}]会话实例 {conversation.stream_id} 资源已清理")
except Exception as e: except Exception as e:
logger.error(f"[私聊][{conversation.private_name}]清理会话实例资源失败: {e}") logger.error(f"[私聊][{private_name}] 清理会话实例资源时失败: {stream_id}, 错误: {e}")
logger.error(traceback.format_exc())
async def get_conversation(self, stream_id: str) -> Optional[Conversation]: async def get_conversation(self, stream_id: str) -> Optional[Conversation]:
"""获取已存在的会话实例 """获取已存在的会话实例 (只读)"""
instance = self._instances.get(stream_id)
Args: if instance and instance._initialized and instance.should_continue:
stream_id: 聊天流ID # 检查忽略状态
if (hasattr(instance, "ignore_until_timestamp") and
Returns: instance.ignore_until_timestamp and
Optional[Conversation]: 会话实例不存在则返回None time.time() < instance.ignore_until_timestamp):
""" return None # 忽略期间不返回
return self._instances.get(stream_id) return instance
return None # 不存在或无效则返回 None
async def remove_conversation(self, stream_id: str): async def remove_conversation(self, stream_id: str):
"""移除会话实例 """移除并清理会话实例"""
Args:
stream_id: 聊天流ID
"""
if stream_id in self._instances: if stream_id in self._instances:
instance_to_remove = self._instances[stream_id]
logger.info(f"[管理器] 准备移除并清理会话实例: {stream_id}")
try: try:
# 清理资源 # 先从字典中移除引用,防止新的请求获取到正在清理的实例
await self._cleanup_conversation(self._instances[stream_id])
# 删除实例引用
del self._instances[stream_id] del self._instances[stream_id]
logger.info(f"会话实例 {stream_id} 已从管理器中移除") if stream_id in self._initializing: del self._initializing[stream_id]
# 清理资源
await self._cleanup_conversation(instance_to_remove)
logger.info(f"[管理器] 会话实例 {stream_id} 已成功移除并清理")
except Exception as e: except Exception as e:
logger.error(f"移除会话实例 {stream_id} 失败: {e}") logger.error(f"[管理器] 移除或清理会话实例 {stream_id} 时失败: {e}")
logger.error(traceback.format_exc())
else:
logger.warning(f"[管理器] 尝试移除不存在的会话实例: {stream_id}")

View File

@ -18,6 +18,8 @@ class ConversationState(Enum):
ENDED = "结束" ENDED = "结束"
JUDGING = "判断" JUDGING = "判断"
IGNORED = "屏蔽" IGNORED = "屏蔽"
ERROR = "错误" # <--- 添加 ERROR 状态
ActionType = Literal["direct_reply", "fetch_knowledge", "wait"] ActionType = Literal["direct_reply", "fetch_knowledge", "wait"]