From 58ceb3b91104e4714756005b536c68cbb15ca93f Mon Sep 17 00:00:00 2001 From: infinitycat Date: Thu, 1 May 2025 00:55:22 +0800 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9Elpmm=E7=9A=84Linu?= =?UTF-8?q?x=E5=BF=AB=E6=8D=B7=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/run_lpmm.sh | 51 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 scripts/run_lpmm.sh diff --git a/scripts/run_lpmm.sh b/scripts/run_lpmm.sh new file mode 100644 index 00000000..23436388 --- /dev/null +++ b/scripts/run_lpmm.sh @@ -0,0 +1,51 @@ +#!/bin/bash + +# ============================================== +# Environment Initialization +# ============================================== + +# Step 1: Locate project root directory +SCRIPTS_DIR="scripts" +SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd) +PROJECT_ROOT=$(cd "$SCRIPT_DIR/.." && pwd) + +# Step 2: Verify scripts directory exists +if [ ! -d "$PROJECT_ROOT/$SCRIPTS_DIR" ]; then + echo "❌ Error: scripts directory not found in project root" >&2 + echo "Current path: $PROJECT_ROOT" >&2 + exit 1 +fi + +# Step 3: Set up Python environment +export PYTHONPATH="$PROJECT_ROOT:$PYTHONPATH" +cd "$PROJECT_ROOT" || { + echo "❌ Failed to cd to project root: $PROJECT_ROOT" >&2 + exit 1 +} + +# Debug info +echo "============================" +echo "Project Root: $PROJECT_ROOT" +echo "Python Path: $PYTHONPATH" +echo "Working Dir: $(pwd)" +echo "============================" + +# ============================================== +# Python Script Execution +# ============================================== + +run_python_script() { + local script_name=$1 + echo "🔄 Running $script_name" + if ! python3 "scripts/$script_name"; then + echo "❌ $script_name failed" >&2 + exit 1 + fi +} + +# Execute scripts in order +run_python_script "raw_data_preprocessor.py" +run_python_script "info_extraction.py" +run_python_script "import_openie.py" + +echo "✅ All scripts completed successfully" \ No newline at end of file From c23a82af70328d8008e7d8f7f6086ca34094df70 Mon Sep 17 00:00:00 2001 From: infinitycat <103594839+infinitycat233@users.noreply.github.com> Date: Thu, 1 May 2025 01:14:35 +0800 Subject: [PATCH 2/4] Update scripts/run_lpmm.sh Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> --- scripts/run_lpmm.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/run_lpmm.sh b/scripts/run_lpmm.sh index 23436388..f3f54610 100644 --- a/scripts/run_lpmm.sh +++ b/scripts/run_lpmm.sh @@ -37,7 +37,7 @@ echo "============================" run_python_script() { local script_name=$1 echo "🔄 Running $script_name" - if ! python3 "scripts/$script_name"; then + if ! python3 "$SCRIPTS_DIR/$script_name"; then echo "❌ $script_name failed" >&2 exit 1 fi From 440f474599c79047eae8d1c8bb9696213005ad9c Mon Sep 17 00:00:00 2001 From: lokong <15350495082@163.com> Date: Mon, 5 May 2025 22:02:20 +0800 Subject: [PATCH 3/4] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E9=97=B2?= =?UTF-8?q?=E7=BD=AE=E8=81=8A=E5=A4=A9=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/PFC/conversation.py | 38 ++ src/plugins/PFC/idle_conversation_starter.py | 351 +++++++++++++++++++ src/plugins/PFC/pfc_manager.py | 43 ++- template/bot_config_template.toml | 6 + 4 files changed, 437 insertions(+), 1 deletion(-) create mode 100644 src/plugins/PFC/idle_conversation_starter.py diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 9f744c30..413eb81b 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -10,6 +10,7 @@ from typing import Dict, Any, Optional from ..chat.message import Message from .pfc_types import ConversationState from .pfc import ChatObserver, GoalAnalyzer +from .idle_conversation_starter import IdleConversationStarter from .message_sender import DirectMessageSender from src.common.logger_manager import get_logger from .action_planner import ActionPlanner @@ -55,6 +56,7 @@ class Conversation: self.knowledge_fetcher = KnowledgeFetcher(self.private_name) self.waiter = Waiter(self.stream_id, self.private_name) self.direct_sender = DirectMessageSender(self.private_name) + self.idle_conversation_starter = IdleConversationStarter(self.stream_id, self.private_name) # 获取聊天流信息 self.chat_stream = chat_manager.get_stream(self.stream_id) @@ -113,6 +115,9 @@ class Conversation: # 让 ChatObserver 从加载的最后一条消息之后开始同步 self.chat_observer.last_message_time = self.observation_info.last_message_time self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录 + + # 初始化空闲对话检测器的最后消息时间 + await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time) else: logger.info(f"[私聊][{self.private_name}]没有找到初始聊天记录。") @@ -121,6 +126,11 @@ class Conversation: # 出错也要继续,只是没有历史记录而已 # 组件准备完成,启动该论对话 self.should_continue = True + + # 启动空闲对话检测器 + self.idle_conversation_starter.start() + logger.info(f"[私聊][{self.private_name}]空闲对话检测器已启动") + asyncio.create_task(self.start()) async def start(self): @@ -137,6 +147,10 @@ class Conversation: while self.should_continue: # 忽略逻辑 if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp: + # 暂停空闲对话检测器,避免在忽略期间触发 + if hasattr(self, 'idle_conversation_starter') and self.idle_conversation_starter._running: + self.idle_conversation_starter.stop() + logger.debug(f"[私聊][{self.private_name}]对话被暂时忽略,暂停空闲对话检测") await asyncio.sleep(30) continue elif self.ignore_until_timestamp and time.time() >= self.ignore_until_timestamp: @@ -144,6 +158,12 @@ class Conversation: self.ignore_until_timestamp = None self.should_continue = False continue + else: + # 确保空闲对话检测器在正常对话时是启动的 + if hasattr(self, 'idle_conversation_starter') and not self.idle_conversation_starter._running: + self.idle_conversation_starter.start() + logger.debug(f"[私聊][{self.private_name}]恢复空闲对话检测") + try: # --- 在规划前记录当前新消息数量 --- initial_new_message_count = 0 @@ -368,6 +388,9 @@ class Conversation: self.conversation_info.last_successful_reply_action = "send_new_message" action_successful = True # 标记动作成功 + # 更新最后消息时间,重置空闲检测计时器 + await self.idle_conversation_starter.update_last_message_time() + elif need_replan: # 打回动作决策 logger.warning( @@ -472,6 +495,9 @@ class Conversation: self.conversation_info.last_successful_reply_action = "direct_reply" action_successful = True # 标记动作成功 + # 更新最后消息时间,重置空闲检测计时器 + await self.idle_conversation_starter.update_last_message_time() + elif need_replan: # 打回动作决策 logger.warning( @@ -696,3 +722,15 @@ class Conversation: ) except Exception as e: logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}") + + async def stop(self): + """停止对话处理""" + logger.info(f"[私聊][{self.private_name}]停止对话 {self.stream_id}") + self.should_continue = False + + # 停止空闲对话检测器 + if hasattr(self, 'idle_conversation_starter'): + self.idle_conversation_starter.stop() + + if hasattr(self, 'chat_observer'): + self.chat_observer.stop() diff --git a/src/plugins/PFC/idle_conversation_starter.py b/src/plugins/PFC/idle_conversation_starter.py new file mode 100644 index 00000000..d79319c6 --- /dev/null +++ b/src/plugins/PFC/idle_conversation_starter.py @@ -0,0 +1,351 @@ +from typing import List, Tuple, TYPE_CHECKING, Optional, Union +import asyncio +import time +import random +from src.common.logger import get_module_logger +from ..models.utils_model import LLMRequest +from ...config.config import global_config +from .chat_observer import ChatObserver +from .message_sender import DirectMessageSender +from ..chat.chat_stream import ChatStream +from maim_message import UserInfo +from src.individuality.individuality import Individuality +from src.plugins.utils.chat_message_builder import build_readable_messages + +if TYPE_CHECKING: + from ..chat.message import Message + from .conversation import Conversation + +logger = get_module_logger("pfc") + +class IdleConversationStarter: + """长时间无对话主动发起对话的组件 + + 该组件会在一段时间没有对话后,自动生成一条消息发送给用户,以保持对话的活跃度。 + 时间阈值会在配置的最小和最大值之间随机选择,每次发送消息后都会重置。 + """ + + def __init__(self, stream_id: str, private_name: str): + """初始化空闲对话启动器 + + Args: + stream_id: 聊天流ID + private_name: 私聊用户名称 + """ + self.stream_id: str = stream_id + self.private_name: str = private_name + self.chat_observer = ChatObserver.get_instance(stream_id, private_name) + self.message_sender = DirectMessageSender(private_name) + + # 添加异步锁,保护对共享变量的访问 + self._lock: asyncio.Lock = asyncio.Lock() + + # LLM请求对象,用于生成主动对话内容 + self.llm = LLMRequest( + model=global_config.llm_normal, temperature=0.8, max_tokens=500, request_type="idle_conversation_starter" + ) + + # 个性化信息 + self.personality_info: str = Individuality.get_instance().get_prompt(x_person=2, level=3) + self.name: str = global_config.BOT_NICKNAME + self.nick_name: List[str] = global_config.BOT_ALIAS_NAMES + + # 从配置文件读取配置参数,或使用默认值 + self.enabled: bool = getattr(global_config, 'idle_conversation', {}).get('enable_idle_conversation', True) + self.idle_check_interval: int = getattr(global_config, 'idle_conversation', {}).get('idle_check_interval', 10) + self.min_idle_time: int = getattr(global_config, 'idle_conversation', {}).get('min_idle_time', 60) + self.max_idle_time: int = getattr(global_config, 'idle_conversation', {}).get('max_idle_time', 120) + + # 计算实际触发阈值(在min和max之间随机) + self.actual_idle_threshold: int = random.randint(self.min_idle_time, self.max_idle_time) + + # 工作状态 + self.last_message_time: float = time.time() + self._running: bool = False + self._task: Optional[asyncio.Task] = None + + def start(self) -> None: + """启动空闲对话检测 + + 如果功能被禁用或已经在运行,则不会启动。 + """ + # 如果功能被禁用,则不启动 + if not self.enabled: + logger.info(f"[私聊][{self.private_name}]主动发起对话功能已禁用") + return + + if self._running: + logger.debug(f"[私聊][{self.private_name}]主动发起对话功能已在运行中") + return + + self._running = True + self._task = asyncio.create_task(self._check_idle_loop()) + logger.info(f"[私聊][{self.private_name}]启动空闲对话检测,阈值设置为{self.actual_idle_threshold}秒") + + def stop(self) -> None: + """停止空闲对话检测 + + 取消当前运行的任务并重置状态。 + """ + if not self._running: + return + + self._running = False + if self._task: + self._task.cancel() + self._task = None + logger.info(f"[私聊][{self.private_name}]停止空闲对话检测") + + async def update_last_message_time(self, message_time: Optional[float] = None) -> None: + """更新最后一条消息的时间 + + Args: + message_time: 消息时间戳,如果为None则使用当前时间 + """ + async with self._lock: + self.last_message_time = message_time or time.time() + # 重新随机化下一次触发的时间阈值 + self.actual_idle_threshold = random.randint(self.min_idle_time, self.max_idle_time) + logger.debug(f"[私聊][{self.private_name}]更新最后消息时间: {self.last_message_time},新阈值: {self.actual_idle_threshold}秒") + + def reload_config(self) -> None: + """重新加载配置 + + 从配置文件重新读取所有参数,以便动态调整空闲对话检测的行为。 + """ + try: + # 从配置文件重新读取参数 + self.enabled = getattr(global_config, 'idle_conversation', {}).get('enable_idle_conversation', True) + self.idle_check_interval = getattr(global_config, 'idle_conversation', {}).get('idle_check_interval', 10) + self.min_idle_time = getattr(global_config, 'idle_conversation', {}).get('min_idle_time', 7200) + self.max_idle_time = getattr(global_config, 'idle_conversation', {}).get('max_idle_time', 18000) + + logger.debug(f"[私聊][{self.private_name}]重新加载主动对话配置: 启用={self.enabled}, 检查间隔={self.idle_check_interval}秒, 最短间隔={self.min_idle_time}秒, 最长间隔={self.max_idle_time}秒") + + # 重新计算实际阈值 + async def update_threshold(): + async with self._lock: + self.actual_idle_threshold = random.randint(self.min_idle_time, self.max_idle_time) + logger.debug(f"[私聊][{self.private_name}]更新空闲检测阈值为: {self.actual_idle_threshold}秒") + + # 创建一个任务来异步更新阈值 + asyncio.create_task(update_threshold()) + + except Exception as e: + logger.error(f"[私聊][{self.private_name}]重新加载配置时出错: {str(e)}") + + async def _check_idle_loop(self) -> None: + """检查空闲状态的循环 + + 定期检查是否长时间无对话,如果达到阈值则尝试主动发起对话。 + """ + try: + config_reload_counter = 0 + config_reload_interval = 100 # 每100次检查重新加载一次配置 + + while self._running: + # 定期重新加载配置 + config_reload_counter += 1 + if config_reload_counter >= config_reload_interval: + self.reload_config() + config_reload_counter = 0 + + # 检查是否启用了主动对话功能 + if not self.enabled: + # 如果禁用了功能,就等待一段时间后再次检查配置 + await asyncio.sleep(self.idle_check_interval) + continue + + # 使用锁保护对共享变量的读取 + current_time = time.time() + async with self._lock: + idle_time = current_time - self.last_message_time + threshold = self.actual_idle_threshold + + if idle_time >= threshold: + logger.info(f"[私聊][{self.private_name}]检测到长时间({idle_time:.0f}秒)无对话,尝试主动发起聊天") + await self._initiate_conversation() + # 更新时间,避免连续触发 + await self.update_last_message_time() + + # 等待下一次检查 + await asyncio.sleep(self.idle_check_interval) + + except asyncio.CancelledError: + logger.debug(f"[私聊][{self.private_name}]空闲对话检测任务被取消") + except Exception as e: + logger.error(f"[私聊][{self.private_name}]空闲对话检测出错: {str(e)}") + # 尝试重新启动检测循环 + if self._running: + logger.info(f"[私聊][{self.private_name}]尝试重新启动空闲对话检测") + self._task = asyncio.create_task(self._check_idle_loop()) + + async def _initiate_conversation(self) -> None: + """生成并发送主动对话内容 + + 获取聊天历史记录,使用LLM生成合适的开场白,然后发送消息。 + """ + try: + # 获取聊天历史记录,用于生成更合适的开场白 + messages = self.chat_observer.get_cached_messages(limit=12) # 获取最近12条消息 + chat_history_text = await build_readable_messages( + messages, + replace_bot_name=True, + merge_messages=False, + timestamp_mode="relative", + read_mark=0.0, + ) + + # 构建提示词 + prompt = f"""{self.personality_info}。你的名字是{self.name}。 + 你正在与用户{self.private_name}进行QQ私聊, + 但已经有一段时间没有对话了。 + 你想要主动发起一个友好的对话,可以说说自己在做的事情或者询问对方在做什么。 + 请基于以下之前的对话历史,生成一条自然、友好、符合你个性的主动对话消息。 + 这条消息应该能够引起用户的兴趣,重新开始对话。 + 最近的对话历史(可能已经过去了很久): + {chat_history_text} + 请直接输出一条消息,不要有任何额外的解释或引导文字。消息要简短自然,就像是在日常聊天中的开场白。 + 消息内容尽量简短,不要超过20个字,不要添加任何表情符号。 + """ + + # 尝试生成回复,添加超时处理 + try: + content, _ = await asyncio.wait_for( + self.llm.generate_response_async(prompt), + timeout=30 # 30秒超时 + ) + except asyncio.TimeoutError: + logger.error(f"[私聊][{self.private_name}]生成主动对话内容超时") + return + except Exception as llm_err: + logger.error(f"[私聊][{self.private_name}]生成主动对话内容失败: {str(llm_err)}") + return + + # 清理结果 + content = content.strip() + content = content.strip('"\'') + + if not content: + logger.error(f"[私聊][{self.private_name}]生成的主动对话内容为空") + return + + # 统一错误处理,从这里开始所有操作都在同一个try-except块中 + logger.debug(f"[私聊][{self.private_name}]成功生成主动对话内容: {content},准备发送") + + from .pfc_manager import PFCManager + from src.plugins.chat.chat_stream import chat_manager + + # 获取当前实例 + pfc_manager = PFCManager.get_instance() + + # 结束当前对话实例(如果存在) + current_conversation = await pfc_manager.get_conversation(self.stream_id) + if current_conversation: + logger.info(f"[私聊][{self.private_name}]结束当前对话实例,准备创建新实例") + try: + await current_conversation.stop() + await pfc_manager.remove_conversation(self.stream_id) + except Exception as e: + logger.warning(f"[私聊][{self.private_name}]结束当前对话实例时出错: {str(e)},继续创建新实例") + + # 创建新的对话实例 + logger.info(f"[私聊][{self.private_name}]创建新的对话实例以发送主动消息") + new_conversation = None + try: + new_conversation = await pfc_manager.get_or_create_conversation(self.stream_id, self.private_name) + except Exception as e: + logger.error(f"[私聊][{self.private_name}]创建新对话实例失败: {str(e)}") + return + + # 确保新对话实例已初始化完成 + chat_stream = await self._get_chat_stream(new_conversation) + if not chat_stream: + logger.error(f"[私聊][{self.private_name}]无法获取有效的聊天流,取消发送主动消息") + return + + # 发送消息 + try: + await self.message_sender.send_message( + chat_stream=chat_stream, + content=content, + reply_to_message=None + ) + + # 更新空闲会话启动器的最后消息时间 + await self.update_last_message_time() + + # 如果新对话实例有一个聊天观察者,请触发更新 + if new_conversation and hasattr(new_conversation, 'chat_observer'): + logger.info(f"[私聊][{self.private_name}]触发聊天观察者更新") + try: + new_conversation.chat_observer.trigger_update() + except Exception as e: + logger.warning(f"[私聊][{self.private_name}]触发聊天观察者更新失败: {str(e)}") + + logger.success(f"[私聊][{self.private_name}]成功主动发起对话: {content}") + except Exception as e: + logger.error(f"[私聊][{self.private_name}]发送主动对话消息失败: {str(e)}") + + except Exception as e: + # 顶级异常处理,确保任何未捕获的异常都不会导致整个进程崩溃 + logger.error(f"[私聊][{self.private_name}]主动发起对话过程中发生未预期的错误: {str(e)}") + + async def _get_chat_stream(self, conversation: Optional['Conversation'] = None) -> Optional[ChatStream]: + """获取可用的聊天流 + + 尝试多种方式获取聊天流: + 1. 从传入的对话实例中获取 + 2. 从全局聊天管理器中获取 + 3. 创建一个新的聊天流 + + Args: + conversation: 对话实例,可以为None + + Returns: + Optional[ChatStream]: 如果成功获取则返回聊天流,否则返回None + """ + chat_stream = None + + # 1. 尝试从对话实例获取 + if conversation and hasattr(conversation, 'should_continue'): + # 等待一小段时间,确保初始化完成 + retry_count = 0 + max_retries = 10 + while not conversation.should_continue and retry_count < max_retries: + await asyncio.sleep(0.5) + retry_count += 1 + logger.debug(f"[私聊][{self.private_name}]等待新对话实例初始化完成: 尝试 {retry_count}/{max_retries}") + + if not conversation.should_continue: + logger.warning(f"[私聊][{self.private_name}]新对话实例初始化可能未完成,但仍将尝试获取聊天流") + + # 尝试使用对话实例的聊天流 + if hasattr(conversation, 'chat_stream') and conversation.chat_stream: + logger.info(f"[私聊][{self.private_name}]使用新对话实例的聊天流") + return conversation.chat_stream + + # 2. 尝试从聊天管理器获取 + from src.plugins.chat.chat_stream import chat_manager + try: + logger.info(f"[私聊][{self.private_name}]尝试从chat_manager获取聊天流") + chat_stream = chat_manager.get_stream(self.stream_id) + if chat_stream: + return chat_stream + except Exception as e: + logger.warning(f"[私聊][{self.private_name}]从chat_manager获取聊天流失败: {str(e)}") + + # 3. 创建新的聊天流 + try: + logger.warning(f"[私聊][{self.private_name}]无法获取现有聊天流,创建新的聊天流") + # 创建用户信息对象 + user_info = UserInfo( + user_id=global_config.BOT_QQ, + user_nickname=global_config.BOT_NICKNAME, + platform="qq" + ) + # 创建聊天流 + return ChatStream(self.stream_id, "qq", user_info) + except Exception as e: + logger.error(f"[私聊][{self.private_name}]创建新聊天流失败: {str(e)}") + return None \ No newline at end of file diff --git a/src/plugins/PFC/pfc_manager.py b/src/plugins/PFC/pfc_manager.py index 621686a9..aa8aea49 100644 --- a/src/plugins/PFC/pfc_manager.py +++ b/src/plugins/PFC/pfc_manager.py @@ -33,6 +33,7 @@ class PFCManager: Args: stream_id: 聊天流ID + private_name: 私聊名称 Returns: Optional[Conversation]: 对话实例,创建失败则返回None @@ -61,7 +62,12 @@ class PFCManager: if instance.should_continue: logger.debug(f"[私聊][{private_name}]使用现有会话实例: {stream_id}") return instance - # else: 实例存在但不应继续 + else: + # 清理旧实例资源 + await self._cleanup_conversation(instance) + del self._instances[stream_id] + + # 创建新实例 try: # 创建新实例 logger.info(f"[私聊][{private_name}]创建新的对话实例: {stream_id}") @@ -102,6 +108,25 @@ class PFCManager: logger.error(f"[私聊][{private_name}]{traceback.format_exc()}") # 清理失败的初始化 + async def _cleanup_conversation(self, conversation: Conversation): + """清理会话实例的资源 + + Args: + conversation: 要清理的会话实例 + """ + try: + # 调用conversation的停止方法,确保所有组件都被正确关闭 + if hasattr(conversation, 'stop') and callable(conversation.stop): + await conversation.stop() + + # 特别确保空闲对话检测器被关闭 + if hasattr(conversation, 'idle_conversation_starter'): + conversation.idle_conversation_starter.stop() + + logger.info(f"[私聊][{conversation.private_name}]会话实例 {conversation.stream_id} 资源已清理") + except Exception as e: + logger.error(f"[私聊][{conversation.private_name}]清理会话实例资源失败: {e}") + async def get_conversation(self, stream_id: str) -> Optional[Conversation]: """获取已存在的会话实例 @@ -112,3 +137,19 @@ class PFCManager: Optional[Conversation]: 会话实例,不存在则返回None """ return self._instances.get(stream_id) + + async def remove_conversation(self, stream_id: str): + """移除会话实例 + + Args: + stream_id: 聊天流ID + """ + if stream_id in self._instances: + try: + # 清理资源 + await self._cleanup_conversation(self._instances[stream_id]) + # 删除实例引用 + del self._instances[stream_id] + logger.info(f"会话实例 {stream_id} 已从管理器中移除") + except Exception as e: + logger.error(f"移除会话实例 {stream_id} 失败: {e}") diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index c924d35a..89b685a0 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -290,6 +290,12 @@ provider = "SILICONFLOW" pri_in = 2 pri_out = 8 +[idle_conversation] +enable_idle_conversation = true +idle_check_interval = 10 # 检查间隔,10分钟检查一次 +min_idle_time = 7200 # 最短无活动时间,2小时 (7200秒) +max_idle_time = 18000 # 最长无活动时间,5小时 (18000秒) + #以下模型暂时没有使用!! #以下模型暂时没有使用!! From 73f2148fca72ecbd161af3dd81fbb42ddf210371 Mon Sep 17 00:00:00 2001 From: Plutor-05 <145013457+Plutor-05@users.noreply.github.com> Date: Mon, 5 May 2025 23:11:23 +0800 Subject: [PATCH 4/4] Update conversation.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 尝试修改了合并冲突问题 --- src/plugins/PFC/conversation.py | 661 ++++++++++++++------------------ 1 file changed, 293 insertions(+), 368 deletions(-) diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 413eb81b..98c427d4 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -1,29 +1,31 @@ +# -*- coding: utf-8 -*- +# File: conversation.py import time import asyncio import datetime -# from .message_storage import MongoDBMessageStorage from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat - -# from ...config.config import global_config -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, Set # <-- 添加 Set 类型提示 from ..chat.message import Message from .pfc_types import ConversationState from .pfc import ChatObserver, GoalAnalyzer -from .idle_conversation_starter import IdleConversationStarter from .message_sender import DirectMessageSender from src.common.logger_manager import get_logger from .action_planner import ActionPlanner from .observation_info import ObservationInfo -from .conversation_info import ConversationInfo # 确保导入 ConversationInfo +from .conversation_info import ConversationInfo from .reply_generator import ReplyGenerator from ..chat.chat_stream import ChatStream +from .idle_conversation_starter import IdleConversationStarter from maim_message import UserInfo from src.plugins.chat.chat_stream import chat_manager from .pfc_KnowledgeFetcher import KnowledgeFetcher from .waiter import Waiter import traceback +from rich.traceback import install + +install(extra_lines=3) logger = get_logger("pfc") @@ -32,23 +34,16 @@ class Conversation: """对话类,负责管理单个对话的状态和行为""" def __init__(self, stream_id: str, private_name: str): - """初始化对话实例 - - Args: - stream_id: 聊天流ID - """ + """初始化对话实例""" self.stream_id = stream_id self.private_name = private_name self.state = ConversationState.INIT self.should_continue = False self.ignore_until_timestamp: Optional[float] = None - - # 回复相关 self.generated_reply = "" async def _initialize(self): - """初始化实例,注册所有组件""" - + """初始化实例,注册所有组件 (保持不变)""" try: self.action_planner = ActionPlanner(self.stream_id, self.private_name) self.goal_analyzer = GoalAnalyzer(self.stream_id, self.private_name) @@ -56,11 +51,8 @@ class Conversation: self.knowledge_fetcher = KnowledgeFetcher(self.private_name) self.waiter = Waiter(self.stream_id, self.private_name) self.direct_sender = DirectMessageSender(self.private_name) - self.idle_conversation_starter = IdleConversationStarter(self.stream_id, self.private_name) - - # 获取聊天流信息 self.chat_stream = chat_manager.get_stream(self.stream_id) - + self.idle_conversation_starter = IdleConversationStarter(self.stream_id, self.private_name) self.stop_action_planner = False except Exception as e: logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册运行组件失败: {e}") @@ -68,14 +60,10 @@ class Conversation: raise try: - # 决策所需要的信息,包括自身自信和观察信息两部分 - # 注册观察器和观测信息 self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name) self.chat_observer.start() self.observation_info = ObservationInfo(self.private_name) self.observation_info.bind_to_chat_observer(self.chat_observer) - # print(self.chat_observer.get_cached_messages(limit=) - self.conversation_info = ConversationInfo() except Exception as e: logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册信息组件失败: {e}") @@ -83,10 +71,10 @@ class Conversation: raise try: logger.info(f"[私聊][{self.private_name}]为 {self.stream_id} 加载初始聊天记录...") - initial_messages = get_raw_msg_before_timestamp_with_chat( # + initial_messages = get_raw_msg_before_timestamp_with_chat( chat_id=self.stream_id, timestamp=time.time(), - limit=30, # 加载最近30条作为初始上下文,可以调整 + limit=30, ) chat_talking_prompt = await build_readable_messages( initial_messages, @@ -96,45 +84,35 @@ class Conversation: read_mark=0.0, ) if initial_messages: - # 将加载的消息填充到 ObservationInfo 的 chat_history self.observation_info.chat_history = initial_messages self.observation_info.chat_history_str = chat_talking_prompt + "\n" self.observation_info.chat_history_count = len(initial_messages) - - # 更新 ObservationInfo 中的时间戳等信息 last_msg = initial_messages[-1] self.observation_info.last_message_time = last_msg.get("time") last_user_info = UserInfo.from_dict(last_msg.get("user_info", {})) self.observation_info.last_message_sender = last_user_info.user_id self.observation_info.last_message_content = last_msg.get("processed_plain_text", "") - logger.info( f"[私聊][{self.private_name}]成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}" ) - - # 让 ChatObserver 从加载的最后一条消息之后开始同步 + # --- 注意: 下面两行保持不变,但其健壮性依赖于 ChatObserver 的实现 --- self.chat_observer.last_message_time = self.observation_info.last_message_time - self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录 - - # 初始化空闲对话检测器的最后消息时间 + self.chat_observer.last_message_read = last_msg await self.idle_conversation_starter.update_last_message_time(self.observation_info.last_message_time) else: logger.info(f"[私聊][{self.private_name}]没有找到初始聊天记录。") except Exception as load_err: logger.error(f"[私聊][{self.private_name}]加载初始聊天记录时出错: {load_err}") - # 出错也要继续,只是没有历史记录而已 - # 组件准备完成,启动该论对话 - self.should_continue = True + self.should_continue = True + asyncio.create_task(self.start()) # 启动空闲对话检测器 self.idle_conversation_starter.start() logger.info(f"[私聊][{self.private_name}]空闲对话检测器已启动") - asyncio.create_task(self.start()) - async def start(self): - """开始对话流程""" + """开始对话流程 (保持不变)""" try: logger.info(f"[私聊][{self.private_name}]对话系统启动中...") asyncio.create_task(self._plan_and_action_loop()) @@ -145,7 +123,7 @@ class Conversation: async def _plan_and_action_loop(self): """思考步,PFC核心循环模块""" while self.should_continue: - # 忽略逻辑 + # 忽略逻辑 (保持不变) if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp: # 暂停空闲对话检测器,避免在忽略期间触发 if hasattr(self, 'idle_conversation_starter') and self.idle_conversation_starter._running: @@ -165,66 +143,63 @@ class Conversation: logger.debug(f"[私聊][{self.private_name}]恢复空闲对话检测") try: - # --- 在规划前记录当前新消息数量 --- - initial_new_message_count = 0 - if hasattr(self.observation_info, "new_messages_count"): - initial_new_message_count = self.observation_info.new_messages_count + 1 # 算上麦麦自己发的那一条 + # --- [修改点 1] 在规划前记录未处理消息的 ID 集合 --- + message_ids_before_planning = set() + initial_unprocessed_message_count = 0 + if hasattr(self.observation_info, "unprocessed_messages"): + message_ids_before_planning = {msg.get("message_id") for msg in self.observation_info.unprocessed_messages if msg.get("message_id")} + initial_unprocessed_message_count = len(self.observation_info.unprocessed_messages) + logger.debug(f"[私聊][{self.private_name}]规划开始,当前未处理消息数: {initial_unprocessed_message_count}, IDs: {message_ids_before_planning}") else: - logger.warning( - f"[私聊][{self.private_name}]ObservationInfo missing 'new_messages_count' before planning." - ) + logger.warning(f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' before planning.") - # --- 调用 Action Planner --- - # 传递 self.conversation_info.last_successful_reply_action + # --- 调用 Action Planner (保持不变) --- action, reason = await self.action_planner.plan( self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action ) - # --- 规划后检查是否有 *更多* 新消息到达 --- - current_new_message_count = 0 - if hasattr(self.observation_info, "new_messages_count"): - current_new_message_count = self.observation_info.new_messages_count + # --- [修改点 2] 规划后检查是否有 *过多* 新消息到达 --- + current_unprocessed_messages = [] + current_unprocessed_message_count = 0 + if hasattr(self.observation_info, "unprocessed_messages"): + current_unprocessed_messages = self.observation_info.unprocessed_messages + current_unprocessed_message_count = len(current_unprocessed_messages) else: - logger.warning( - f"[私聊][{self.private_name}]ObservationInfo missing 'new_messages_count' after planning." - ) + logger.warning(f"[私聊][{self.private_name}]ObservationInfo missing 'unprocessed_messages' after planning.") - if current_new_message_count > initial_new_message_count + 2: + # 计算规划期间实际新增的消息数量 + new_messages_during_planning_count = 0 + for msg in current_unprocessed_messages: + msg_id = msg.get("message_id") + if msg_id and msg_id not in message_ids_before_planning: + new_messages_during_planning_count += 1 + + logger.debug(f"[私聊][{self.private_name}]规划结束,当前未处理消息数: {current_unprocessed_message_count}, 规划期间新增: {new_messages_during_planning_count}") + + # **核心逻辑:判断是否中断** (保持不变) + if new_messages_during_planning_count > 2: logger.info( - f"[私聊][{self.private_name}]规划期间发现新增消息 ({initial_new_message_count} -> {current_new_message_count}),跳过本次行动,重新规划" + f"[私聊][{self.private_name}]规划期间新增消息数 ({new_messages_during_planning_count}) 超过阈值(2),取消本次行动 '{action}',重新规划" ) - # 如果规划期间有新消息,也应该重置上次回复状态,因为现在要响应新消息了 self.conversation_info.last_successful_reply_action = None await asyncio.sleep(0.1) - continue - - # 包含 send_new_message - if initial_new_message_count > 0 and action in ["direct_reply", "send_new_message"]: - if hasattr(self.observation_info, "clear_unprocessed_messages"): - logger.debug( - f"[私聊][{self.private_name}]准备执行 {action},清理 {initial_new_message_count} 条规划时已知的新消息。" - ) - await self.observation_info.clear_unprocessed_messages() - if hasattr(self.observation_info, "new_messages_count"): - self.observation_info.new_messages_count = 0 - else: - logger.error( - f"[私聊][{self.private_name}]无法清理未处理消息: ObservationInfo 缺少 clear_unprocessed_messages 方法!" - ) + continue # 跳过本轮后续处理,直接进入下一轮循环重新规划 + # --- 执行动作 (移除 message_ids_before_planning 参数传递) --- await self._handle_action(action, reason, self.observation_info, self.conversation_info) - # 检查是否需要结束对话 (逻辑不变) + # --- 检查是否需要结束对话 (逻辑保持不变) --- goal_ended = False if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list: for goal_item in self.conversation_info.goal_list: + current_goal = None if isinstance(goal_item, dict): current_goal = goal_item.get("goal") - - if current_goal == "结束对话": + elif isinstance(goal_item, str): + current_goal = goal_item + if isinstance(current_goal, str) and current_goal == "结束对话": goal_ended = True break - if goal_ended: self.should_continue = False logger.info(f"[私聊][{self.private_name}]检测到'结束对话'目标,停止循环。") @@ -239,39 +214,20 @@ class Conversation: logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}") - def _check_new_messages_after_planning(self): - """检查在规划后是否有新消息""" - # 检查 ObservationInfo 是否已初始化并且有 new_messages_count 属性 - if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "new_messages_count"): - logger.warning( - f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'new_messages_count' 属性,无法检查新消息。" - ) - return False # 或者根据需要抛出错误 - if self.observation_info.new_messages_count > 2: - logger.info( - f"[私聊][{self.private_name}]生成/执行动作期间收到 {self.observation_info.new_messages_count} 条新消息,取消当前动作并重新规划" - ) - # 如果有新消息,也应该重置上次回复状态 - if hasattr(self, "conversation_info"): # 确保 conversation_info 已初始化 - self.conversation_info.last_successful_reply_action = None - else: - logger.warning( - f"[私聊][{self.private_name}]ConversationInfo 未初始化,无法重置 last_successful_reply_action。" - ) - return True - return False + # --- 移除 _check_interrupt_before_sending 方法 --- + # def _check_interrupt_before_sending(self, message_ids_before_planning: set) -> bool: + # ... (旧代码移除) def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message: - """将消息字典转换为Message对象""" + """将消息字典转换为Message对象 (保持不变)""" try: - # 尝试从 msg_dict 直接获取 chat_stream,如果失败则从全局 chat_manager 获取 chat_info = msg_dict.get("chat_info") if chat_info and isinstance(chat_info, dict): chat_stream = ChatStream.from_dict(chat_info) - elif self.chat_stream: # 使用实例变量中的 chat_stream + elif self.chat_stream: chat_stream = self.chat_stream - else: # Fallback: 尝试从 manager 获取 (可能需要 stream_id) + else: chat_stream = chat_manager.get_stream(self.stream_id) if not chat_stream: raise ValueError(f"无法确定 ChatStream for stream_id {self.stream_id}") @@ -279,23 +235,26 @@ class Conversation: user_info = UserInfo.from_dict(msg_dict.get("user_info", {})) return Message( - message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 提供默认 ID - chat_stream=chat_stream, # 使用确定的 chat_stream - time=msg_dict.get("time", time.time()), # 提供默认时间 + message_id=msg_dict.get("message_id", f"gen_{time.time()}"), + chat_stream=chat_stream, + time=msg_dict.get("time", time.time()), user_info=user_info, processed_plain_text=msg_dict.get("processed_plain_text", ""), detailed_plain_text=msg_dict.get("detailed_plain_text", ""), ) except Exception as e: logger.warning(f"[私聊][{self.private_name}]转换消息时出错: {e}") - # 可以选择返回 None 或重新抛出异常,这里选择重新抛出以指示问题 raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e + # --- [修改点 3] 修改 _handle_action 签名并调整内部逻辑 (移除 message_ids_before_planning 参数) --- async def _handle_action( - self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo + self, + action: str, + reason: str, + observation_info: ObservationInfo, + conversation_info: ConversationInfo ): """处理规划的行动""" - logger.debug(f"[私聊][{self.private_name}]执行行动: {action}, 原因: {reason}") # 记录action历史 (逻辑不变) @@ -306,18 +265,17 @@ class Conversation: "time": datetime.datetime.now().strftime("%H:%M:%S"), "final_reason": None, } - # 确保 done_action 列表存在 if not hasattr(conversation_info, "done_action"): conversation_info.done_action = [] conversation_info.done_action.append(current_action_record) action_index = len(conversation_info.done_action) - 1 - action_successful = False # 用于标记动作是否成功完成 + action_successful = False + reply_sent = False # --- 根据不同的 action 执行 --- - - # send_new_message 失败后执行 wait - if action == "send_new_message": + if action == "direct_reply" or action == "send_new_message": + # 合并 reply 和 follow-up 的生成/检查逻辑 (保持不变) max_reply_attempts = 3 reply_attempt_count = 0 is_suitable = False @@ -327,242 +285,192 @@ class Conversation: while reply_attempt_count < max_reply_attempts and not is_suitable: reply_attempt_count += 1 - logger.info( - f"[私聊][{self.private_name}]尝试生成追问回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." - ) + log_prefix = f"[私聊][{self.private_name}]尝试生成 '{action}' 回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." + logger.info(log_prefix) self.state = ConversationState.GENERATING - # 1. 生成回复 (调用 generate 时传入 action_type) self.generated_reply = await self.reply_generator.generate( - observation_info, conversation_info, action_type="send_new_message" - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的追问回复: {self.generated_reply}" + observation_info, conversation_info, action_type=action ) + logger.info(f"{log_prefix} 生成内容: {self.generated_reply}") - # 2. 检查回复 (逻辑不变) self.state = ConversationState.CHECKING try: - current_goal_str = conversation_info.goal_list[0]["goal"] if conversation_info.goal_list else "" + current_goal_str = "" + if hasattr(conversation_info, 'goal_list') and conversation_info.goal_list: + goal_item = conversation_info.goal_list[0] + if isinstance(goal_item, dict): + current_goal_str = goal_item.get('goal', '') + elif isinstance(goal_item, str): + current_goal_str = goal_item + if is_suitable: + # 检查是否有新消息 + if self._check_new_messages_after_planning(): + logger.info(f"[私聊][{self.private_name}]生成追问回复期间收到新消息,取消发送,重新规划行动") + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"有新消息,取消发送追问: {final_reply_to_send}"} + ) + return # 直接返回,重新规划 + + # 发送合适的回复 + self.generated_reply = final_reply_to_send + # --- 在这里调用 _send_reply --- + await self._send_reply() # <--- 调用恢复后的函数 + + # 更新状态: 标记上次成功是 send_new_message + self.conversation_info.last_successful_reply_action = "send_new_message" + action_successful = True # 标记动作成功 + + # 更新最后消息时间,重置空闲检测计时器 + await self.idle_conversation_starter.update_last_message_time() + + elif need_replan: + # 打回动作决策 + logger.warning( + f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,追问回复决定打回动作决策。打回原因: {check_reason}" + ) + conversation_info.done_action[action_index].update( + {"status": "recall", + "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"} + ) + chat_history_for_check = getattr(observation_info, 'chat_history', []) + chat_history_str_for_check = getattr(observation_info, 'chat_history_str', '') + + is_suitable, check_reason, need_replan = await self.reply_generator.check_reply( reply=self.generated_reply, goal=current_goal_str, - chat_history=observation_info.chat_history, - chat_history_str=observation_info.chat_history_str, + chat_history=chat_history_for_check, + chat_history_str=chat_history_str_for_check, retry_count=reply_attempt_count - 1, ) logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" + f"{log_prefix} 检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" ) + + if not is_suitable or need_replan: + conversation_info.last_reply_rejection_reason = check_reason + conversation_info.last_rejected_reply_content = self.generated_reply + else: + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + if is_suitable: final_reply_to_send = self.generated_reply break elif need_replan: logger.warning( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查建议重新规划,停止尝试。原因: {check_reason}" + f"{log_prefix} 检查建议重新规划,停止尝试。原因: {check_reason}" ) break except Exception as check_err: logger.error( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}" + f"{log_prefix} 调用 ReplyChecker 时出错: {check_err}" ) check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" + conversation_info.last_reply_rejection_reason = f"检查过程出错: {check_err}" + conversation_info.last_rejected_reply_content = self.generated_reply break - # 循环结束,处理最终结果 + # --- 处理生成和检查的结果 --- if is_suitable: - # 检查是否有新消息 - if self._check_new_messages_after_planning(): - logger.info(f"[私聊][{self.private_name}]生成追问回复期间收到新消息,取消发送,重新规划行动") - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"有新消息,取消发送追问: {final_reply_to_send}"} - ) - return # 直接返回,重新规划 + # --- [修改点 4] 记录发送前时间戳 --- + timestamp_before_sending = time.time() + logger.debug(f"[私聊][{self.private_name}]准备发送回复,记录发送前时间戳: {timestamp_before_sending}") - # 发送合适的回复 + # 确认发送 self.generated_reply = final_reply_to_send - # --- 在这里调用 _send_reply --- - await self._send_reply() # <--- 调用恢复后的函数 - - # 更新状态: 标记上次成功是 send_new_message - self.conversation_info.last_successful_reply_action = "send_new_message" - action_successful = True # 标记动作成功 - - # 更新最后消息时间,重置空闲检测计时器 + send_success = await self._send_reply() # 调用发送函数 await self.idle_conversation_starter.update_last_message_time() - elif need_replan: - # 打回动作决策 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,追问回复决定打回动作决策。打回原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"} - ) + if send_success: + action_successful = True + reply_sent = True + logger.info(f"[私聊][{self.private_name}]成功发送 '{action}' 回复.") + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None - else: - # 追问失败 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的追问回复。最终原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后失败: {check_reason}"} - ) - # 重置状态: 追问失败,下次用初始 prompt - self.conversation_info.last_successful_reply_action = None + # --- [修改点 5] 基于时间戳处理消息和决定下一轮 prompt 类型 --- + current_unprocessed_messages = getattr(observation_info, 'unprocessed_messages', []) + message_ids_to_clear: Set[str] = set() # 使用 Set 类型 + new_messages_during_sending_count = 0 - # 执行 Wait 操作 - logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...") - self.state = ConversationState.WAITING - await self.waiter.wait(self.conversation_info) - wait_action_record = { - "action": "wait", - "plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待", - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - "final_reason": None, - } - conversation_info.done_action.append(wait_action_record) + for msg in current_unprocessed_messages: + msg_time = msg.get('time') + msg_id = msg.get('message_id') + if msg_id and msg_time: # 确保时间和 ID 存在 + if msg_time < timestamp_before_sending: + message_ids_to_clear.add(msg_id) + else: + # 时间戳大于等于发送前时间戳,视为新消息 + new_messages_during_sending_count += 1 - elif action == "direct_reply": - max_reply_attempts = 3 - reply_attempt_count = 0 - is_suitable = False - need_replan = False - check_reason = "未进行尝试" - final_reply_to_send = "" + logger.debug(f"[私聊][{self.private_name}]回复发送后,检测到 {len(message_ids_to_clear)} 条发送前消息待清理,{new_messages_during_sending_count} 条发送期间/之后的新消息。") - while reply_attempt_count < max_reply_attempts and not is_suitable: - reply_attempt_count += 1 - logger.info( - f"[私聊][{self.private_name}]尝试生成首次回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..." - ) - self.state = ConversationState.GENERATING + # 清理发送前到达的消息 + if message_ids_to_clear: + await observation_info.clear_processed_messages(message_ids_to_clear) + else: + logger.debug(f"[私聊][{self.private_name}]没有需要清理的发送前消息。") - # 1. 生成回复 - self.generated_reply = await self.reply_generator.generate( - observation_info, conversation_info, action_type="direct_reply" - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的首次回复: {self.generated_reply}" - ) - # 2. 检查回复 - self.state = ConversationState.CHECKING - try: - current_goal_str = conversation_info.goal_list[0]["goal"] if conversation_info.goal_list else "" - is_suitable, check_reason, need_replan = await self.reply_generator.check_reply( - reply=self.generated_reply, - goal=current_goal_str, - chat_history=observation_info.chat_history, - chat_history_str=observation_info.chat_history_str, - retry_count=reply_attempt_count - 1, - ) - logger.info( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}" - ) - if is_suitable: - final_reply_to_send = self.generated_reply - break - elif need_replan: - logger.warning( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查建议重新规划,停止尝试。原因: {check_reason}" - ) - break - except Exception as check_err: - logger.error( - f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (首次回复) 时出错: {check_err}" - ) - check_reason = f"第 {reply_attempt_count} 次检查过程出错: {check_err}" - break - # 循环结束,处理最终结果 - if is_suitable: - # 检查是否有新消息 - if self._check_new_messages_after_planning(): - logger.info(f"[私聊][{self.private_name}]生成首次回复期间收到新消息,取消发送,重新规划行动") - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"有新消息,取消发送首次回复: {final_reply_to_send}"} - ) - return # 直接返回,重新规划 - # 发送合适的回复 - self.generated_reply = final_reply_to_send - # --- 在这里调用 _send_reply --- - await self._send_reply() # <--- 调用恢复后的函数 + # 根据发送期间是否有新消息,决定下次规划用哪个 prompt + if new_messages_during_sending_count > 0: + logger.info(f"[私聊][{self.private_name}]检测到 {new_messages_during_sending_count} 条在发送期间/之后到达的新消息,下一轮将使用首次回复逻辑处理。") + self.conversation_info.last_successful_reply_action = None # 强制下一轮用 PROMPT_INITIAL_REPLY + else: + logger.info(f"[私聊][{self.private_name}]发送期间/之后无新消息,下一轮将根据 '{action}' 使用追问逻辑。") + self.conversation_info.last_successful_reply_action = action # 保持状态,下一轮可能用 PROMPT_FOLLOW_UP - # 更新状态: 标记上次成功是 direct_reply - self.conversation_info.last_successful_reply_action = "direct_reply" - action_successful = True # 标记动作成功 - - # 更新最后消息时间,重置空闲检测计时器 - await self.idle_conversation_starter.update_last_message_time() + else: # 发送失败 + logger.error(f"[私聊][{self.private_name}]发送 '{action}' 回复失败。") + action_successful = False + self.conversation_info.last_successful_reply_action = None + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送回复时失败"} + ) elif need_replan: - # 打回动作决策 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,首次回复决定打回动作决策。打回原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"} - ) + # 检查后打回动作决策 (保持不变) + logger.warning( + f"[私聊][{self.private_name}]'{action}' 回复检查后决定打回动作决策 (尝试 {reply_attempt_count} 次)。打回原因: {check_reason}" + ) + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后打回: {check_reason}"} + ) + self.conversation_info.last_successful_reply_action = None - else: - # 首次回复失败 - logger.warning( - f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的首次回复。最终原因: {check_reason}" - ) - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后失败: {check_reason}"} - ) - # 重置状态: 首次回复失败,下次还是用初始 prompt - self.conversation_info.last_successful_reply_action = None + else: # 多次尝试后仍然不合适 (保持不变) + logger.warning( + f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的 '{action}' 回复。最终原因: {check_reason}" + ) + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": f"'{action}' 尝试{reply_attempt_count}次后失败: {check_reason}"} + ) + self.conversation_info.last_successful_reply_action = None - # 执行 Wait 操作 (保持原有逻辑) - logger.info(f"[私聊][{self.private_name}]由于无法生成合适首次回复,执行 'wait' 操作...") - self.state = ConversationState.WAITING - await self.waiter.wait(self.conversation_info) - wait_action_record = { - "action": "wait", - "plan_reason": "因 direct_reply 多次尝试失败而执行的后备等待", - "status": "done", - "time": datetime.datetime.now().strftime("%H:%M:%S"), - "final_reason": None, - } - conversation_info.done_action.append(wait_action_record) - - elif action == "fetch_knowledge": - self.state = ConversationState.FETCHING - knowledge_query = reason - try: - # 检查 knowledge_fetcher 是否存在 - if not hasattr(self, "knowledge_fetcher"): - logger.error(f"[私聊][{self.private_name}]KnowledgeFetcher 未初始化,无法获取知识。") - raise AttributeError("KnowledgeFetcher not initialized") - - knowledge, source = await self.knowledge_fetcher.fetch(knowledge_query, observation_info.chat_history) - logger.info(f"[私聊][{self.private_name}]获取到知识: {knowledge[:100]}..., 来源: {source}") - if knowledge: - # 确保 knowledge_list 存在 - if not hasattr(conversation_info, "knowledge_list"): - conversation_info.knowledge_list = [] - conversation_info.knowledge_list.append( - {"query": knowledge_query, "knowledge": knowledge, "source": source} - ) - action_successful = True - except Exception as fetch_err: - logger.error(f"[私聊][{self.private_name}]获取知识时出错: {str(fetch_err)}") - conversation_info.done_action[action_index].update( - {"status": "recall", "final_reason": f"获取知识失败: {str(fetch_err)}"} - ) - self.conversation_info.last_successful_reply_action = None # 重置状态 + if action == "send_new_message": + logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...") + self.state = ConversationState.WAITING + await self.waiter.wait(self.conversation_info) + wait_action_record = { + "action": "wait", + "plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待", + "status": "done", + "time": datetime.datetime.now().strftime("%H:%M:%S"), + "final_reason": None, + } + conversation_info.done_action.append(wait_action_record) + action_successful = True + self.conversation_info.last_successful_reply_action = None + # --- 处理其他动作 (保持不变,确保状态重置) --- elif action == "rethink_goal": self.state = ConversationState.RETHINKING try: - # 检查 goal_analyzer 是否存在 if not hasattr(self, "goal_analyzer"): - logger.error(f"[私聊][{self.private_name}]GoalAnalyzer 未初始化,无法重新思考目标。") raise AttributeError("GoalAnalyzer not initialized") await self.goal_analyzer.analyze_goal(conversation_info, observation_info) action_successful = True @@ -571,67 +479,96 @@ class Conversation: conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"} ) - self.conversation_info.last_successful_reply_action = None # 重置状态 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + elif action == "listening": self.state = ConversationState.LISTENING logger.info(f"[私聊][{self.private_name}]倾听对方发言...") try: - # 检查 waiter 是否存在 if not hasattr(self, "waiter"): - logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法倾听。") raise AttributeError("Waiter not initialized") await self.waiter.wait_listening(conversation_info) - action_successful = True # Listening 完成就算成功 + action_successful = True except Exception as listen_err: logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}") conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"倾听失败: {listen_err}"} ) - self.conversation_info.last_successful_reply_action = None # 重置状态 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None elif action == "say_goodbye": - self.state = ConversationState.GENERATING # 也可以定义一个新的状态,如 ENDING + self.state = ConversationState.GENERATING logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...") try: - # 1. 生成告别语 (使用 'say_goodbye' action_type) self.generated_reply = await self.reply_generator.generate( observation_info, conversation_info, action_type="say_goodbye" ) logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}") - # 2. 直接发送告别语 (不经过检查) - if self.generated_reply: # 确保生成了内容 - await self._send_reply() # 调用发送方法 - # 发送成功后,标记动作成功 - action_successful = True - logger.info(f"[私聊][{self.private_name}]告别语已发送。") + if self.generated_reply: + # --- [修改点 6] 告别语发送前记录时间戳 --- + timestamp_before_sending_goodbye = time.time() + send_success = await self._send_reply() + if send_success: + action_successful = True + reply_sent = True + logger.info(f"[私聊][{self.private_name}]告别语已发送。") + + # --- [修改点 7] 告别语发送后也处理未读消息 --- + # (虽然通常之后就结束了,但以防万一) + current_unprocessed_messages_goodbye = getattr(observation_info, 'unprocessed_messages', []) + message_ids_to_clear_goodbye: Set[str] = set() + for msg in current_unprocessed_messages_goodbye: + msg_time = msg.get('time') + msg_id = msg.get('message_id') + if msg_id and msg_time and msg_time < timestamp_before_sending_goodbye: + message_ids_to_clear_goodbye.add(msg_id) + if message_ids_to_clear_goodbye: + await observation_info.clear_processed_messages(message_ids_to_clear_goodbye) + + self.should_continue = False # 正常结束 + logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。") + else: + logger.warning(f"[私聊][{self.private_name}]发送告别语失败。") + action_successful = False + self.should_continue = True # 发送失败不能结束 + conversation_info.done_action[action_index].update( + {"status": "recall", "final_reason": "发送告别语失败"} + ) + self.conversation_info.last_successful_reply_action = None + else: logger.warning(f"[私聊][{self.private_name}]未能生成告别语内容,无法发送。") - action_successful = False # 标记动作失败 + action_successful = False + self.should_continue = True conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": "未能生成告别语内容"} ) - - # 3. 无论是否发送成功,都准备结束对话 - self.should_continue = False - logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。") + self.conversation_info.last_successful_reply_action = None except Exception as goodbye_err: logger.error(f"[私聊][{self.private_name}]生成或发送告别语时出错: {goodbye_err}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") - # 即使出错,也结束对话 - self.should_continue = False - action_successful = False # 标记动作失败 + action_successful = False + self.should_continue = True conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"} ) + self.conversation_info.last_successful_reply_action = None elif action == "end_conversation": - # 这个分支现在只会在 action_planner 最终决定不告别时被调用 self.should_continue = False logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...") - action_successful = True # 标记这个指令本身是成功的 + action_successful = True + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + elif action == "block_and_ignore": logger.info(f"[私聊][{self.private_name}]不想再理你了...") @@ -641,27 +578,31 @@ class Conversation: f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}" ) self.state = ConversationState.IGNORED - action_successful = True # 标记动作成功 + action_successful = True + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None + else: # 对应 'wait' 动作 self.state = ConversationState.WAITING logger.info(f"[私聊][{self.private_name}]等待更多信息...") try: - # 检查 waiter 是否存在 if not hasattr(self, "waiter"): - logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法等待。") raise AttributeError("Waiter not initialized") _timeout_occurred = await self.waiter.wait(self.conversation_info) - action_successful = True # Wait 完成就算成功 + action_successful = True except Exception as wait_err: logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}") conversation_info.done_action[action_index].update( {"status": "recall", "final_reason": f"等待失败: {wait_err}"} ) - self.conversation_info.last_successful_reply_action = None # 重置状态 + self.conversation_info.last_successful_reply_action = None + conversation_info.last_reply_rejection_reason = None + conversation_info.last_rejected_reply_content = None - # --- 更新 Action History 状态 --- - # 只有当动作本身成功时,才更新状态为 done + + # --- 更新 Action History 状态 (保持不变) --- if action_successful: conversation_info.done_action[action_index].update( { @@ -669,68 +610,52 @@ class Conversation: "time": datetime.datetime.now().strftime("%H:%M:%S"), } ) - # 重置状态: 对于非回复类动作的成功,清除上次回复状态 - if action not in ["direct_reply", "send_new_message"]: - self.conversation_info.last_successful_reply_action = None - logger.debug(f"[私聊][{self.private_name}]动作 {action} 成功完成,重置 last_successful_reply_action") - # 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action + logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'done'") + else: + logger.debug(f"[私聊][{self.private_name}]动作 '{action}' 标记为 'recall' 或失败") - async def _send_reply(self): - """发送回复""" + + async def _send_reply(self) -> bool: + """发送回复,并返回是否发送成功 (保持不变)""" if not self.generated_reply: logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。") - return + return False try: - _current_time = time.time() reply_content = self.generated_reply - - # 发送消息 (确保 direct_sender 和 chat_stream 有效) if not hasattr(self, "direct_sender") or not self.direct_sender: logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。") - return + return False if not self.chat_stream: logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。") - return + return False await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content) - - # 发送成功后,手动触发 observer 更新可能导致重复处理自己发送的消息 - # 更好的做法是依赖 observer 的自动轮询或数据库触发器(如果支持) - # 暂时注释掉,观察是否影响 ObservationInfo 的更新 - # self.chat_observer.trigger_update() - # if not await self.chat_observer.wait_for_update(): - # logger.warning(f"[私聊][{self.private_name}]等待 ChatObserver 更新完成超时") - - self.state = ConversationState.ANALYZING # 更新状态 + self.state = ConversationState.ANALYZING + return True except Exception as e: - logger.error(f"[私聊][{self.private_name}]发送消息或更新状态时失败: {str(e)}") + logger.error(f"[私聊][{self.private_name}]发送消息时失败: {str(e)}") logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") self.state = ConversationState.ANALYZING + return False + async def _send_timeout_message(self): - """发送超时结束消息""" + """发送超时结束消息 (保持不变)""" try: messages = self.chat_observer.get_cached_messages(limit=1) if not messages: return - latest_message = self._convert_to_message(messages[0]) await self.direct_sender.send_message( chat_stream=self.chat_stream, content="TODO:超时消息", reply_to_message=latest_message ) + # 停止空闲对话检测器 + if hasattr(self, 'idle_conversation_starter'): + self.idle_conversation_starter.stop() + + if hasattr(self, 'chat_observer'): + self.chat_observer.stop() except Exception as e: logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}") - - async def stop(self): - """停止对话处理""" - logger.info(f"[私聊][{self.private_name}]停止对话 {self.stream_id}") - self.should_continue = False - - # 停止空闲对话检测器 - if hasattr(self, 'idle_conversation_starter'): - self.idle_conversation_starter.stop() - - if hasattr(self, 'chat_observer'): - self.chat_observer.stop()