mirror of https://github.com/Mai-with-u/MaiBot.git
parent
e74ee67376
commit
a6b7ba2d76
|
|
@ -276,11 +276,13 @@ class ActionPlanner:
|
|||
|
||||
time_info = ""
|
||||
try:
|
||||
if not observation_info or not observation_info.bot_id: return ""
|
||||
if not observation_info or not observation_info.bot_id:
|
||||
return ""
|
||||
bot_id_str = str(observation_info.bot_id)
|
||||
if hasattr(observation_info, "chat_history") and observation_info.chat_history:
|
||||
for msg in reversed(observation_info.chat_history):
|
||||
if not isinstance(msg, dict): continue
|
||||
if not isinstance(msg, dict):
|
||||
continue
|
||||
sender_info = msg.get("user_info", {})
|
||||
sender_id = str(sender_info.get("user_id")) if isinstance(sender_info, dict) else None
|
||||
msg_time = msg.get("time")
|
||||
|
|
@ -289,8 +291,10 @@ class ActionPlanner:
|
|||
if time_diff < 60.0:
|
||||
time_info = f"提示:你上一条成功发送的消息是在 {time_diff:.1f} 秒前。\n"
|
||||
break
|
||||
except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时属性错误: {e}")
|
||||
except Exception as e: logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时出错: {e}")
|
||||
except AttributeError as e:
|
||||
logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时属性错误: {e}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[私聊][{self.private_name}] 获取 Bot 上次发言时间时出错: {e}")
|
||||
return time_info
|
||||
|
||||
def _get_timeout_context(self, conversation_info: ConversationInfo) -> str:
|
||||
|
|
@ -301,14 +305,18 @@ class ActionPlanner:
|
|||
if hasattr(conversation_info, "goal_list") and conversation_info.goal_list:
|
||||
last_goal_item = conversation_info.goal_list[-1]
|
||||
last_goal_text = ""
|
||||
if isinstance(last_goal_item, dict): last_goal_text = last_goal_item.get("goal", "")
|
||||
elif isinstance(last_goal_item, str): last_goal_text = last_goal_item
|
||||
if isinstance(last_goal_item, dict):
|
||||
last_goal_text = last_goal_item.get("goal", "")
|
||||
elif isinstance(last_goal_item, str):
|
||||
last_goal_text = last_goal_item
|
||||
if isinstance(last_goal_text, str) and "分钟," in last_goal_text and "思考接下来要做什么" in last_goal_text:
|
||||
wait_time_str = last_goal_text.split("分钟,")[0].replace("你等待了","").strip()
|
||||
timeout_context = f"重要提示:对方已经长时间(约 {wait_time_str} 分钟)没有回复你的消息了,请基于此情况规划下一步。\n"
|
||||
logger.debug(f"[私聊][{self.private_name}] 检测到超时目标: {last_goal_text}")
|
||||
except AttributeError as e: logger.warning(f"[私聊][{self.private_name}] 检查超时目标时属性错误: {e}")
|
||||
except Exception as e: logger.warning(f"[私聊][{self.private_name}] 检查超时目标时出错: {e}")
|
||||
except AttributeError as e:
|
||||
logger.warning(f"[私聊][{self.private_name}] 检查超时目标时属性错误: {e}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[私聊][{self.private_name}] 检查超时目标时出错: {e}")
|
||||
return timeout_context
|
||||
|
||||
def _build_goals_string(self, conversation_info: ConversationInfo) -> str:
|
||||
|
|
@ -319,9 +327,11 @@ class ActionPlanner:
|
|||
if hasattr(conversation_info, "goal_list") and conversation_info.goal_list:
|
||||
recent_goals = conversation_info.goal_list[-3:]
|
||||
for goal_item in recent_goals:
|
||||
goal = "目标内容缺失"; reasoning = "没有明确原因"
|
||||
goal = "目标内容缺失"
|
||||
reasoning = "没有明确原因"
|
||||
if isinstance(goal_item, dict):
|
||||
goal = goal_item.get("goal", goal); reasoning = goal_item.get("reasoning", reasoning)
|
||||
goal = goal_item.get("goal", goal)
|
||||
reasoning = goal_item.get("reasoning", reasoning)
|
||||
elif isinstance(goal_item, str): goal = goal_item
|
||||
goal = str(goal) if goal is not None else "目标内容缺失"
|
||||
reasoning = str(reasoning) if reasoning is not None else "没有明确原因"
|
||||
|
|
@ -339,8 +349,8 @@ class ActionPlanner:
|
|||
try:
|
||||
if hasattr(observation_info, "chat_history_str") and observation_info.chat_history_str: chat_history_text = observation_info.chat_history_str
|
||||
elif hasattr(observation_info, "chat_history") and observation_info.chat_history:
|
||||
history_slice = observation_info.chat_history[-20:]
|
||||
chat_history_text = await build_readable_messages(history_slice, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0)
|
||||
history_slice = observation_info.chat_history[-20:]
|
||||
chat_history_text = await build_readable_messages(history_slice, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0)
|
||||
else: chat_history_text = "还没有聊天记录。\n"
|
||||
unread_count = getattr(observation_info, 'new_messages_count', 0)
|
||||
unread_messages = getattr(observation_info, 'unprocessed_messages', [])
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
|
|
@ -1,351 +1,193 @@
|
|||
from typing import List, Tuple, TYPE_CHECKING, Optional, Union
|
||||
import asyncio
|
||||
import time
|
||||
import random
|
||||
import traceback
|
||||
from typing import List, TYPE_CHECKING, Optional, Dict, Any
|
||||
|
||||
from src.common.logger import get_module_logger
|
||||
from ..models.utils_model import LLMRequest
|
||||
from ...config.config import global_config
|
||||
from .chat_observer import ChatObserver
|
||||
from .message_sender import DirectMessageSender
|
||||
from ..chat.chat_stream import ChatStream
|
||||
from maim_message import UserInfo
|
||||
from src.individuality.individuality import Individuality
|
||||
from src.plugins.utils.chat_message_builder import build_readable_messages
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..chat.message import Message
|
||||
# 仍然需要类型提示
|
||||
from .conversation import Conversation
|
||||
from .pfc_manager import PFCManager # 仅用于类型提示
|
||||
|
||||
logger = get_module_logger("pfc_idle")
|
||||
logger = get_module_logger("idle_conversation")
|
||||
|
||||
class IdleConversationStarter:
|
||||
"""长时间无对话主动发起对话的组件
|
||||
|
||||
该组件会在一段时间没有对话后,自动生成一条消息发送给用户,以保持对话的活跃度。
|
||||
时间阈值会在配置的最小和最大值之间随机选择,每次发送消息后都会重置。
|
||||
"""
|
||||
|
||||
"""长时间无对话主动发起对话的组件"""
|
||||
|
||||
def __init__(self, stream_id: str, private_name: str):
|
||||
"""初始化空闲对话启动器
|
||||
|
||||
Args:
|
||||
stream_id: 聊天流ID
|
||||
private_name: 私聊用户名称
|
||||
"""
|
||||
"""初始化空闲对话启动器"""
|
||||
self.stream_id: str = stream_id
|
||||
self.private_name: str = private_name
|
||||
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
|
||||
self.message_sender = DirectMessageSender(private_name)
|
||||
|
||||
# 添加异步锁,保护对共享变量的访问
|
||||
self._lock: asyncio.Lock = asyncio.Lock()
|
||||
|
||||
# LLM请求对象,用于生成主动对话内容
|
||||
self.llm = LLMRequest(
|
||||
model=global_config.llm_normal, temperature=0.8, max_tokens=500, request_type="idle_conversation_starter"
|
||||
)
|
||||
|
||||
# 个性化信息
|
||||
self.personality_info: str = Individuality.get_instance().get_prompt(x_person=2, level=3)
|
||||
self.name: str = global_config.BOT_NICKNAME
|
||||
self.nick_name: List[str] = global_config.BOT_ALIAS_NAMES
|
||||
|
||||
# 从配置文件读取配置参数,或使用默认值
|
||||
self.enabled: bool = getattr(global_config, 'idle_conversation', {}).get('enable_idle_conversation', True)
|
||||
self.idle_check_interval: int = getattr(global_config, 'idle_conversation', {}).get('idle_check_interval', 10)
|
||||
self.min_idle_time: int = getattr(global_config, 'idle_conversation', {}).get('min_idle_time', 60)
|
||||
self.max_idle_time: int = getattr(global_config, 'idle_conversation', {}).get('max_idle_time', 120)
|
||||
|
||||
# 计算实际触发阈值(在min和max之间随机)
|
||||
self.actual_idle_threshold: int = random.randint(self.min_idle_time, self.max_idle_time)
|
||||
|
||||
# 工作状态
|
||||
idle_config = getattr(global_config, 'idle_conversation', {})
|
||||
self.enabled: bool = idle_config.get('enable_idle_conversation', True)
|
||||
self.idle_check_interval: int = idle_config.get('idle_check_interval', 10)
|
||||
self.min_idle_time: int = idle_config.get('min_idle_time', 60)
|
||||
self.max_idle_time: int = idle_config.get('max_idle_time', 120)
|
||||
self.last_message_time: float = time.time()
|
||||
self.actual_idle_threshold: int = self._get_new_threshold()
|
||||
self._running: bool = False
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
|
||||
|
||||
def _get_new_threshold(self) -> int:
|
||||
"""计算一个新的随机空闲阈值"""
|
||||
try:
|
||||
min_t = max(10, self.min_idle_time); max_t = max(min_t, self.max_idle_time)
|
||||
return random.randint(min_t, max_t)
|
||||
except ValueError:
|
||||
logger.warning(f"[私聊][{self.private_name}] idle_time 配置无效 ({self.min_idle_time}, {self.max_idle_time}),使用默认值 60-120。")
|
||||
return random.randint(60, 120)
|
||||
|
||||
def start(self) -> None:
|
||||
"""启动空闲对话检测
|
||||
|
||||
如果功能被禁用或已经在运行,则不会启动。
|
||||
"""
|
||||
# 如果功能被禁用,则不启动
|
||||
if not self.enabled:
|
||||
logger.info(f"[私聊][{self.private_name}]主动发起对话功能已禁用")
|
||||
return
|
||||
|
||||
if self._running:
|
||||
logger.debug(f"[私聊][{self.private_name}]主动发起对话功能已在运行中")
|
||||
return
|
||||
|
||||
"""启动空闲对话检测"""
|
||||
if not self.enabled: logger.info(f"[私聊][{self.private_name}] 主动发起对话功能已禁用"); return
|
||||
if self._running: logger.debug(f"[私聊][{self.private_name}] 主动发起对话功能已在运行中"); return
|
||||
self._running = True
|
||||
self._task = asyncio.create_task(self._check_idle_loop())
|
||||
logger.info(f"[私聊][{self.private_name}]启动空闲对话检测,阈值设置为{self.actual_idle_threshold}秒")
|
||||
|
||||
logger.info(f"[私聊][{self.private_name}] 启动空闲对话检测,当前阈值: {self.actual_idle_threshold}秒")
|
||||
|
||||
def stop(self) -> None:
|
||||
"""停止空闲对话检测
|
||||
|
||||
取消当前运行的任务并重置状态。
|
||||
"""
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
"""停止空闲对话检测"""
|
||||
if not self._running: return
|
||||
self._running = False
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
self._task = None
|
||||
logger.info(f"[私聊][{self.private_name}]停止空闲对话检测")
|
||||
|
||||
if self._task: self._task.cancel(); self._task = None
|
||||
logger.info(f"[私聊][{self.private_name}] 停止空闲对话检测")
|
||||
|
||||
async def update_last_message_time(self, message_time: Optional[float] = None) -> None:
|
||||
"""更新最后一条消息的时间
|
||||
|
||||
Args:
|
||||
message_time: 消息时间戳,如果为None则使用当前时间
|
||||
"""
|
||||
"""更新最后一条消息的时间,并重置阈值"""
|
||||
async with self._lock:
|
||||
self.last_message_time = message_time or time.time()
|
||||
# 重新随机化下一次触发的时间阈值
|
||||
self.actual_idle_threshold = random.randint(self.min_idle_time, self.max_idle_time)
|
||||
logger.debug(f"[私聊][{self.private_name}]更新最后消息时间: {self.last_message_time},新阈值: {self.actual_idle_threshold}秒")
|
||||
|
||||
new_time = message_time or time.time()
|
||||
if new_time > self.last_message_time:
|
||||
self.last_message_time = new_time
|
||||
self.actual_idle_threshold = self._get_new_threshold()
|
||||
logger.debug(f"[私聊][{self.private_name}] 更新最后消息时间: {self.last_message_time:.2f},新阈值: {self.actual_idle_threshold}秒")
|
||||
|
||||
def reload_config(self) -> None:
|
||||
"""重新加载配置
|
||||
|
||||
从配置文件重新读取所有参数,以便动态调整空闲对话检测的行为。
|
||||
"""
|
||||
"""重新加载配置"""
|
||||
try:
|
||||
# 从配置文件重新读取参数
|
||||
self.enabled = getattr(global_config, 'idle_conversation', {}).get('enable_idle_conversation', True)
|
||||
self.idle_check_interval = getattr(global_config, 'idle_conversation', {}).get('idle_check_interval', 10)
|
||||
self.min_idle_time = getattr(global_config, 'idle_conversation', {}).get('min_idle_time', 7200)
|
||||
self.max_idle_time = getattr(global_config, 'idle_conversation', {}).get('max_idle_time', 18000)
|
||||
|
||||
logger.debug(f"[私聊][{self.private_name}]重新加载主动对话配置: 启用={self.enabled}, 检查间隔={self.idle_check_interval}秒, 最短间隔={self.min_idle_time}秒, 最长间隔={self.max_idle_time}秒")
|
||||
|
||||
# 重新计算实际阈值
|
||||
async def update_threshold():
|
||||
idle_config = getattr(global_config, 'idle_conversation', {})
|
||||
self.enabled = idle_config.get('enable_idle_conversation', True)
|
||||
self.idle_check_interval = idle_config.get('idle_check_interval', 10)
|
||||
self.min_idle_time = idle_config.get('min_idle_time', 60)
|
||||
self.max_idle_time = idle_config.get('max_idle_time', 120)
|
||||
logger.debug(f"[私聊][{self.private_name}] 重新加载主动对话配置: 启用={self.enabled}, 检查间隔={self.idle_check_interval}秒, 阈值范围=[{self.min_idle_time}, {self.max_idle_time}]秒")
|
||||
async def update_threshold_async():
|
||||
async with self._lock:
|
||||
self.actual_idle_threshold = random.randint(self.min_idle_time, self.max_idle_time)
|
||||
logger.debug(f"[私聊][{self.private_name}]更新空闲检测阈值为: {self.actual_idle_threshold}秒")
|
||||
|
||||
# 创建一个任务来异步更新阈值
|
||||
asyncio.create_task(update_threshold())
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{self.private_name}]重新加载配置时出错: {str(e)}")
|
||||
|
||||
self.actual_idle_threshold = self._get_new_threshold()
|
||||
logger.debug(f"[私聊][{self.private_name}] 配置重载后更新空闲检测阈值为: {self.actual_idle_threshold}秒")
|
||||
asyncio.create_task(update_threshold_async())
|
||||
if self.enabled and not self._running: self.start()
|
||||
elif not self.enabled and self._running: self.stop()
|
||||
except Exception as e: logger.error(f"[私聊][{self.private_name}] 重新加载配置时出错: {str(e)}")
|
||||
|
||||
async def _check_idle_loop(self) -> None:
|
||||
"""检查空闲状态的循环
|
||||
|
||||
定期检查是否长时间无对话,如果达到阈值则尝试主动发起对话。
|
||||
"""
|
||||
"""检查空闲状态的循环"""
|
||||
try:
|
||||
config_reload_counter = 0
|
||||
config_reload_interval = 100 # 每100次检查重新加载一次配置
|
||||
|
||||
config_reload_counter = 0; config_reload_interval = 60
|
||||
while self._running:
|
||||
# 定期重新加载配置
|
||||
config_reload_counter += 1
|
||||
if config_reload_counter >= config_reload_interval:
|
||||
self.reload_config()
|
||||
config_reload_counter = 0
|
||||
|
||||
# 检查是否启用了主动对话功能
|
||||
if not self.enabled:
|
||||
# 如果禁用了功能,就等待一段时间后再次检查配置
|
||||
await asyncio.sleep(self.idle_check_interval)
|
||||
continue
|
||||
|
||||
# 使用锁保护对共享变量的读取
|
||||
current_time = time.time()
|
||||
async with self._lock:
|
||||
idle_time = current_time - self.last_message_time
|
||||
threshold = self.actual_idle_threshold
|
||||
|
||||
if idle_time >= threshold:
|
||||
logger.info(f"[私聊][{self.private_name}]检测到长时间({idle_time:.0f}秒)无对话,尝试主动发起聊天")
|
||||
await self._initiate_conversation()
|
||||
# 更新时间,避免连续触发
|
||||
await self.update_last_message_time()
|
||||
|
||||
# 等待下一次检查
|
||||
await asyncio.sleep(self.idle_check_interval)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.debug(f"[私聊][{self.private_name}]空闲对话检测任务被取消")
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{self.private_name}]空闲对话检测出错: {str(e)}")
|
||||
# 尝试重新启动检测循环
|
||||
if self._running:
|
||||
logger.info(f"[私聊][{self.private_name}]尝试重新启动空闲对话检测")
|
||||
self._task = asyncio.create_task(self._check_idle_loop())
|
||||
|
||||
async def _initiate_conversation(self) -> None:
|
||||
"""生成并发送主动对话内容
|
||||
|
||||
获取聊天历史记录,使用LLM生成合适的开场白,然后发送消息。
|
||||
"""
|
||||
try:
|
||||
# 获取聊天历史记录,用于生成更合适的开场白
|
||||
messages = self.chat_observer.get_cached_messages(limit=12) # 获取最近12条消息
|
||||
chat_history_text = await build_readable_messages(
|
||||
messages,
|
||||
replace_bot_name=True,
|
||||
merge_messages=False,
|
||||
timestamp_mode="relative",
|
||||
read_mark=0.0,
|
||||
)
|
||||
|
||||
# 构建提示词
|
||||
prompt = f"""{self.personality_info}。你的名字是{self.name}。
|
||||
你正在与用户{self.private_name}进行QQ私聊,
|
||||
但已经有一段时间没有对话了。
|
||||
你想要主动发起一个友好的对话,可以说说自己在做的事情或者询问对方在做什么。
|
||||
请基于以下之前的对话历史,生成一条自然、友好、符合你个性的主动对话消息。
|
||||
这条消息应该能够引起用户的兴趣,重新开始对话。
|
||||
最近的对话历史(可能已经过去了很久):
|
||||
{chat_history_text}
|
||||
请直接输出一条消息,不要有任何额外的解释或引导文字。消息要简短自然,就像是在日常聊天中的开场白。
|
||||
消息内容尽量简短,不要超过20个字,不要添加任何表情符号。
|
||||
"""
|
||||
|
||||
# 尝试生成回复,添加超时处理
|
||||
try:
|
||||
content, _ = await asyncio.wait_for(
|
||||
self.llm.generate_response_async(prompt),
|
||||
timeout=30 # 30秒超时
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"[私聊][{self.private_name}]生成主动对话内容超时")
|
||||
return
|
||||
except Exception as llm_err:
|
||||
logger.error(f"[私聊][{self.private_name}]生成主动对话内容失败: {str(llm_err)}")
|
||||
return
|
||||
|
||||
# 清理结果
|
||||
content = content.strip()
|
||||
content = content.strip('"\'')
|
||||
|
||||
if not content:
|
||||
logger.error(f"[私聊][{self.private_name}]生成的主动对话内容为空")
|
||||
return
|
||||
|
||||
# 统一错误处理,从这里开始所有操作都在同一个try-except块中
|
||||
logger.debug(f"[私聊][{self.private_name}]成功生成主动对话内容: {content},准备发送")
|
||||
|
||||
from .pfc_manager import PFCManager
|
||||
from src.plugins.chat.chat_stream import chat_manager
|
||||
|
||||
# 获取当前实例
|
||||
pfc_manager = PFCManager.get_instance()
|
||||
|
||||
# 结束当前对话实例(如果存在)
|
||||
current_conversation = await pfc_manager.get_conversation(self.stream_id)
|
||||
if current_conversation:
|
||||
logger.info(f"[私聊][{self.private_name}]结束当前对话实例,准备创建新实例")
|
||||
try:
|
||||
await current_conversation.stop()
|
||||
await pfc_manager.remove_conversation(self.stream_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"[私聊][{self.private_name}]结束当前对话实例时出错: {str(e)},继续创建新实例")
|
||||
|
||||
# 创建新的对话实例
|
||||
logger.info(f"[私聊][{self.private_name}]创建新的对话实例以发送主动消息")
|
||||
new_conversation = None
|
||||
try:
|
||||
new_conversation = await pfc_manager.get_or_create_conversation(self.stream_id, self.private_name)
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{self.private_name}]创建新对话实例失败: {str(e)}")
|
||||
return
|
||||
|
||||
# 确保新对话实例已初始化完成
|
||||
chat_stream = await self._get_chat_stream(new_conversation)
|
||||
if not chat_stream:
|
||||
logger.error(f"[私聊][{self.private_name}]无法获取有效的聊天流,取消发送主动消息")
|
||||
return
|
||||
|
||||
# 发送消息
|
||||
try:
|
||||
await self.message_sender.send_message(
|
||||
chat_stream=chat_stream,
|
||||
content=content,
|
||||
reply_to_message=None
|
||||
)
|
||||
|
||||
# 更新空闲会话启动器的最后消息时间
|
||||
await self.update_last_message_time()
|
||||
|
||||
# 如果新对话实例有一个聊天观察者,请触发更新
|
||||
if new_conversation and hasattr(new_conversation, 'chat_observer'):
|
||||
logger.info(f"[私聊][{self.private_name}]触发聊天观察者更新")
|
||||
config_reload_counter = (config_reload_counter + 1) % config_reload_interval
|
||||
if config_reload_counter == 0: self.reload_config()
|
||||
if not self.enabled: continue
|
||||
current_time = time.time()
|
||||
async with self._lock: last_msg_time = self.last_message_time; threshold = self.actual_idle_threshold
|
||||
idle_time = current_time - last_msg_time
|
||||
if idle_time >= threshold:
|
||||
logger.info(f"[私聊][{self.private_name}] 检测到长时间({idle_time:.0f}秒 >= {threshold}秒)无对话,尝试主动发起聊天")
|
||||
try:
|
||||
new_conversation.chat_observer.trigger_update()
|
||||
except Exception as e:
|
||||
logger.warning(f"[私聊][{self.private_name}]触发聊天观察者更新失败: {str(e)}")
|
||||
|
||||
logger.success(f"[私聊][{self.private_name}]成功主动发起对话: {content}")
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{self.private_name}]发送主动对话消息失败: {str(e)}")
|
||||
|
||||
except Exception as e:
|
||||
# 顶级异常处理,确保任何未捕获的异常都不会导致整个进程崩溃
|
||||
logger.error(f"[私聊][{self.private_name}]主动发起对话过程中发生未预期的错误: {str(e)}")
|
||||
|
||||
async def _get_chat_stream(self, conversation: Optional['Conversation'] = None) -> Optional[ChatStream]:
|
||||
"""获取可用的聊天流
|
||||
|
||||
尝试多种方式获取聊天流:
|
||||
1. 从传入的对话实例中获取
|
||||
2. 从全局聊天管理器中获取
|
||||
3. 创建一个新的聊天流
|
||||
|
||||
Args:
|
||||
conversation: 对话实例,可以为None
|
||||
|
||||
Returns:
|
||||
Optional[ChatStream]: 如果成功获取则返回聊天流,否则返回None
|
||||
"""
|
||||
chat_stream = None
|
||||
|
||||
# 1. 尝试从对话实例获取
|
||||
if conversation and hasattr(conversation, 'should_continue'):
|
||||
# 等待一小段时间,确保初始化完成
|
||||
retry_count = 0
|
||||
max_retries = 10
|
||||
while not conversation.should_continue and retry_count < max_retries:
|
||||
await asyncio.sleep(0.5)
|
||||
retry_count += 1
|
||||
logger.debug(f"[私聊][{self.private_name}]等待新对话实例初始化完成: 尝试 {retry_count}/{max_retries}")
|
||||
|
||||
if not conversation.should_continue:
|
||||
logger.warning(f"[私聊][{self.private_name}]新对话实例初始化可能未完成,但仍将尝试获取聊天流")
|
||||
|
||||
# 尝试使用对话实例的聊天流
|
||||
if hasattr(conversation, 'chat_stream') and conversation.chat_stream:
|
||||
logger.info(f"[私聊][{self.private_name}]使用新对话实例的聊天流")
|
||||
return conversation.chat_stream
|
||||
|
||||
# 2. 尝试从聊天管理器获取
|
||||
from src.plugins.chat.chat_stream import chat_manager
|
||||
await self._initiate_conversation()
|
||||
await self.update_last_message_time() # 主动发起后立即更新时间
|
||||
except Exception as initiate_err:
|
||||
logger.error(f"[私聊][{self.private_name}] 主动发起对话过程中出错: {initiate_err}\n{traceback.format_exc()}")
|
||||
await self.update_last_message_time() # 即使出错也更新时间
|
||||
except asyncio.CancelledError: logger.debug(f"[私聊][{self.private_name}] 空闲对话检测任务被取消")
|
||||
except Exception as e: logger.error(f"[私聊][{self.private_name}] 空闲对话检测循环发生未预期错误: {str(e)}\n{traceback.format_exc()}")
|
||||
|
||||
async def _initiate_conversation(self) -> None:
|
||||
"""生成并发送主动对话内容 (修复循环导入)"""
|
||||
# --- 将 PFCManager 的导入移到这里 ---
|
||||
from .pfc_manager import PFCManager
|
||||
# 仅在需要时导入类型提示,避免运行时错误
|
||||
if TYPE_CHECKING:
|
||||
from .conversation import Conversation
|
||||
|
||||
pfc_manager: 'PFCManager' = PFCManager.get_instance() # 获取管理器实例
|
||||
conversation: Optional['Conversation'] = None
|
||||
chat_stream: Optional[ChatStream] = None
|
||||
|
||||
try:
|
||||
logger.info(f"[私聊][{self.private_name}]尝试从chat_manager获取聊天流")
|
||||
chat_stream = chat_manager.get_stream(self.stream_id)
|
||||
if chat_stream:
|
||||
return chat_stream
|
||||
# --- 1. 获取或创建 Conversation 实例 ---
|
||||
logger.debug(f"[私聊][{self.private_name}] 尝试获取或创建 Conversation 实例...")
|
||||
conversation = await pfc_manager.get_or_create_conversation(self.stream_id, self.private_name)
|
||||
if not conversation or not conversation._initialized:
|
||||
logger.error(f"[私聊][{self.private_name}] 无法获取或创建有效的 Conversation 实例,取消主动发起对话。")
|
||||
return
|
||||
chat_stream = conversation.chat_stream
|
||||
if not chat_stream:
|
||||
logger.error(f"[私聊][{self.private_name}] 无法从 Conversation 实例获取 ChatStream,取消主动发起对话。")
|
||||
return
|
||||
|
||||
# --- 2. 获取聊天历史用于生成 ---
|
||||
messages: List[Dict[str, Any]] = []
|
||||
chat_history_text = "最近没有聊天记录。"
|
||||
if conversation.observation_info:
|
||||
messages = conversation.observation_info.chat_history[-12:]
|
||||
if messages:
|
||||
chat_history_text = await build_readable_messages(messages, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0)
|
||||
else: logger.warning(f"[私聊][{self.private_name}] Conversation 实例缺少 ObservationInfo,无法获取准确聊天记录。")
|
||||
|
||||
# --- 3. 构建 Prompt 并生成内容 ---
|
||||
prompt = f"""{self.personality_info}。你的名字是{self.name}。
|
||||
你正在与用户 {self.private_name} 进行QQ私聊, 但已经有一段时间没有对话了。
|
||||
你想要主动发起一个友好的对话,可以说说自己在做的事情或者询问对方在做什么。
|
||||
请基于以下之前的对话历史,生成一条自然、友好、符合你个性的主动对话消息。
|
||||
这条消息应该能够引起用户的兴趣,重新开始对话。
|
||||
|
||||
最近的对话历史(可能已经过去了很久):
|
||||
{chat_history_text}
|
||||
|
||||
请直接输出一条消息,不要有任何额外的解释或引导文字。消息要简短自然,就像是在日常聊天中的开场白。
|
||||
消息内容尽量简短,不要超过30个字。可以适当添加符合你人设的语气词或表情符号(如果合适)。
|
||||
"""
|
||||
# logger.debug(f"[私聊][{self.private_name}] 发送到 LLM 的主动对话 Prompt: \n{prompt}") # 日志可能过长
|
||||
content = ""
|
||||
try:
|
||||
llm_response, _ = await asyncio.wait_for(self.llm.generate_response_async(prompt), timeout=30)
|
||||
content = llm_response.strip().strip('"\'')
|
||||
logger.debug(f"[私聊][{self.private_name}] LLM 生成的主动对话内容: {content}")
|
||||
except asyncio.TimeoutError: logger.error(f"[私聊][{self.private_name}] 生成主动对话内容超时"); return
|
||||
except Exception as llm_err: logger.error(f"[私聊][{self.private_name}] 生成主动对话内容失败: {str(llm_err)}"); return
|
||||
if not content: logger.error(f"[私聊][{self.private_name}] 生成的主动对话内容为空"); return
|
||||
|
||||
# --- 4. 发送消息 ---
|
||||
logger.info(f"[私聊][{self.private_name}] 准备发送主动对话消息: {content}")
|
||||
await self.message_sender.send_message(chat_stream=chat_stream, content=content, reply_to_message=None)
|
||||
|
||||
# --- 5. 发送成功后的处理 ---
|
||||
logger.success(f"[私聊][{self.private_name}] 成功主动发起对话: {content}")
|
||||
# 更新时间戳的操作移至外层 _check_idle_loop
|
||||
# 触发 ChatObserver 更新
|
||||
if conversation.chat_observer:
|
||||
logger.debug(f"[私聊][{self.private_name}] 触发 ChatObserver 更新...")
|
||||
conversation.chat_observer.trigger_update()
|
||||
else: logger.warning(f"[私聊][{self.private_name}] Conversation 实例缺少 ChatObserver,无法触发更新。")
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"[私聊][{self.private_name}]从chat_manager获取聊天流失败: {str(e)}")
|
||||
|
||||
# 3. 创建新的聊天流
|
||||
try:
|
||||
logger.warning(f"[私聊][{self.private_name}]无法获取现有聊天流,创建新的聊天流")
|
||||
# 创建用户信息对象
|
||||
user_info = UserInfo(
|
||||
user_id=global_config.BOT_QQ,
|
||||
user_nickname=global_config.BOT_NICKNAME,
|
||||
platform="qq"
|
||||
)
|
||||
# 创建聊天流
|
||||
return ChatStream(self.stream_id, "qq", user_info)
|
||||
except Exception as e:
|
||||
logger.error(f"[私聊][{self.private_name}]创建新聊天流失败: {str(e)}")
|
||||
return None
|
||||
logger.error(f"[私聊][{self.private_name}] 主动发起对话的整体流程出错: {str(e)}\n{traceback.format_exc()}")
|
||||
|
|
@ -36,11 +36,11 @@ class PFCManager:
|
|||
await asyncio.sleep(0.5) # 短暂等待,让初始化有机会完成
|
||||
# 再次检查实例是否存在
|
||||
if stream_id in self._instances and self._instances[stream_id]._initialized:
|
||||
logger.debug(f"[私聊][{private_name}] 初始化已完成,返回现有实例: {stream_id}")
|
||||
return self._instances[stream_id]
|
||||
logger.debug(f"[私聊][{private_name}] 初始化已完成,返回现有实例: {stream_id}")
|
||||
return self._instances[stream_id]
|
||||
else:
|
||||
logger.warning(f"[私聊][{private_name}] 等待后实例仍未初始化完成或不存在。")
|
||||
return None # 避免返回未完成的实例
|
||||
logger.warning(f"[私聊][{private_name}] 等待后实例仍未初始化完成或不存在。")
|
||||
return None # 避免返回未完成的实例
|
||||
|
||||
# 检查是否已有活动实例
|
||||
if stream_id in self._instances:
|
||||
|
|
@ -61,8 +61,10 @@ class PFCManager:
|
|||
logger.warning(f"[私聊][{private_name}] 发现无效或已停止的旧实例,清理并重新创建: {stream_id}")
|
||||
await self._cleanup_conversation(instance)
|
||||
# 从字典中移除,确保下面能创建新的
|
||||
if stream_id in self._instances: del self._instances[stream_id]
|
||||
if stream_id in self._initializing: del self._initializing[stream_id]
|
||||
if stream_id in self._instances:
|
||||
del self._instances[stream_id]
|
||||
if stream_id in self._initializing:
|
||||
del self._initializing[stream_id]
|
||||
|
||||
|
||||
# --- 创建并初始化新实例 ---
|
||||
|
|
@ -88,7 +90,8 @@ class PFCManager:
|
|||
logger.error(f"[私聊][{private_name}] 初始化未成功完成,无法启动实例 {stream_id}。")
|
||||
# 清理可能部分创建的实例
|
||||
await self._cleanup_conversation(conversation_instance)
|
||||
if stream_id in self._instances: del self._instances[stream_id]
|
||||
if stream_id in self._instances:
|
||||
del self._instances[stream_id]
|
||||
conversation_instance = None # 返回 None 表示失败
|
||||
|
||||
except Exception as e:
|
||||
|
|
@ -97,13 +100,14 @@ class PFCManager:
|
|||
# 确保清理
|
||||
if conversation_instance:
|
||||
await self._cleanup_conversation(conversation_instance)
|
||||
if stream_id in self._instances: del self._instances[stream_id]
|
||||
if stream_id in self._instances:
|
||||
del self._instances[stream_id]
|
||||
conversation_instance = None # 返回 None
|
||||
|
||||
finally:
|
||||
# 确保初始化标记被清除
|
||||
if stream_id in self._initializing:
|
||||
self._initializing[stream_id] = False
|
||||
if stream_id in self._initializing:
|
||||
self._initializing[stream_id] = False
|
||||
|
||||
return conversation_instance
|
||||
|
||||
|
|
@ -116,9 +120,9 @@ class PFCManager:
|
|||
await conversation._initialize() # 调用实例自身的初始化方法
|
||||
# 注意:初始化成功与否由 conversation._initialized 和 conversation.should_continue 标志决定
|
||||
if conversation._initialized:
|
||||
logger.info(f"[私聊][{private_name}] conversation._initialize() 调用完成,实例标记为已初始化: {stream_id}")
|
||||
logger.info(f"[私聊][{private_name}] conversation._initialize() 调用完成,实例标记为已初始化: {stream_id}")
|
||||
else:
|
||||
logger.warning(f"[私聊][{private_name}] conversation._initialize() 调用完成,但实例未成功标记为已初始化: {stream_id}")
|
||||
logger.warning(f"[私聊][{private_name}] conversation._initialize() 调用完成,但实例未成功标记为已初始化: {stream_id}")
|
||||
|
||||
except Exception as e:
|
||||
# _initialize 内部应该处理自己的异常,但这里也捕获以防万一
|
||||
|
|
@ -131,7 +135,8 @@ class PFCManager:
|
|||
|
||||
async def _cleanup_conversation(self, conversation: Conversation):
|
||||
"""清理会话实例的资源"""
|
||||
if not conversation: return
|
||||
if not conversation:
|
||||
return
|
||||
stream_id = conversation.stream_id
|
||||
private_name = conversation.private_name
|
||||
logger.info(f"[私聊][{private_name}] 开始清理会话实例资源: {stream_id}")
|
||||
|
|
@ -140,13 +145,13 @@ class PFCManager:
|
|||
if hasattr(conversation, 'stop') and callable(conversation.stop):
|
||||
await conversation.stop() # stop 方法应处理内部组件的停止
|
||||
else:
|
||||
logger.warning(f"[私聊][{private_name}] Conversation 对象缺少 stop 方法,可能无法完全清理资源。")
|
||||
# 尝试手动停止已知组件 (作为后备)
|
||||
if hasattr(conversation, 'idle_conversation_starter') and conversation.idle_conversation_starter:
|
||||
conversation.idle_conversation_starter.stop()
|
||||
if hasattr(conversation, 'observation_info') and conversation.observation_info:
|
||||
conversation.observation_info.unbind_from_chat_observer()
|
||||
# ChatObserver 是单例,不在此处停止
|
||||
logger.warning(f"[私聊][{private_name}] Conversation 对象缺少 stop 方法,可能无法完全清理资源。")
|
||||
# 尝试手动停止已知组件 (作为后备)
|
||||
if hasattr(conversation, 'idle_conversation_starter') and conversation.idle_conversation_starter:
|
||||
conversation.idle_conversation_starter.stop()
|
||||
if hasattr(conversation, 'observation_info') and conversation.observation_info:
|
||||
conversation.observation_info.unbind_from_chat_observer()
|
||||
# ChatObserver 是单例,不在此处停止
|
||||
|
||||
logger.info(f"[私聊][{private_name}] 会话实例 {stream_id} 资源已清理")
|
||||
except Exception as e:
|
||||
|
|
@ -157,12 +162,12 @@ class PFCManager:
|
|||
"""获取已存在的会话实例 (只读)"""
|
||||
instance = self._instances.get(stream_id)
|
||||
if instance and instance._initialized and instance.should_continue:
|
||||
# 检查忽略状态
|
||||
if (hasattr(instance, "ignore_until_timestamp") and
|
||||
instance.ignore_until_timestamp and
|
||||
time.time() < instance.ignore_until_timestamp):
|
||||
return None # 忽略期间不返回
|
||||
return instance
|
||||
# 检查忽略状态
|
||||
if (hasattr(instance, "ignore_until_timestamp") and
|
||||
instance.ignore_until_timestamp and
|
||||
time.time() < instance.ignore_until_timestamp):
|
||||
return None # 忽略期间不返回
|
||||
return instance
|
||||
return None # 不存在或无效则返回 None
|
||||
|
||||
async def remove_conversation(self, stream_id: str):
|
||||
|
|
@ -173,7 +178,8 @@ class PFCManager:
|
|||
try:
|
||||
# 先从字典中移除引用,防止新的请求获取到正在清理的实例
|
||||
del self._instances[stream_id]
|
||||
if stream_id in self._initializing: del self._initializing[stream_id]
|
||||
if stream_id in self._initializing:
|
||||
del self._initializing[stream_id]
|
||||
# 清理资源
|
||||
await self._cleanup_conversation(instance_to_remove)
|
||||
logger.info(f"[管理器] 会话实例 {stream_id} 已成功移除并清理")
|
||||
|
|
|
|||
Loading…
Reference in New Issue