Merge branch 'PFC-test' of https://github.com/smartmita/MaiBot into G-Test

pull/937/head
Bakadax 2025-05-09 18:18:05 +08:00
commit e0280ff20a
19 changed files with 437 additions and 1482 deletions

View File

@ -278,14 +278,18 @@ class BotConfig:
talk_allowed_private = set()
enable_pfc_chatting: bool = False # 是否启用PFC聊天
enable_pfc_reply_checker: bool = True # 是否开启PFC回复检查
# idle_conversation
enable_idle_conversation: bool = False # 是否启用 pfc 主动发言
idle_check_interval: int = 10 # 检查间隔10分钟检查一次
min_idle_time: int = 7200 # 最短无活动时间2小时 (7200秒)
max_idle_time: int = 18000 # 最长无活动时间5小时 (18000秒)
pfc_message_buffer_size: int = (
2 # PFC 聊天消息缓冲数量,有利于使聊天节奏更加紧凑流畅,请根据实际 LLM 响应速度进行调整默认2条
)
api_polling_max_retries: int = 3 # 神秘小功能
# idle_chat
enable_idle_chat: bool = False # 是否启用 pfc 主动发言
idle_check_interval: int = 10 # 检查间隔10分钟检查一次
min_cooldown: int = 7200 # 最短冷却时间2小时 (7200秒)
max_cooldown: int = 18000 # 最长冷却时间5小时 (18000秒)
# Group Nickname
enable_nickname_mapping: bool = False # 绰号映射功能总开关
max_nicknames_in_prompt: int = 10 # Prompt 中最多注入的绰号数量
@ -721,18 +725,17 @@ class BotConfig:
"enable_pfc_reply_checker", config.enable_pfc_reply_checker
)
logger.info(f"PFC Reply Checker 状态: {'启用' if config.enable_pfc_reply_checker else '关闭'}")
config.pfc_message_buffer_size = experimental_config.get(
"pfc_message_buffer_size", config.pfc_message_buffer_size
)
def idle_conversation(parent: dict):
idle_conversation_config = parent["idle_conversation"]
def idle_chat(parent: dict):
idle_chat_config = parent["idle_chat"]
if config.INNER_VERSION in SpecifierSet(">=1.6.1.6"):
config.enable_idle_conversation = idle_conversation_config.get(
"enable_idle_conversation", config.enable_idle_conversation
)
config.idle_check_interval = idle_conversation_config.get(
"idle_check_interval", config.idle_check_interval
)
config.min_idle_time = idle_conversation_config.get("min_idle_time", config.min_idle_time)
config.max_idle_time = idle_conversation_config.get("max_idle_time", config.max_idle_time)
config.enable_idle_chat = idle_chat_config.get("enable_idle_chat", config.enable_idle_chat)
config.idle_check_interval = idle_chat_config.get("idle_check_interval", config.idle_check_interval)
config.min_cooldown = idle_chat_config.get("min_cooldown", config.min_cooldown)
config.max_cooldown = idle_chat_config.get("max_cooldown", config.max_cooldown)
# 版本表达式:>=1.0.0,<2.0.0
# 允许字段func: method, support: str, notice: str, necessary: bool
@ -768,7 +771,7 @@ class BotConfig:
"normal_chat": {"func": normal_chat, "support": ">=1.6.0", "necessary": False},
"focus_chat": {"func": focus_chat, "support": ">=1.6.0", "necessary": False},
"group_nickname": {"func": group_nickname, "support": ">=1.6.1.1", "necessary": False},
"idle_conversation": {"func": idle_conversation, "support": ">=1.6.1.6", "necessary": False},
"idle_chat": {"func": idle_chat, "support": ">=1.6.1.6", "necessary": False},
}
# 原地修改,将 字符串版本表达式 转换成 版本对象

View File

@ -3,18 +3,8 @@ PFC_idle 包 - 用于空闲时主动聊天的功能模块
该包包含以下主要组件
- IdleChat: 根据关系和活跃度进行智能主动聊天
- IdleChatManager: 管理多个聊天实例的空闲状态
- IdleConversation: 处理与空闲聊天相关的功能与主Conversation类解耦
"""
from .idle_chat import IdleChat
from .idle_chat_manager import IdleChatManager
from .idle_conversation import IdleConversation, get_idle_conversation_instance, initialize_idle_conversation
__all__ = [
"IdleChat",
"IdleChatManager",
"IdleConversation",
"get_idle_conversation_instance",
"initialize_idle_conversation",
]
__all__ = ["IdleChat"]

View File

@ -7,10 +7,12 @@ from datetime import datetime
from src.common.logger_manager import get_logger
from src.config.config import global_config
from src.plugins.models.utils_model import LLMRequest
from src.plugins.utils.prompt_builder import global_prompt_manager
# from src.plugins.utils.prompt_builder import global_prompt_manager
from src.plugins.person_info.person_info import person_info_manager
from src.plugins.utils.chat_message_builder import build_readable_messages
from ...schedule.schedule_generator import bot_schedule
# from ...schedule.schedule_generator import bot_schedule
from ..chat_observer import ChatObserver
from ..message_sender import DirectMessageSender
from src.plugins.chat.chat_stream import ChatStream
@ -144,14 +146,11 @@ class IdleChat:
self._task: Optional[asyncio.Task] = None
# 配置参数 - 从global_config加载
self.min_cooldown = getattr(
global_config, "MIN_IDLE_TIME", 7200
) # 最短冷却时间默认2小时建议修改长一点你也不希望你的bot一直骚扰你吧
self.max_cooldown = getattr(global_config, "MAX_IDLE_TIME", 14400) # 最长冷却时间默认4小时
self.min_idle_time = getattr(global_config, "MIN_IDLE_TIME", 3600)
self.check_interval = getattr(global_config, "IDLE_CHECK_INTERVAL", 600) # 检查间隔默认10分钟
self.active_hours_start = 6 # 活动开始时间
self.active_hours_end = 24 # 活动结束时间
self.min_cooldown = global_config.min_cooldown # 最短冷却时间默认2小时
self.max_cooldown = global_config.max_cooldown # 最长冷却时间默认5小时
self.check_interval = global_config.idle_check_interval * 60 # 检查间隔默认10分钟转换为秒
self.active_hours_start = 7 # 活动开始时间
self.active_hours_end = 23 # 活动结束时间
# 关系值相关
self.base_trigger_probability = 0.3 # 基础触发概率
@ -160,8 +159,8 @@ class IdleChat:
def start(self) -> None:
"""启动主动聊天检测"""
# 检查是否启用了主动聊天功能
if not getattr(global_config, "ENABLE_IDLE_CONVERSATION", False):
logger.info(f"[私聊][{self.private_name}]主动聊天功能已禁用配置ENABLE_IDLE_CONVERSATION=False")
if not global_config.enable_idle_chat:
logger.info(f"[私聊][{self.private_name}]主动聊天功能已禁用配置ENABLE_IDLE_CHAT=False")
return
if self._running:
@ -353,7 +352,7 @@ class IdleChat:
try:
while self._running:
# 检查是否启用了主动聊天功能
if not getattr(global_config, "ENABLE_IDLE_CONVERSATION", False):
if not global_config.enable_idle_chat:
# 如果禁用了功能,等待一段时间后再次检查配置
await asyncio.sleep(60) # 每分钟检查一次配置变更
continue
@ -488,19 +487,20 @@ class IdleChat:
if "" in full_relationship_text:
relationship_description = full_relationship_text.split("")[1].replace("", "")
if global_config.ENABLE_SCHEDULE_GEN:
schedule_prompt = await global_prompt_manager.format_prompt(
"schedule_prompt", schedule_info=bot_schedule.get_current_num_task(num=1, time_info=False)
)
else:
schedule_prompt = ""
# 暂不使用
# if global_config.ENABLE_SCHEDULE_GEN:
# schedule_prompt = await global_prompt_manager.format_prompt(
# "schedule_prompt", schedule_info=bot_schedule.get_current_num_task(num=1, time_info=False)
# )
# else:
# schedule_prompt = ""
# 构建提示词
# 构建提示词,暂存废弃部分这是你的日程{schedule_prompt}
current_time = datetime.now().strftime("%H:%M")
prompt = f"""你是{global_config.BOT_NICKNAME}
你正在与用户{self.private_name}进行QQ私聊你们的关系是{relationship_description}
现在时间{current_time}
这是你的日程{schedule_prompt}
你想要主动发起对话
请基于以下之前的对话历史生成一条自然友好符合关系程度的主动对话消息
这条消息应能够引起用户的兴趣重新开始对话

View File

@ -1,183 +0,0 @@
from typing import Dict, Optional
import asyncio
from src.common.logger_manager import get_logger
from .idle_chat import IdleChat
import traceback
logger = get_logger("pfc_idle_chat_manager")
class IdleChatManager:
"""空闲聊天管理器
用于管理所有私聊用户的空闲聊天实例
采用单例模式确保全局只有一个管理器实例
"""
_instance: Optional["IdleChatManager"] = None
_lock: asyncio.Lock = asyncio.Lock()
def __init__(self):
"""初始化空闲聊天管理器"""
self._idle_chats: Dict[str, IdleChat] = {} # stream_id -> IdleChat
self._active_conversations_count: Dict[str, int] = {} # stream_id -> count
@classmethod
def get_instance(cls) -> "IdleChatManager":
"""获取管理器单例 (同步版本)
Returns:
IdleChatManager: 管理器实例
"""
if not cls._instance:
# 在同步环境中创建实例
cls._instance = cls()
return cls._instance
@classmethod
async def get_instance_async(cls) -> "IdleChatManager":
"""获取管理器单例 (异步版本)
Returns:
IdleChatManager: 管理器实例
"""
if not cls._instance:
async with cls._lock:
if not cls._instance:
cls._instance = cls()
return cls._instance
async def get_or_create_idle_chat(self, stream_id: str, private_name: str) -> IdleChat:
"""获取或创建空闲聊天实例
Args:
stream_id: 聊天流ID
private_name: 私聊用户名称
Returns:
IdleChat: 空闲聊天实例
"""
if stream_id not in self._idle_chats:
idle_chat = IdleChat(stream_id, private_name)
self._idle_chats[stream_id] = idle_chat
# 初始化活跃对话计数
if stream_id not in self._active_conversations_count:
self._active_conversations_count[stream_id] = 0
idle_chat.start() # 启动空闲检测
logger.info(f"[私聊][{private_name}]创建并启动新的空闲聊天实例")
return self._idle_chats[stream_id]
async def remove_idle_chat(self, stream_id: str) -> None:
"""移除空闲聊天实例
Args:
stream_id: 聊天流ID
"""
if stream_id in self._idle_chats:
idle_chat = self._idle_chats[stream_id]
idle_chat.stop() # 停止空闲检测
del self._idle_chats[stream_id]
if stream_id in self._active_conversations_count:
del self._active_conversations_count[stream_id]
logger.info(f"[私聊][{idle_chat.private_name}]移除空闲聊天实例")
async def notify_conversation_start(self, stream_id: str) -> None:
"""通知对话开始
Args:
stream_id: 聊天流ID
"""
try:
if stream_id not in self._idle_chats:
logger.warning(f"对话开始通知: {stream_id} 没有对应的IdleChat实例将创建一个")
# 从stream_id尝试提取private_name
private_name = stream_id
if stream_id.startswith("private_"):
parts = stream_id.split("_")
if len(parts) >= 2:
private_name = parts[1] # 取第二部分作为名称
await self.get_or_create_idle_chat(stream_id, private_name)
if stream_id not in self._active_conversations_count:
self._active_conversations_count[stream_id] = 0
# 增加计数前记录当前值,用于日志
old_count = self._active_conversations_count[stream_id]
self._active_conversations_count[stream_id] += 1
new_count = self._active_conversations_count[stream_id]
# 确保IdleChat实例存在
idle_chat = self._idle_chats.get(stream_id)
if idle_chat:
await idle_chat.increment_active_instances()
logger.debug(f"对话开始通知: {stream_id}, 计数从{old_count}增加到{new_count}")
else:
logger.error(f"对话开始通知: {stream_id}, 计数增加但IdleChat不存在! 计数:{old_count}->{new_count}")
except Exception as e:
logger.error(f"对话开始通知处理失败: {stream_id}, 错误: {e}")
logger.error(traceback.format_exc())
async def notify_conversation_end(self, stream_id: str) -> None:
"""通知对话结束
Args:
stream_id: 聊天流ID
"""
try:
# 记录当前计数用于日志
old_count = self._active_conversations_count.get(stream_id, 0)
# 安全减少计数,避免负数
if stream_id in self._active_conversations_count and self._active_conversations_count[stream_id] > 0:
self._active_conversations_count[stream_id] -= 1
else:
# 如果计数已经为0或不存在设置为0
self._active_conversations_count[stream_id] = 0
new_count = self._active_conversations_count.get(stream_id, 0)
# 确保IdleChat实例存在
idle_chat = self._idle_chats.get(stream_id)
if idle_chat:
await idle_chat.decrement_active_instances()
logger.debug(f"对话结束通知: {stream_id}, 计数从{old_count}减少到{new_count}")
else:
logger.warning(f"对话结束通知: {stream_id}, 计数减少但IdleChat不存在! 计数:{old_count}->{new_count}")
# 检查是否所有对话都结束了,帮助调试
all_counts = sum(self._active_conversations_count.values())
if all_counts == 0:
logger.info("所有对话实例都已结束当前总活跃计数为0")
except Exception as e:
logger.error(f"对话结束通知处理失败: {stream_id}, 错误: {e}")
logger.error(traceback.format_exc())
def get_idle_chat(self, stream_id: str) -> Optional[IdleChat]:
"""获取空闲聊天实例
Args:
stream_id: 聊天流ID
Returns:
Optional[IdleChat]: 空闲聊天实例如果不存在则返回None
"""
return self._idle_chats.get(stream_id)
def get_active_conversations_count(self, stream_id: str) -> int:
"""获取指定流的活跃对话计数
Args:
stream_id: 聊天流ID
Returns:
int: 活跃对话计数
"""
return self._active_conversations_count.get(stream_id, 0)
def get_all_active_conversations_count(self) -> int:
"""获取所有活跃对话总计数
Returns:
int: 活跃对话总计数
"""
return sum(self._active_conversations_count.values())

View File

@ -1,513 +0,0 @@
import traceback
import asyncio
from typing import Optional, Dict
from src.common.logger_manager import get_logger
import time
logger = get_logger("pfc_idle_conversation")
class IdleConversation:
"""
处理Idle聊天相关的功能将这些功能从主Conversation类中分离出来
以减少代码量并方便维护
"""
def __init__(self):
"""初始化IdleConversation实例"""
self._idle_chat_manager = None
self._running = False
self._active_streams: Dict[str, bool] = {} # 跟踪活跃的流
self._monitor_task = None # 用于后台监控的任务
self._lock = asyncio.Lock() # 用于线程安全操作
self._initialization_in_progress = False # 防止并发初始化
async def initialize(self):
"""初始化Idle聊天管理器"""
# 防止并发初始化
if self._initialization_in_progress:
logger.debug("IdleConversation正在初始化中等待完成")
return False
if self._idle_chat_manager is not None:
logger.debug("IdleConversation已初始化无需重复操作")
return True
# 标记开始初始化
self._initialization_in_progress = True
try:
# 从PFCManager获取IdleChatManager实例
from ..pfc_manager import PFCManager
pfc_manager = PFCManager.get_instance()
self._idle_chat_manager = pfc_manager.get_idle_chat_manager()
logger.debug("IdleConversation初始化完成已获取IdleChatManager实例")
return True
except Exception as e:
logger.error(f"初始化IdleConversation时出错: {e}")
logger.error(traceback.format_exc())
return False
finally:
# 无论成功或失败,都清除初始化标志
self._initialization_in_progress = False
async def start(self):
"""启动IdleConversation创建后台监控任务"""
if self._running:
logger.debug("IdleConversation已经在运行")
return False
if not self._idle_chat_manager:
success = await self.initialize()
if not success:
logger.error("无法启动IdleConversation初始化失败")
return False
try:
self._running = True
# 创建后台监控任务使用try-except块来捕获可能的异常
try:
loop = asyncio.get_running_loop()
if loop.is_running():
self._monitor_task = asyncio.create_task(self._monitor_loop())
logger.info("IdleConversation启动成功后台监控任务已创建")
else:
logger.warning("事件循环不活跃,跳过监控任务创建")
except RuntimeError:
# 如果没有活跃的事件循环,记录警告但继续执行
logger.warning("没有活跃的事件循环IdleConversation将不会启动监控任务")
# 尽管没有监控任务但仍然将running设为True表示IdleConversation已启动
return True
except Exception as e:
self._running = False
logger.error(f"启动IdleConversation失败: {e}")
logger.error(traceback.format_exc())
return False
async def stop(self):
"""停止IdleConversation的后台任务"""
if not self._running:
return
self._running = False
if self._monitor_task and not self._monitor_task.done():
try:
self._monitor_task.cancel()
try:
await asyncio.wait_for(self._monitor_task, timeout=2.0)
except asyncio.TimeoutError:
logger.warning("停止IdleConversation监控任务超时")
except asyncio.CancelledError:
pass # 正常取消
except Exception as e:
logger.error(f"停止IdleConversation监控任务时出错: {e}")
logger.error(traceback.format_exc())
self._monitor_task = None
logger.info("IdleConversation已停止")
async def _monitor_loop(self):
"""后台监控循环,定期检查活跃的会话并执行必要的操作"""
try:
while self._running:
try:
# 同步活跃流计数到IdleChatManager
if self._idle_chat_manager:
await self._sync_active_streams_to_manager()
# 这里可以添加定期检查逻辑,如查询空闲状态等
active_count = len(self._active_streams)
logger.debug(f"IdleConversation监控中当前活跃流数量: {active_count}")
except Exception as e:
logger.error(f"IdleConversation监控循环出错: {e}")
logger.error(traceback.format_exc())
# 每30秒执行一次监控
await asyncio.sleep(30)
except asyncio.CancelledError:
logger.info("IdleConversation监控任务已取消")
except Exception as e:
logger.error(f"IdleConversation监控任务异常退出: {e}")
logger.error(traceback.format_exc())
self._running = False
async def _sync_active_streams_to_manager(self):
"""同步活跃流计数到IdleChatManager和IdleChat"""
try:
if not self._idle_chat_manager:
return
# 获取当前的活跃流列表
async with self._lock:
active_streams = list(self._active_streams.keys())
# 对每个活跃流确保IdleChatManager和IdleChat中的计数是正确的
for stream_id in active_streams:
# 获取当前IdleChatManager中的计数
manager_count = self._idle_chat_manager.get_active_conversations_count(stream_id)
# 由于我们的活跃流字典只记录是否活跃(值为True)所以计数应该是1
if manager_count != 1:
# 修正IdleChatManager中的计数
old_count = manager_count
self._idle_chat_manager._active_conversations_count[stream_id] = 1
logger.warning(f"同步调整IdleChatManager中的计数: stream_id={stream_id}, {old_count}->1")
# 同时修正IdleChat中的计数
idle_chat = self._idle_chat_manager.get_idle_chat(stream_id)
if idle_chat:
if getattr(idle_chat, "active_instances_count", 0) != 1:
old_count = getattr(idle_chat, "active_instances_count", 0)
idle_chat.active_instances_count = 1
logger.warning(f"同步调整IdleChat中的计数: stream_id={stream_id}, {old_count}->1")
# 检查IdleChatManager中有没有多余的计数(conversation中已不存在但manager中还有)
for stream_id, count in list(self._idle_chat_manager._active_conversations_count.items()):
if count > 0 and stream_id not in active_streams:
# 重置为0
self._idle_chat_manager._active_conversations_count[stream_id] = 0
logger.warning(f"重置IdleChatManager中的多余计数: stream_id={stream_id}, {count}->0")
# 同时修正IdleChat中的计数
idle_chat = self._idle_chat_manager.get_idle_chat(stream_id)
if idle_chat and getattr(idle_chat, "active_instances_count", 0) > 0:
old_count = getattr(idle_chat, "active_instances_count", 0)
idle_chat.active_instances_count = 0
logger.warning(f"同步重置IdleChat中的计数: stream_id={stream_id}, {old_count}->0")
# 日志记录同步结果
total_active = len(active_streams)
total_manager = sum(self._idle_chat_manager._active_conversations_count.values())
logger.debug(f"同步后的计数: IdleConversation活跃流={total_active}, IdleChatManager总计数={total_manager}")
except Exception as e:
logger.error(f"同步活跃流计数失败: {e}")
logger.error(traceback.format_exc())
async def get_or_create_idle_chat(self, stream_id: str, private_name: str):
"""
获取或创建IdleChat实例
Args:
stream_id: 聊天流ID
private_name: 私聊对象名称用于日志
Returns:
bool: 操作是否成功
"""
# 确保IdleConversation已启动
if not self._running:
await self.start()
if not self._idle_chat_manager:
# 如果尚未初始化,尝试初始化
success = await self.initialize()
if not success:
logger.warning(f"[私聊][{private_name}] 获取或创建IdleChat失败IdleChatManager未初始化")
return False
try:
# 创建IdleChat实例
_idle_chat = await self._idle_chat_manager.get_or_create_idle_chat(stream_id, private_name)
logger.debug(f"[私聊][{private_name}] 已创建或获取IdleChat实例")
return True
except Exception as e:
logger.warning(f"[私聊][{private_name}] 创建或获取IdleChat实例失败: {e}")
logger.warning(traceback.format_exc())
return False
async def notify_conversation_start(self, stream_id: str, private_name: str) -> bool:
"""
通知空闲聊天管理器对话开始
Args:
stream_id: 聊天流ID
private_name: 私聊对象名称用于日志
Returns:
bool: 通知是否成功
"""
try:
# 确保IdleConversation已启动
if not self._running:
success = await self.start()
if not success:
logger.warning(f"[私聊][{private_name}] 启动IdleConversation失败无法通知对话开始")
return False
if not self._idle_chat_manager:
# 如果尚未初始化,尝试初始化
success = await self.initialize()
if not success:
logger.warning(f"[私聊][{private_name}] 通知对话开始失败IdleChatManager未初始化")
return False
try:
# 确保IdleChat实例已创建 - 这是关键步骤要先创建IdleChat
await self.get_or_create_idle_chat(stream_id, private_name)
# 先记录活跃状态 - 这是权威源
async with self._lock:
self._active_streams[stream_id] = True
# 然后同步到IdleChatManager
if self._idle_chat_manager:
await self._idle_chat_manager.notify_conversation_start(stream_id)
logger.info(f"[私聊][{private_name}] 已通知空闲聊天管理器对话开始")
else:
logger.warning(f"[私聊][{private_name}] IdleChatManager不存在但已记录活跃状态")
# 立即进行一次同步,确保数据一致性
await self._sync_active_streams_to_manager()
return True
except Exception as e:
logger.warning(f"[私聊][{private_name}] 通知空闲聊天管理器对话开始失败: {e}")
logger.warning(traceback.format_exc())
# 即使通知失败,也应记录活跃状态
async with self._lock:
self._active_streams[stream_id] = True
return False
except Exception as outer_e:
logger.error(f"[私聊][{private_name}] 处理对话开始通知时发生严重错误: {outer_e}")
logger.error(traceback.format_exc())
return False
async def notify_conversation_end(self, stream_id: str, private_name: str) -> bool:
"""
通知空闲聊天管理器对话结束
Args:
stream_id: 聊天流ID
private_name: 私聊对象名称用于日志
Returns:
bool: 通知是否成功
"""
try:
# 先从自身的活跃流中移除 - 这是权威源
was_active = False
async with self._lock:
if stream_id in self._active_streams:
del self._active_streams[stream_id]
was_active = True
logger.debug(f"[私聊][{private_name}] 已从活跃流中移除 {stream_id}")
if not self._idle_chat_manager:
# 如果尚未初始化,尝试初始化
success = await self.initialize()
if not success:
logger.warning(f"[私聊][{private_name}] 通知对话结束失败IdleChatManager未初始化")
return False
try:
# 然后同步到IdleChatManager
if self._idle_chat_manager:
# 无论如何都尝试通知
await self._idle_chat_manager.notify_conversation_end(stream_id)
# 立即进行一次同步,确保数据一致性
await self._sync_active_streams_to_manager()
logger.info(f"[私聊][{private_name}] 已通知空闲聊天管理器对话结束")
# 检查当前活跃流数量
active_count = len(self._active_streams)
if active_count == 0:
logger.info(f"[私聊][{private_name}] 当前无活跃流,可能会触发主动聊天")
# 额外调用:如果实例存在且只有在确实移除了活跃流的情况下才触发检查
if was_active:
idle_chat = self._idle_chat_manager.get_idle_chat(stream_id)
if idle_chat:
# 直接触发IdleChat检查而不是等待下一个循环
logger.info(f"[私聊][{private_name}] 对话结束,手动触发一次主动聊天检查")
asyncio.create_task(self._trigger_idle_chat_check(idle_chat, stream_id, private_name))
return True
else:
logger.warning(f"[私聊][{private_name}] IdleChatManager不存在但已更新活跃状态")
return False
except Exception as e:
logger.warning(f"[私聊][{private_name}] 通知空闲聊天管理器对话结束失败: {e}")
logger.warning(traceback.format_exc())
return False
except Exception as outer_e:
logger.error(f"[私聊][{private_name}] 处理对话结束通知时发生严重错误: {outer_e}")
logger.error(traceback.format_exc())
return False
async def _trigger_idle_chat_check(self, idle_chat, stream_id: str, private_name: str):
"""在对话结束后手动触发一次IdleChat的检查"""
try:
# 确保活跃计数与IdleConversation一致
async with self._lock:
is_active_in_conversation = stream_id in self._active_streams
# 强制使IdleChat的计数与IdleConversation一致
if is_active_in_conversation:
# 如果在IdleConversation中是活跃的IdleChat的计数应该是1
if idle_chat.active_instances_count != 1:
old_count = idle_chat.active_instances_count
idle_chat.active_instances_count = 1
logger.warning(f"[私聊][{private_name}] 修正IdleChat计数: {old_count}->1")
else:
# 如果在IdleConversation中不是活跃的IdleChat的计数应该是0
if idle_chat.active_instances_count != 0:
old_count = idle_chat.active_instances_count
idle_chat.active_instances_count = 0
logger.warning(f"[私聊][{private_name}] 修正IdleChat计数: {old_count}->0")
# 等待1秒让任何正在进行的处理完成
await asyncio.sleep(1)
# 只有当stream不再活跃时才触发检查
if not is_active_in_conversation:
# 尝试触发一次检查
if hasattr(idle_chat, "_should_trigger"):
should_trigger = await idle_chat._should_trigger()
logger.info(f"[私聊][{private_name}] 手动触发主动聊天检查结果: {should_trigger}")
# 如果应该触发直接调用_initiate_chat
if should_trigger and hasattr(idle_chat, "_initiate_chat"):
logger.info(f"[私聊][{private_name}] 手动触发主动聊天")
await idle_chat._initiate_chat()
# 更新最后触发时间
idle_chat.last_trigger_time = time.time()
else:
logger.warning(f"[私聊][{private_name}] IdleChat没有_should_trigger方法无法触发检查")
except Exception as e:
logger.error(f"[私聊][{private_name}] 手动触发主动聊天检查时出错: {e}")
logger.error(traceback.format_exc())
def is_stream_active(self, stream_id: str) -> bool:
"""检查指定的stream是否活跃"""
return stream_id in self._active_streams
def get_active_streams_count(self) -> int:
"""获取当前活跃的stream数量"""
return len(self._active_streams)
@property
def is_running(self) -> bool:
"""检查IdleConversation是否正在运行"""
return self._running
@property
def idle_chat_manager(self):
"""获取IdleChatManager实例"""
return self._idle_chat_manager
# 创建单例实例
_instance: Optional[IdleConversation] = None
_instance_lock = asyncio.Lock()
_initialization_in_progress = False # 防止并发初始化
async def initialize_idle_conversation() -> IdleConversation:
"""初始化并启动IdleConversation单例实例"""
global _initialization_in_progress
# 防止并发初始化
if _initialization_in_progress:
logger.debug("IdleConversation全局初始化正在进行中等待完成")
return get_idle_conversation_instance()
# 标记正在初始化
_initialization_in_progress = True
try:
instance = get_idle_conversation_instance()
# 如果实例已经在运行,避免重复初始化
if getattr(instance, "_running", False):
logger.debug("IdleConversation已在运行状态无需重新初始化")
_initialization_in_progress = False
return instance
# 初始化实例
success = await instance.initialize()
if not success:
logger.error("IdleConversation初始化失败")
_initialization_in_progress = False
return instance
# 启动实例
success = await instance.start()
if not success:
logger.error("IdleConversation启动失败")
else:
# 启动成功,进行初始检查
logger.info("IdleConversation启动成功执行初始化后检查")
# 这里可以添加一些启动后的检查,如果需要
# 创建一个异步任务,定期检查系统状态
asyncio.create_task(periodic_system_check(instance))
return instance
except Exception as e:
logger.error(f"初始化并启动IdleConversation时出错: {e}")
logger.error(traceback.format_exc())
# 重置标志,允许下次再试
_initialization_in_progress = False
return get_idle_conversation_instance() # 返回实例,即使初始化失败
finally:
# 清除初始化标志
_initialization_in_progress = False
async def periodic_system_check(instance: IdleConversation):
"""定期检查系统状态,确保主动聊天功能正常工作"""
try:
# 等待10秒让系统完全启动
await asyncio.sleep(10)
while getattr(instance, "_running", False):
try:
# 检查活跃流数量
active_streams_count = len(getattr(instance, "_active_streams", {}))
# 如果IdleChatManager存在检查其中的活跃对话计数
idle_chat_manager = getattr(instance, "_idle_chat_manager", None)
if idle_chat_manager and hasattr(idle_chat_manager, "get_all_active_conversations_count"):
manager_count = idle_chat_manager.get_all_active_conversations_count()
# 如果两者不一致,记录警告
if active_streams_count != manager_count:
logger.warning(
f"检测到计数不一致: IdleConversation记录的活跃流数量({active_streams_count}) 与 IdleChatManager记录的活跃对话数({manager_count})不匹配"
)
# 如果IdleChatManager记录的计数为0但自己的记录不为0进行修正
if manager_count == 0 and active_streams_count > 0:
logger.warning("检测到可能的计数错误尝试修正清空IdleConversation的活跃流记录")
async with instance._lock:
instance._active_streams.clear()
# 检查计数如果为0帮助日志输出
if active_streams_count == 0:
logger.debug("当前没有活跃的对话流,应该可以触发主动聊天")
except Exception as check_err:
logger.error(f"执行系统检查时出错: {check_err}")
logger.error(traceback.format_exc())
# 每60秒检查一次
await asyncio.sleep(60)
except asyncio.CancelledError:
logger.debug("系统检查任务被取消")
except Exception as e:
logger.error(f"系统检查任务异常退出: {e}")
logger.error(traceback.format_exc())
def get_idle_conversation_instance() -> IdleConversation:
"""获取IdleConversation的单例实例"""
global _instance
if _instance is None:
_instance = IdleConversation()
return _instance

View File

@ -1,354 +0,0 @@
import time
import asyncio
import random
import traceback
from typing import TYPE_CHECKING, Optional
from src.common.logger_manager import get_logger
from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config
from src.plugins.chat.chat_stream import chat_manager, ChatStream
from src.individuality.individuality import Individuality
from src.plugins.utils.chat_message_builder import build_readable_messages
from maim_message import UserInfo
from ..chat_observer import ChatObserver
from ..message_sender import DirectMessageSender
# 导入富文本回溯,用于更好的错误展示
from rich.traceback import install
# 使用TYPE_CHECKING避免循环导入
if TYPE_CHECKING:
from ..conversation import Conversation
install(extra_lines=3)
# 获取当前模块的日志记录器
logger = get_logger("idle_conversation_starter")
class IdleConversationStarter:
"""长时间无对话主动发起对话的组件
该组件会在一段时间没有对话后自动生成一条消息发送给用户以保持对话的活跃度
时间阈值会在配置的最小和最大值之间随机选择每次发送消息后都会重置
"""
def __init__(self, stream_id: str, private_name: str):
"""初始化空闲对话启动器
Args:
stream_id: 聊天流ID
private_name: 私聊用户名称
"""
self.stream_id: str = stream_id
self.private_name: str = private_name
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
self.message_sender = DirectMessageSender(private_name)
# 添加异步锁,保护对共享变量的访问
self._lock: asyncio.Lock = asyncio.Lock()
# LLM请求对象用于生成主动对话内容
self.llm = LLMRequest(
model=global_config.llm_normal, temperature=0.8, max_tokens=500, request_type="idle_conversation_starter"
)
# 个性化信息
self.personality_info: str = Individuality.get_instance().get_prompt(x_person=2, level=3)
# 计算实际触发阈值在min和max之间随机
self.actual_idle_threshold: int = random.randint(global_config.min_idle_time, global_config.max_idle_time)
# 工作状态
self.last_message_time: float = time.time()
self._running: bool = False
self._task: Optional[asyncio.Task] = None
def start(self) -> None:
"""启动空闲对话检测
如果功能被禁用或已经在运行则不会启动
"""
# 如果功能被禁用,则不启动
if not global_config.enable_idle_conversation:
logger.info(f"[私聊][{self.private_name}]主动发起对话功能已禁用")
return
if self._running:
logger.debug(f"[私聊][{self.private_name}]主动发起对话功能已在运行中")
return
self._running = True
self._task = asyncio.create_task(self._check_idle_loop())
logger.info(f"[私聊][{self.private_name}]启动空闲对话检测,阈值设置为{self.actual_idle_threshold}")
def stop(self) -> None:
"""停止空闲对话检测
取消当前运行的任务并重置状态
"""
if not self._running:
return
self._running = False
if self._task:
self._task.cancel()
self._task = None
logger.info(f"[私聊][{self.private_name}]停止空闲对话检测")
async def update_last_message_time(self, message_time: Optional[float] = None) -> None:
"""更新最后一条消息的时间
Args:
message_time: 消息时间戳如果为None则使用当前时间
"""
async with self._lock:
self.last_message_time = message_time or time.time()
# 重新随机化下一次触发的时间阈值
self.actual_idle_threshold = random.randint(global_config.min_idle_time, global_config.max_idle_time)
logger.debug(
f"[私聊][{self.private_name}]更新最后消息时间: {self.last_message_time},新阈值: {self.actual_idle_threshold}"
)
def reload_config(self) -> None:
"""重新加载配置
记录当前配置参数用于日志输出
"""
try:
logger.debug(
f"[私聊][{self.private_name}]重新加载主动对话配置: 启用={global_config.enable_idle_conversation}, 检查间隔={global_config.idle_check_interval}秒, 最短间隔={global_config.min_idle_time}秒, 最长间隔={global_config.max_idle_time}"
)
# 重新计算实际阈值
async def update_threshold():
async with self._lock:
self.actual_idle_threshold = random.randint(
global_config.min_idle_time, global_config.max_idle_time
)
logger.debug(f"[私聊][{self.private_name}]更新空闲检测阈值为: {self.actual_idle_threshold}")
# 创建一个任务来异步更新阈值
asyncio.create_task(update_threshold())
except Exception as e:
logger.error(f"[私聊][{self.private_name}]重新加载配置时出错: {str(e)}")
logger.error(traceback.format_exc())
async def _check_idle_loop(self) -> None:
"""检查空闲状态的循环
定期检查是否长时间无对话如果达到阈值则尝试主动发起对话
"""
try:
config_reload_counter = 0
config_reload_interval = 100 # 每100次检查重新加载一次配置
while self._running:
# 定期重新加载配置
config_reload_counter += 1
if config_reload_counter >= config_reload_interval:
self.reload_config()
config_reload_counter = 0
# 检查是否启用了主动对话功能
if not global_config.enable_idle_conversation:
# 如果禁用了功能,就等待一段时间后再次检查配置
await asyncio.sleep(global_config.idle_check_interval)
continue
# 使用锁保护对共享变量的读取
current_time = time.time()
async with self._lock:
idle_time = current_time - self.last_message_time
threshold = self.actual_idle_threshold
if idle_time >= threshold:
logger.info(f"[私聊][{self.private_name}]检测到长时间({idle_time:.0f}秒)无对话,尝试主动发起聊天")
await self._initiate_conversation()
# 更新时间,避免连续触发
await self.update_last_message_time()
# 等待下一次检查
await asyncio.sleep(global_config.idle_check_interval)
except asyncio.CancelledError:
logger.debug(f"[私聊][{self.private_name}]空闲对话检测任务被取消")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]空闲对话检测出错: {str(e)}")
logger.error(traceback.format_exc())
# 尝试重新启动检测循环
if self._running:
logger.info(f"[私聊][{self.private_name}]尝试重新启动空闲对话检测")
self._task = asyncio.create_task(self._check_idle_loop())
async def _initiate_conversation(self) -> None:
"""生成并发送主动对话内容
获取聊天历史记录使用LLM生成合适的开场白大概然后发送消息
"""
try:
# 获取聊天历史记录,用于生成更合适的开场白
messages = self.chat_observer.get_cached_messages(limit=12) # 获取最近12条消息
chat_history_text = await build_readable_messages(
messages,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
# 构建提示词
prompt = f"""{self.personality_info}。你的名字是{global_config.BOT_NICKNAME}
你正在与用户{self.private_name}进行QQ私聊,
但已经有一段时间没有对话了
你想要主动发起一个友好的对话可以说说自己在做的事情或者询问对方在做什么
请基于以下之前的对话历史生成一条自然友好符合你个性的主动对话消息
这条消息应该能够引起用户的兴趣重新开始对话
最近的对话历史可能已经过去了很久
{chat_history_text}
请直接输出一条消息不要有任何额外的解释或引导文字消息要简短自然就像是在日常聊天中的开场白
消息内容尽量简短,不要超过20个字,不要添加任何表情符号
"""
# 尝试生成回复,添加超时处理
try:
content, _ = await asyncio.wait_for(
self.llm.generate_response_async(prompt),
timeout=30, # 30秒超时
)
except asyncio.TimeoutError:
logger.error(f"[私聊][{self.private_name}]生成主动对话内容超时")
return
except Exception as llm_err:
logger.error(f"[私聊][{self.private_name}]生成主动对话内容失败: {str(llm_err)}")
logger.error(traceback.format_exc())
return
# 清理结果
content = content.strip()
content = content.strip("\"'")
if not content:
logger.error(f"[私聊][{self.private_name}]生成的主动对话内容为空")
return
# 统一错误处理从这里开始所有操作都在同一个try-except块中
logger.debug(f"[私聊][{self.private_name}]成功生成主动对话内容: {content},准备发送")
# 在函数内部导入PFCManager避免循环导入
from ..pfc_manager import PFCManager
# 获取当前实例 - 注意这是同步方法不需要await
pfc_manager = PFCManager.get_instance()
# 结束当前对话实例(如果存在)
current_conversation = await pfc_manager.get_conversation(self.stream_id)
if current_conversation:
logger.info(f"[私聊][{self.private_name}]结束当前对话实例,准备创建新实例")
try:
await current_conversation.stop()
await pfc_manager.remove_conversation(self.stream_id)
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]结束当前对话实例时出错: {str(e)},继续创建新实例")
logger.warning(traceback.format_exc())
# 创建新的对话实例
logger.info(f"[私聊][{self.private_name}]创建新的对话实例以发送主动消息")
new_conversation = None
try:
new_conversation = await pfc_manager.get_or_create_conversation(self.stream_id, self.private_name)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]创建新对话实例失败: {str(e)}")
logger.error(traceback.format_exc())
return
# 确保新对话实例已初始化完成
chat_stream = await self._get_chat_stream(new_conversation)
if not chat_stream:
logger.error(f"[私聊][{self.private_name}]无法获取有效的聊天流,取消发送主动消息")
return
# 发送消息
try:
await self.message_sender.send_message(chat_stream=chat_stream, content=content, reply_to_message=None)
# 更新空闲会话启动器的最后消息时间
await self.update_last_message_time()
# 如果新对话实例有一个聊天观察者,请触发更新
if new_conversation and hasattr(new_conversation, "chat_observer"):
logger.info(f"[私聊][{self.private_name}]触发聊天观察者更新")
try:
new_conversation.chat_observer.trigger_update()
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]触发聊天观察者更新失败: {str(e)}")
logger.warning(traceback.format_exc())
logger.info(f"[私聊][{self.private_name}]成功主动发起对话: {content}")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]发送主动对话消息失败: {str(e)}")
logger.error(traceback.format_exc())
except Exception as e:
# 顶级异常处理,确保任何未捕获的异常都不会导致整个进程崩溃
logger.error(f"[私聊][{self.private_name}]主动发起对话过程中发生未预期的错误: {str(e)}")
logger.error(traceback.format_exc())
async def _get_chat_stream(self, conversation: Optional["Conversation"] = None) -> Optional[ChatStream]:
"""获取可用的聊天流
尝试多种方式获取聊天流
1. 从传入的对话实例中获取
2. 从全局聊天管理器中获取
3. 创建一个新的聊天流
Args:
conversation: 对话实例可以为None
Returns:
Optional[ChatStream]: 如果成功获取则返回聊天流否则返回None
"""
chat_stream = None
# 1. 尝试从对话实例获取
if conversation and hasattr(conversation, "should_continue"):
# 等待一小段时间,确保初始化完成
retry_count = 0
max_retries = 10
while not conversation.should_continue and retry_count < max_retries:
await asyncio.sleep(0.5)
retry_count += 1
logger.debug(f"[私聊][{self.private_name}]等待新对话实例初始化完成: 尝试 {retry_count}/{max_retries}")
if not conversation.should_continue:
logger.warning(f"[私聊][{self.private_name}]新对话实例初始化可能未完成,但仍将尝试获取聊天流")
# 尝试使用对话实例的聊天流
if hasattr(conversation, "chat_stream") and conversation.chat_stream:
logger.info(f"[私聊][{self.private_name}]使用新对话实例的聊天流")
return conversation.chat_stream
# 2. 尝试从聊天管理器获取
try:
logger.info(f"[私聊][{self.private_name}]尝试从chat_manager获取聊天流")
chat_stream = chat_manager.get_stream(self.stream_id)
if chat_stream:
return chat_stream
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]从chat_manager获取聊天流失败: {str(e)}")
logger.warning(traceback.format_exc())
# 3. 创建新的聊天流
try:
logger.warning(f"[私聊][{self.private_name}]无法获取现有聊天流,创建新的聊天流")
# 创建用户信息对象
user_info = UserInfo(user_id=global_config.BOT_QQ, user_nickname=global_config.BOT_NICKNAME, platform="qq")
# 创建聊天流
return ChatStream(self.stream_id, "qq", user_info)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]创建新聊天流失败: {str(e)}")
logger.error(traceback.format_exc())
return None

View File

@ -3,13 +3,11 @@ import traceback
from typing import Tuple, Optional, Dict, Any, List
from src.common.logger_manager import get_logger
# from src.individuality.individuality import Individuality
from src.plugins.utils.chat_message_builder import build_readable_messages
from ..models.utils_model import LLMRequest
from src.config.config import global_config
# 确保导入路径正确
from .pfc_utils import get_items_from_json
from .pfc_utils import get_items_from_json, build_chat_history_text
from .chat_observer import ChatObserver
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
@ -22,10 +20,10 @@ logger = get_logger("pfc_action_planner")
# Prompt(1): 首次回复或非连续回复时的决策 Prompt
PROMPT_INITIAL_REPLY = """
当前时间{current_time_str}
现在{persona_text}正在与{sender_name}在qq上私聊
现在[{persona_text}]正在与[{sender_name}]在qq上私聊
他们的关系是{relationship_text}
{persona_text}现在的心情{current_emotion_text}
你现在需要操控{persona_text}根据以下所有信息灵活合理的决策{persona_text}的下一步行动需要符合正常人的社交流程可以回复可以倾听甚至可以屏蔽对方
[{persona_text}]现在的心情{current_emotion_text}
你现在需要操控[{persona_text}]判断当前氛围和双方的意图根据以下所有信息灵活合理的决策{persona_text}的下一步行动需要符合正常人的社交流程可以回复可以倾听甚至可以屏蔽对方
当前对话目标
{goals_str}
@ -38,15 +36,14 @@ PROMPT_INITIAL_REPLY = """
最近的对话记录(包括你已成功发送的消息 新收到的消息)
{chat_history_text}
{spam_warning_info}
------
可选行动类型以及解释
listening: 倾听对方发言当你认为对方话才说到一半发言明显未结束时选择
direct_reply: 直接回复对方 (当有新消息需要处理时通常应选择此项)
direct_reply: 直接回复对方
rethink_goal: 思考一个对话目标当你觉得目前对话需要目标或当前目标不再适用或话题卡住时选择注意私聊的环境是灵活的有可能需要经常选择
end_conversation: 结束对话对方长时间没回复繁忙或者当你觉得对话告一段落时可以选择
block_and_ignore: 更加极端的结束对话方式直接结束对话并在一段时间内无视对方所有发言屏蔽对话让你感到十分不适或你遭到各类骚扰时选择
block_and_ignore: 更加极端的结束对话方式直接结束对话并在一段时间内无视对方所有发言屏蔽你觉得对话让[{persona_text}]感到十分不适[{persona_text}]遭到各类骚扰时选择
请以JSON格式输出你的决策
{{
@ -59,10 +56,10 @@ block_and_ignore: 更加极端的结束对话方式,直接结束对话并在
# Prompt(2): 上一次成功回复后,决定继续发言时的决策 Prompt
PROMPT_FOLLOW_UP = """
当前时间{current_time_str}
现在{persona_text}正在与{sender_name}在qq上私聊**并且刚刚{persona_text}已经回复了对方**
现在[{persona_text}]正在与[{sender_name}]在qq上私聊**并且刚刚[{persona_text}]已经回复了对方**
他们的关系是{relationship_text}
{persona_text}现在的心情是{current_emotion_text}
你现在需要操控{persona_text}根据以下所有信息灵活合理的决策{persona_text}的下一步行动需要符合正常人的社交流程可以发送新消息可以等待可以倾听可以结束对话甚至可以屏蔽对方
{persona_text}现在的心情是{current_emotion_text}
你现在需要操控[{persona_text}]判断当前氛围和双方的意图根据以下所有信息灵活合理的决策[{persona_text}]的下一步行动需要符合正常人的社交流程可以发送新消息可以等待可以倾听可以结束对话甚至可以屏蔽对方
当前对话目标
{goals_str}
@ -75,16 +72,14 @@ PROMPT_FOLLOW_UP = """
最近的对话记录(包括你已成功发送的消息 新收到的消息)
{chat_history_text}
{spam_warning_info}
------
可选行动类型以及解释
wait: 暂时不说话留给对方交互空间等待对方回复
listening: 倾听对方发言虽然你刚发过言但如果对方立刻回复且明显话没说完可以选择这个
send_new_message: 发送一条新消息继续对话允许适当的追问补充深入话题或开启相关新话题但是注意看对话记录如果对方已经没有回复你end_conversation或wait可能更合适
send_new_message: 发送一条新消息当你觉得[{persona_text}]还有话要说或现在适合/需要发送消息时可以选择
rethink_goal: 思考一个对话目标当你觉得目前对话需要目标或当前目标不再适用或话题卡住时选择注意私聊的环境是灵活的有可能需要经常选择
end_conversation: 安全和平的结束对话对方长时间没回复繁忙已经不再回复你消息明显暗示或表达想结束聊天时可以果断选择
block_and_ignore: 更加极端的结束对话方式直接结束对话并在一段时间内无视对方所有发言屏蔽对话让你感到十分不适或你遭到各类骚扰时选择
end_conversation: 安全和平的结束对话对方长时间没回复繁忙或你觉得对话告一段落时可以选择
block_and_ignore: 更加极端的结束对话方式直接结束对话并在一段时间内无视对方所有发言屏蔽你觉得对话让[{persona_text}]感到十分不适[{persona_text}]遭到各类骚扰时选择
请以JSON格式输出你的决策
{{
@ -136,7 +131,6 @@ PROMPT_REFLECT_AND_ACT = """
最近的对话记录(包括你已成功发送的消息 新收到的消息)
{chat_history_text}
{spam_warning_info}
------
可选行动类型以及解释
@ -154,6 +148,7 @@ block_and_ignore: 更加极端的结束对话方式,直接结束对话并在
注意请严格按照JSON格式输出不要包含任何其他内容"""
class ActionPlanner:
"""行动规划器"""
@ -212,16 +207,16 @@ class ActionPlanner:
time_since_last_bot_message_info = self._get_bot_last_speak_time_info(observation_info)
timeout_context = self._get_timeout_context(conversation_info)
goals_str = self._build_goals_string(conversation_info)
chat_history_text = await self._build_chat_history_text(observation_info)
chat_history_text = await build_chat_history_text(observation_info, self.private_name)
# 获取 sender_name, relationship_text, current_emotion_text
sender_name_str = getattr(observation_info, 'sender_name', '对方') # 从 observation_info 获取
if not sender_name_str: sender_name_str = '对方' # 再次确保有默认值
sender_name_str = self.private_name
if not sender_name_str:
sender_name_str = "对方" # 再次确保有默认值
relationship_text_str = getattr(conversation_info, 'relationship_text', '你们还不熟悉。')
current_emotion_text_str = getattr(conversation_info, 'current_emotion_text', '心情平静。')
relationship_text_str = getattr(conversation_info, "relationship_text", "你们还不熟悉。")
current_emotion_text_str = getattr(conversation_info, "current_emotion_text", "心情平静。")
persona_text = f"{self.name}"
persona_text = f"{self.name}"
action_history_summary, last_action_context = self._build_action_history_context(conversation_info)
# retrieved_memory_str, retrieved_knowledge_str = await retrieve_contextual_info(
# chat_history_text, self.private_name
@ -236,39 +231,41 @@ class ActionPlanner:
# --- 2. 选择并格式化 Prompt ---
try:
if use_reflect_prompt: # 新增的判断
if use_reflect_prompt: # 新增的判断
prompt_template = PROMPT_REFLECT_AND_ACT
log_msg = "使用 PROMPT_REFLECT_AND_ACT (反思决策)"
# 对于 PROMPT_REFLECT_AND_ACT它不包含 send_new_message 选项,所以 spam_warning_message 中的相关提示可以调整或省略
# 但为了保持占位符填充的一致性,我们仍然计算它
spam_warning_message = ""
if conversation_info.my_message_count > 5: # 这里的 my_message_count 仍有意义,表示之前连续发送了多少
spam_warning_message = f"⚠️【警告】**你之前已连续发送{str(conversation_info.my_message_count)}条消息!请谨慎决策。**"
elif conversation_info.my_message_count > 2:
spam_warning_message = f"💬【提示】**你之前已连续发送{str(conversation_info.my_message_count)}条消息。请注意保持对话平衡。**"
# spam_warning_message = ""
# if conversation_info.my_message_count > 5: # 这里的 my_message_count 仍有意义,表示之前连续发送了多少
# spam_warning_message = (
# f"⚠️【警告】**你之前已连续发送{str(conversation_info.my_message_count)}条消息!请谨慎决策。**"
# )
# elif conversation_info.my_message_count > 2:
# spam_warning_message = f"💬【提示】**你之前已连续发送{str(conversation_info.my_message_count)}条消息。请注意保持对话平衡。**"
elif last_successful_reply_action in ["direct_reply", "send_new_message"]:
prompt_template = PROMPT_FOLLOW_UP
log_msg = "使用 PROMPT_FOLLOW_UP (追问决策)"
spam_warning_message = ""
if conversation_info.my_message_count > 5:
spam_warning_message = f"⚠️【警告】**你已连续发送{str(conversation_info.my_message_count)}条消息请注意不要再选择send_new_message以免刷屏对造成对方困扰**"
elif conversation_info.my_message_count > 2:
spam_warning_message = f"💬【警告】**你已连续发送{str(conversation_info.my_message_count)}条消息。请保持理智如果非必要请避免选择send_new_message以免给对方造成困扰。**"
# spam_warning_message = ""
# if conversation_info.my_message_count > 5:
# spam_warning_message = f"⚠️【警告】**你已连续发送{str(conversation_info.my_message_count)}条消息请注意不要再选择send_new_message以免刷屏对造成对方困扰**"
# elif conversation_info.my_message_count > 2:
# spam_warning_message = f"💬【警告】**你已连续发送{str(conversation_info.my_message_count)}条消息。请保持理智如果非必要请避免选择send_new_message以免给对方造成困扰。**"
else:
prompt_template = PROMPT_INITIAL_REPLY
log_msg = "使用 PROMPT_INITIAL_REPLY (首次/非连续回复决策)"
spam_warning_message = "" # 初始回复时通常不需要刷屏警告
# spam_warning_message = "" # 初始回复时通常不需要刷屏警告
logger.debug(f"[私聊][{self.private_name}] {log_msg}")
current_time_value = "获取时间失败"
if observation_info and hasattr(observation_info, 'current_time_str') and observation_info.current_time_str:
if observation_info and hasattr(observation_info, "current_time_str") and observation_info.current_time_str:
current_time_value = observation_info.current_time_str
if spam_warning_message:
spam_warning_message = f"\n{spam_warning_message}\n"
# if spam_warning_message:
# spam_warning_message = f"\n{spam_warning_message}\n"
prompt = prompt_template.format(
persona_text=persona_text,
@ -281,10 +278,10 @@ class ActionPlanner:
# retrieved_memory_str=retrieved_memory_str if retrieved_memory_str else "无相关记忆。",
# retrieved_knowledge_str=retrieved_knowledge_str if retrieved_knowledge_str else "无相关知识。",
current_time_str=current_time_value,
spam_warning_info=spam_warning_message,
# spam_warning_info=spam_warning_message,
sender_name=sender_name_str,
relationship_text=relationship_text_str,
current_emotion_text=current_emotion_text_str
current_emotion_text=current_emotion_text_str,
)
logger.debug(f"[私聊][{self.private_name}] 发送到LLM的最终提示词:\n------\n{prompt}\n------")
except KeyError as fmt_key_err:
@ -332,10 +329,11 @@ class ActionPlanner:
time_str_for_end_decision = observation_info.current_time_str
final_action, final_reason = await self._handle_end_conversation_decision(
persona_text,
chat_history_text, initial_reason,
time_str_for_end_decision,
chat_history_text,
initial_reason,
time_str_for_end_decision,
sender_name_str=sender_name_str,
relationship_text_str=relationship_text_str
relationship_text_str=relationship_text_str,
)
except Exception as end_dec_err:
logger.error(f"[私聊][{self.private_name}] 处理结束对话决策时出错: {end_dec_err}")
@ -360,7 +358,7 @@ class ActionPlanner:
"block_and_ignore",
"say_goodbye",
]
valid_actions_reflect = [ # PROMPT_REFLECT_AND_ACT 的动作
valid_actions_reflect = [ # PROMPT_REFLECT_AND_ACT 的动作
"wait",
"listening",
"rethink_goal",
@ -466,52 +464,6 @@ class ActionPlanner:
goals_str = "- 构建对话目标时出错。\n"
return goals_str
async def _build_chat_history_text(self, observation_info: ObservationInfo) -> str:
"""构建聊天历史记录文本 (包含未处理消息)"""
chat_history_text = ""
try:
if hasattr(observation_info, "chat_history_str") and observation_info.chat_history_str:
chat_history_text = observation_info.chat_history_str
elif hasattr(observation_info, "chat_history") and observation_info.chat_history:
history_slice = observation_info.chat_history[-20:]
chat_history_text = await build_readable_messages(
history_slice, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0
)
else:
chat_history_text = "还没有聊天记录。\n"
unread_count = getattr(observation_info, "new_messages_count", 0)
unread_messages = getattr(observation_info, "unprocessed_messages", [])
if unread_count > 0 and unread_messages:
bot_qq_str = str(global_config.BOT_QQ)
other_unread_messages = [
msg for msg in unread_messages if msg.get("user_info", {}).get("user_id") != bot_qq_str
]
other_unread_count = len(other_unread_messages)
if other_unread_count > 0:
new_messages_str = await build_readable_messages(
other_unread_messages,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
chat_history_text += (
f"\n--- 以下是 {other_unread_count} 条你需要处理的新消息 ---\n{new_messages_str}\n------\n"
)
logger.debug(f"[私聊][{self.private_name}] 向 LLM 追加了 {other_unread_count} 条未读消息。")
else:
chat_history_text += (
f"\n--- 以上均为已读消息,未读消息均已处理完毕 ---\n"
)
except AttributeError as e:
logger.warning(f"[私聊][{self.private_name}] 构建聊天记录文本时属性错误: {e}")
chat_history_text = "[获取聊天记录时出错]\n"
except Exception as e:
logger.error(f"[私聊][{self.private_name}] 处理聊天记录时发生未知错误: {e}")
chat_history_text = "[处理聊天记录时出错]\n"
return chat_history_text
def _build_action_history_context(self, conversation_info: ConversationInfo) -> Tuple[str, str]:
"""构建行动历史概要和上一次行动详细情况"""
@ -561,11 +513,23 @@ class ActionPlanner:
# --- Helper method for handling end_conversation decision ---
async def _handle_end_conversation_decision(
self, persona_text: str, chat_history_text: str, initial_reason: str, current_time_str: str, sender_name_str: str, relationship_text_str: str
self,
persona_text: str,
chat_history_text: str,
initial_reason: str,
current_time_str: str,
sender_name_str: str,
relationship_text_str: str,
) -> Tuple[str, str]:
"""处理结束对话前的告别决策"""
logger.info(f"[私聊][{self.private_name}] 初步规划结束对话,进入告别决策...")
end_decision_prompt = PROMPT_END_DECISION.format(persona_text=persona_text, chat_history_text=chat_history_text,current_time_str=current_time_str,sender_name = sender_name_str, relationship_text = relationship_text_str)
end_decision_prompt = PROMPT_END_DECISION.format(
persona_text=persona_text,
chat_history_text=chat_history_text,
current_time_str=current_time_str,
sender_name=sender_name_str,
relationship_text=relationship_text_str,
)
logger.debug(f"[私聊][{self.private_name}] 发送到LLM的结束决策提示词:\n------\n{end_decision_prompt}\n------")
llm_start_time = time.time()
end_content, _ = await self.llm.generate_response_async(end_decision_prompt)

View File

@ -295,7 +295,9 @@ async def handle_action(
# 后续的 plan 循环会检测到这个 "done_no_reply" 状态并使用反思 prompt
elif is_suitable: # 适用于 direct_reply 或 (send_new_message 且 RG决定发送并通过检查)
logger.debug(f"[私聊][{conversation_instance.private_name}] 动作 '{action}': 找到合适的回复,准备发送。")
logger.debug(
f"[私聊][{conversation_instance.private_name}] 动作 '{action}': 找到合适的回复,准备发送。"
)
# conversation_info.last_reply_rejection_reason = None # 已在循环内清除
# conversation_info.last_rejected_reply_content = None
conversation_instance.generated_reply = generated_content_for_check_or_send # 使用检查通过的内容
@ -361,7 +363,7 @@ async def handle_action(
observation_info.chat_history_str = "[构建聊天记录出错]"
# --- 新增结束 ---
# 更新 idle_conversation_starter 的最后消息时间
# 更新 idle_chat 的最后消息时间
# (避免在发送消息后很快触发主动聊天)
if conversation_instance.idle_chat:
await conversation_instance.idle_chat.update_last_message_time(send_end_time)
@ -506,7 +508,7 @@ async def handle_action(
action_successful = True # 标记成功
# final_status 和 final_reason 会在 finally 中设置
logger.info(f"[私聊][{conversation_instance.private_name}] 成功发送告别语,即将停止对话实例。")
# 更新 idle_conversation_starter 的最后消息时间
# 更新 idle_chat 的最后消息时间
# (避免在发送消息后很快触发主动聊天)
if conversation_instance.idle_chat:
await conversation_instance.idle_chat.update_last_message_time(send_end_time)

View File

@ -25,12 +25,10 @@ from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
from .reply_generator import ReplyGenerator
from .PFC_idle.idle_chat import IdleChat
from .pfc_KnowledgeFetcher import KnowledgeFetcher
from .waiter import Waiter
from .reply_checker import ReplyChecker
# >>> 新增导入 <<<
from .conversation_loop import run_conversation_loop # 导入新的循环函数
from .conversation_loop import run_conversation_loop
from rich.traceback import install
@ -38,13 +36,6 @@ install(extra_lines=3)
logger = get_logger("pfc_conversation")
# 时区配置移到 loop 文件或更全局的位置,这里不再需要
# configured_tz = getattr(global_config, 'TIME_ZONE', 'Asia/Shanghai')
# TIME_ZONE = tz.gettz(configured_tz)
# if TIME_ZONE is None:
# logger.error(f"配置的时区 '{configured_tz}' 无效,将使用默认时区 'Asia/Shanghai'")
# TIME_ZONE = tz.gettz('Asia/Shanghai')
class Conversation:
"""
@ -59,7 +50,7 @@ class Conversation:
self.stream_id: str = stream_id
self.private_name: str = private_name
self.state: ConversationState = ConversationState.INIT
self.should_continue: bool = False # Manager 会在初始化后设置
self.should_continue: bool = False
self.ignore_until_timestamp: Optional[float] = None
self.generated_reply: str = ""
self.chat_stream: Optional[ChatStream] = None
@ -74,7 +65,6 @@ class Conversation:
self.action_planner: Optional[ActionPlanner] = None
self.goal_analyzer: Optional[GoalAnalyzer] = None
self.reply_generator: Optional[ReplyGenerator] = None
self.knowledge_fetcher: Optional[KnowledgeFetcher] = None
self.waiter: Optional[Waiter] = None
self.direct_sender: Optional[DirectMessageSender] = None
self.idle_chat: Optional[IdleChat] = None
@ -83,13 +73,14 @@ class Conversation:
self.conversation_info: Optional[ConversationInfo] = None
self.reply_checker: Optional[ReplyChecker] = None
self._initialized: bool = False # Manager 会在初始化成功后设为 True
self._initialized: bool = False
self.bot_qq_str: Optional[str] = str(global_config.BOT_QQ) if global_config.BOT_QQ else None
if not self.bot_qq_str:
logger.error(f"[私聊][{self.private_name}] 严重错误:未能从配置中获取 BOT_QQ ID")
# _initialize 和 _load_initial_history 方法已被移除
# 确保这个属性被正确初始化
self.consecutive_llm_action_failures: int = 0 # LLM相关动作连续失败的计数器
async def start(self):
"""
@ -105,27 +96,27 @@ class Conversation:
logger.info(f"[私聊][{self.private_name}] 对话系统启动,准备创建规划循环任务...")
try:
# >>> 修改后的调用 <<<
# 创建PFC主循环任务
_loop_task = asyncio.create_task(run_conversation_loop(self))
logger.info(f"[私聊][{self.private_name}] 规划循环任务已创建。")
except Exception as task_err:
logger.error(f"[私聊][{self.private_name}] 创建规划循环任务时出错: {task_err}")
await self.stop()
await self.stop() # 发生错误时尝试停止
async def stop(self):
"""
停止对话实例并清理相关资源
"""
logger.info(f"[私聊][{self.private_name}] 正在停止对话实例: {self.stream_id}")
self.should_continue = False
self.should_continue = False # 设置标志以退出循环
# 最终关系评估
if (
self._initialized
self._initialized # 确保已初始化
and self.relationship_updater
and self.conversation_info
and self.observation_info
and self.chat_observer
and self.chat_observer # 确保所有需要的组件都存在
):
try:
logger.info(f"[私聊][{self.private_name}] 准备执行最终关系评估...")
@ -143,11 +134,10 @@ class Conversation:
# 停止其他组件
if self.idle_chat:
# 减少活跃实例计数而不是停止IdleChat
await self.idle_chat.decrement_active_instances()
await self.idle_chat.decrement_active_instances() # 减少活跃实例计数
logger.debug(f"[私聊][{self.private_name}] 已减少IdleChat活跃实例计数")
if self.observation_info and self.chat_observer:
self.observation_info.unbind_from_chat_observer()
if self.observation_info and self.chat_observer: # 确保二者都存在
self.observation_info.unbind_from_chat_observer() # 解绑
if self.mood_mng and hasattr(self.mood_mng, "stop_mood_update") and self.mood_mng._running: # type: ignore
self.mood_mng.stop_mood_update() # type: ignore
logger.debug(f"[私聊][{self.private_name}] MoodManager 后台更新已停止。")
@ -155,12 +145,11 @@ class Conversation:
self._initialized = False # 标记为未初始化
logger.info(f"[私聊][{self.private_name}] 对话实例 {self.stream_id} 已停止。")
# _plan_and_action_loop 方法已被移除
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Optional[Message]:
"""将从数据库或其他来源获取的消息字典转换为内部使用的 Message 对象"""
# 这个方法似乎没有被其他内部方法调用,但为了完整性暂时保留
try:
# 尝试获取与此对话实例关联的 ChatStream
chat_stream_to_use = self.chat_stream or chat_manager.get_stream(self.stream_id)
if not chat_stream_to_use:
logger.error(
@ -168,6 +157,7 @@ class Conversation:
)
return None
# 解析 UserInfo
user_info_dict = msg_dict.get("user_info", {})
user_info: Optional[UserInfo] = None
if isinstance(user_info_dict, dict):
@ -177,21 +167,22 @@ class Conversation:
logger.warning(
f"[私聊][{self.private_name}] 从字典创建 UserInfo 时出错: {e}, dict: {user_info_dict}"
)
if not user_info:
if not user_info: # 如果没有有效的 UserInfo则无法创建 Message 对象
logger.warning(
f"[私聊][{self.private_name}] 消息缺少有效的 UserInfo无法转换。 msg_id: {msg_dict.get('message_id')}"
)
return None
# 创建并返回 Message 对象
return Message(
message_id=msg_dict.get("message_id", f"gen_{time.time()}"),
message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 提供默认 message_id
chat_stream=chat_stream_to_use,
time=msg_dict.get("time", time.time()),
time=msg_dict.get("time", time.time()), # 提供默认时间
user_info=user_info,
processed_plain_text=msg_dict.get("processed_plain_text", ""),
detailed_plain_text=msg_dict.get("detailed_plain_text", ""),
processed_plain_text=msg_dict.get("processed_plain_text", ""), # 提供默认文本
detailed_plain_text=msg_dict.get("detailed_plain_text", ""), # 提供默认详细文本
)
except Exception as e:
logger.error(f"[私聊][{self.private_name}] 转换消息时出错: {e}")
logger.error(f"[私聊][{self.private_name}] {traceback.format_exc()}")
return None
return None # 出错时返回 None

View File

@ -17,4 +17,5 @@ class ConversationInfo:
self.relationship_text: Optional[str] = "你们还不熟悉。" # 与当前对话者的关系描述文本
self.current_emotion_text: Optional[str] = "心情平静。" # 机器人当前的情绪描述文本
self.current_instance_message_count: int = 0 # 当前私聊实例中的消息计数
self.other_new_messages_during_planning_count: int = 0 # 在计划阶段期间收到的其他新消息计数
# --- 新增字段结束 ---

View File

@ -18,7 +18,6 @@ from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
from .reply_generator import ReplyGenerator
from .PFC_idle.idle_chat import IdleChat
from .pfc_KnowledgeFetcher import KnowledgeFetcher # 修正大小写
from .waiter import Waiter
from .pfc_utils import get_person_id
from .reply_checker import ReplyChecker
@ -166,9 +165,6 @@ async def initialize_core_components(conversation_instance: "Conversation"):
conversation_instance.stream_id, conversation_instance.private_name
)
logger.debug(f"[私聊][{conversation_instance.private_name}] (Initializer) 初始化 KnowledgeFetcher...")
conversation_instance.knowledge_fetcher = KnowledgeFetcher(conversation_instance.private_name)
logger.debug(f"[私聊][{conversation_instance.private_name}] (Initializer) 初始化 Waiter...")
conversation_instance.waiter = Waiter(conversation_instance.stream_id, conversation_instance.private_name)

View File

@ -15,18 +15,19 @@ if TYPE_CHECKING:
logger = get_logger("pfc_loop")
# 时区配置 (从 conversation.py 移过来,或者考虑放到更全局的配置模块)
# 时区配置
configured_tz = getattr(global_config, "TIME_ZONE", "Asia/Shanghai")
TIME_ZONE = tz.gettz(configured_tz)
if TIME_ZONE is None:
logger.error(f"配置的时区 '{configured_tz}' 无效,将使用默认时区 'Asia/Shanghai'")
TIME_ZONE = tz.gettz("Asia/Shanghai")
MAX_CONSECUTIVE_LLM_ACTION_FAILURES = 3 # 可配置的最大LLM连续失败次数
async def run_conversation_loop(conversation_instance: "Conversation"):
"""
核心的规划与行动循环 (PFC Loop)
之前是 Conversation 类中的 _plan_and_action_loop 方法
"""
logger.debug(f"[私聊][{conversation_instance.private_name}] 进入 run_conversation_loop 循环。")
@ -34,16 +35,20 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
logger.error(f"[私聊][{conversation_instance.private_name}] 尝试在未初始化状态下运行规划循环,退出。")
return
force_reflect_and_act = False # 用于强制使用反思 prompt 的标志
_force_reflect_and_act_next_iter = False
while conversation_instance.should_continue:
loop_iter_start_time = time.time()
logger.debug(f"[私聊][{conversation_instance.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f})")
current_force_reflect_and_act = _force_reflect_and_act_next_iter
_force_reflect_and_act_next_iter = False
logger.debug(
f"[私聊][{conversation_instance.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f}), force_reflect_next_iter: {current_force_reflect_and_act}, consecutive_llm_failures: {conversation_instance.consecutive_llm_action_failures}"
)
# 更新当前时间
try:
global TIME_ZONE # 引用全局 TIME_ZONE
if TIME_ZONE is None: # 如果还未加载成功
global TIME_ZONE
if TIME_ZONE is None:
configured_tz_loop = getattr(global_config, "TIME_ZONE", "Asia/Shanghai")
TIME_ZONE = tz.gettz(configured_tz_loop)
if TIME_ZONE is None:
@ -54,7 +59,6 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
if conversation_instance.observation_info:
time_str = current_time_dt.strftime("%Y-%m-%d %H:%M:%S %Z%z")
conversation_instance.observation_info.current_time_str = time_str
logger.debug(f"[私聊][{conversation_instance.private_name}] 更新 ObservationInfo 当前时间: {time_str}")
else:
logger.warning(
f"[私聊][{conversation_instance.private_name}] ObservationInfo 未初始化,无法更新当前时间。"
@ -64,15 +68,11 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
f"[私聊][{conversation_instance.private_name}] 更新 ObservationInfo 当前时间时出错: {time_update_err}"
)
# 处理忽略状态
if (
conversation_instance.ignore_until_timestamp
and loop_iter_start_time < conversation_instance.ignore_until_timestamp
):
if conversation_instance.idle_chat and conversation_instance.idle_chat._running:
# 不直接停止服务,改为暂时忽略此用户
# 虽然我们仍然可以通过active_instances_count来决定是否触发主动聊天
# 但为了安全起见,我们只记录一个日志
logger.debug(f"[私聊][{conversation_instance.private_name}] 对话被暂时忽略,暂停对该用户的主动聊天")
sleep_duration = min(30, conversation_instance.ignore_until_timestamp - loop_iter_start_time)
await asyncio.sleep(sleep_duration)
@ -85,18 +85,13 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
f"[私聊][{conversation_instance.private_name}] 忽略时间已到 {conversation_instance.stream_id},准备结束对话。"
)
conversation_instance.ignore_until_timestamp = None
await conversation_instance.stop() # 调用 Conversation 实例的 stop 方法
await conversation_instance.stop()
continue
else:
# 忽略状态结束,这里不需要任何特殊处理
# IdleChat会通过active_instances_count自动决定是否触发
pass
# 核心规划与行动逻辑
try:
# 更新关系和情绪文本 (在每次循环开始时进行)
if conversation_instance.conversation_info and conversation_instance._initialized:
# 更新关系
if (
conversation_instance.conversation_info.person_id
and conversation_instance.relationship_translator
@ -121,13 +116,11 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
except Exception as e_rel:
logger.error(f"[私聊][{conversation_instance.private_name}] (Loop) 更新关系文本时出错: {e_rel}")
conversation_instance.conversation_info.relationship_text = "你们的关系是:普通。"
# 更新情绪
if conversation_instance.mood_mng:
conversation_instance.conversation_info.current_emotion_text = (
conversation_instance.mood_mng.get_prompt()
) # type: ignore
)
# 检查核心组件
if not all(
[
conversation_instance.action_planner,
@ -141,7 +134,6 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
await asyncio.sleep(5)
continue
# 规划
planning_start_time = time.time()
logger.debug(
f"[私聊][{conversation_instance.private_name}] --- (Loop) 开始规划 ({planning_start_time:.2f}) ---"
@ -155,79 +147,55 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
conversation_instance.conversation_info.last_successful_reply_action
if conversation_instance.conversation_info
else None,
use_reflect_prompt=force_reflect_and_act,
use_reflect_prompt=current_force_reflect_and_act,
)
force_reflect_and_act = False
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) ActionPlanner.plan 完成,初步规划动作: {action}"
)
# 检查中断
current_unprocessed_messages = getattr(conversation_instance.observation_info, "unprocessed_messages", [])
new_messages_during_planning: List[Dict[str, Any]] = []
other_new_messages_during_planning: List[Dict[str, Any]] = []
current_unprocessed_messages_after_plan = getattr(
conversation_instance.observation_info, "unprocessed_messages", []
)
new_messages_during_action_planning: List[Dict[str, Any]] = []
other_new_messages_during_action_planning: List[Dict[str, Any]] = []
for msg in current_unprocessed_messages:
msg_time = msg.get("time")
sender_id_info = msg.get("user_info", {})
sender_id = str(sender_id_info.get("user_id")) if sender_id_info else None
if msg_time and msg_time >= planning_start_time:
new_messages_during_planning.append(msg)
if sender_id != conversation_instance.bot_qq_str:
other_new_messages_during_planning.append(msg)
for msg_ap in current_unprocessed_messages_after_plan:
msg_time_ap = msg_ap.get("time")
sender_id_info_ap = msg_ap.get("user_info", {})
sender_id_ap = str(sender_id_info_ap.get("user_id")) if sender_id_info_ap else None
if msg_time_ap and msg_time_ap >= planning_start_time:
new_messages_during_action_planning.append(msg_ap)
if sender_id_ap != conversation_instance.bot_qq_str:
other_new_messages_during_action_planning.append(msg_ap)
new_msg_count = len(new_messages_during_planning)
other_new_msg_count = len(other_new_messages_during_planning)
new_msg_count_action_planning = len(new_messages_during_action_planning)
other_new_msg_count_action_planning = len(other_new_messages_during_action_planning)
if conversation_instance.conversation_info and other_new_msg_count > 0:
conversation_instance.conversation_info.current_instance_message_count += other_new_msg_count
# 触发关系和情绪更新(如果需要)
if (
conversation_instance.relationship_updater
and conversation_instance.observation_info
and conversation_instance.chat_observer
):
await conversation_instance.relationship_updater.update_relationship_incremental(
conversation_info=conversation_instance.conversation_info,
observation_info=conversation_instance.observation_info,
chat_observer_for_history=conversation_instance.chat_observer,
)
if (
conversation_instance.emotion_updater
and other_new_messages_during_planning
and conversation_instance.observation_info
and conversation_instance.chat_observer
):
last_user_msg = other_new_messages_during_planning[-1]
last_user_msg_text = last_user_msg.get("processed_plain_text", "用户发了新消息")
sender_name_for_event = getattr(conversation_instance.observation_info, "sender_name", "对方")
event_desc = f"用户【{sender_name_for_event}】发送了新消息: '{last_user_msg_text[:30]}...'"
await conversation_instance.emotion_updater.update_emotion_based_on_context(
conversation_info=conversation_instance.conversation_info,
observation_info=conversation_instance.observation_info,
chat_observer_for_history=conversation_instance.chat_observer,
event_description=event_desc,
)
if conversation_instance.conversation_info and other_new_msg_count_action_planning > 0:
pass
should_interrupt: bool = False
interrupt_reason: str = ""
if action in ["wait", "listening"] and new_msg_count > 0:
should_interrupt = True
interrupt_reason = f"规划 {action} 期间收到 {new_msg_count} 条新消息"
elif other_new_msg_count > 2: # Threshold for other actions
should_interrupt = True
interrupt_reason = f"规划 {action} 期间收到 {other_new_msg_count} 条来自他人的新消息"
if should_interrupt:
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) 中断 '{action}',原因: {interrupt_reason}。重新规划..."
should_interrupt_action_planning: bool = False
interrupt_reason_action_planning: str = ""
if action in ["wait", "listening"] and new_msg_count_action_planning > 0:
should_interrupt_action_planning = True
interrupt_reason_action_planning = f"规划 {action} 期间收到 {new_msg_count_action_planning} 条新消息"
elif other_new_msg_count_action_planning > 2:
should_interrupt_action_planning = True
interrupt_reason_action_planning = (
f"规划 {action} 期间收到 {other_new_msg_count_action_planning} 条来自他人的新消息"
)
cancel_record = {
if should_interrupt_action_planning:
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) 中断 '{action}' (在ActionPlanner.plan后),原因: {interrupt_reason_action_planning}。重新规划..."
)
cancel_record_ap = {
"action": action,
"plan_reason": reason,
"status": "cancelled_due_to_new_messages",
"status": "cancelled_due_to_new_messages_during_action_plan",
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"final_reason": interrupt_reason,
"final_reason": interrupt_reason_action_planning,
}
if conversation_instance.conversation_info:
if (
@ -235,40 +203,208 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
or conversation_instance.conversation_info.done_action is None
):
conversation_instance.conversation_info.done_action = []
conversation_instance.conversation_info.done_action.append(cancel_record)
conversation_instance.conversation_info.done_action.append(cancel_record_ap)
conversation_instance.conversation_info.last_successful_reply_action = None
conversation_instance.state = ConversationState.ANALYZING
await asyncio.sleep(0.1)
continue
# 执行动作 (调用 actions 模块的函数)
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) 未中断,调用 actions.handle_action 执行动作 '{action}'..."
)
if conversation_instance.conversation_info:
conversation_instance.conversation_info.other_new_messages_during_planning_count = other_new_msg_count
# --- LLM Action Handling with Shield and Failure Count ---
if action in ["direct_reply", "send_new_message"]:
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) 动作 '{action}' 需要LLM生成进入监控执行模式..."
)
llm_call_start_time = time.time()
await actions.handle_action(
conversation_instance,
action,
reason,
conversation_instance.observation_info,
conversation_instance.conversation_info,
)
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) actions.handle_action 完成。")
if conversation_instance.conversation_info:
conversation_instance.conversation_info.other_new_messages_during_planning_count = (
other_new_msg_count_action_planning
)
llm_action_task = asyncio.create_task(
actions.handle_action(
conversation_instance,
action,
reason,
conversation_instance.observation_info,
conversation_instance.conversation_info,
)
)
interrupted_by_new_messages = False
llm_action_completed_successfully = False
action_outcome_processed = False # Flag to ensure we process outcome only once
while not llm_action_task.done() and not action_outcome_processed:
try:
# Shield the task so wait_for timeout doesn't cancel it directly
await asyncio.wait_for(asyncio.shield(llm_action_task), timeout=1.5)
# If wait_for completes without timeout, the shielded task is done (or errored/cancelled by itself)
action_outcome_processed = True # Outcome will be processed outside this loop
except asyncio.TimeoutError:
# Shielded task didn't finish in 1.5s. This is our chance to check messages.
current_time_for_check = time.time()
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM Monitor polling. llm_call_start_time: {llm_call_start_time:.2f}, current_check_time: {current_time_for_check:.2f}. Task still running, checking for new messages."
)
current_unprocessed_messages_during_llm = getattr(
conversation_instance.observation_info, "unprocessed_messages", []
)
other_new_messages_this_check: List[Dict[str, Any]] = []
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) Checking unprocessed_messages (count: {len(current_unprocessed_messages_during_llm)}):"
)
for msg_llm in current_unprocessed_messages_during_llm:
msg_time_llm = msg_llm.get("time")
sender_id_info_llm = msg_llm.get("user_info", {})
sender_id_llm = str(sender_id_info_llm.get("user_id")) if sender_id_info_llm else None
is_new_enough = msg_time_llm and msg_time_llm >= llm_call_start_time
is_other_sender = sender_id_llm != conversation_instance.bot_qq_str
time_str_for_log = f"{msg_time_llm:.2f}" if msg_time_llm is not None else "N/A"
logger.debug(
f" - Msg ID: {msg_llm.get('message_id')}, Time: {time_str_for_log}, Sender: {sender_id_llm}. New enough? {is_new_enough}. Other sender? {is_other_sender}."
)
if is_new_enough and is_other_sender:
other_new_messages_this_check.append(msg_llm)
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) Found {len(other_new_messages_this_check)} 'other_new_messages_this_check'."
)
if len(other_new_messages_this_check) > global_config.pfc_message_buffer_size:
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 执行期间收到 {len(other_new_messages_this_check)} 条来自他人的新消息将取消LLM任务。"
)
if not llm_action_task.done(): # Check again before cancelling
llm_action_task.cancel() # Now we explicitly cancel the original task
interrupted_by_new_messages = True
action_outcome_processed = True # We've made a decision, exit monitoring
# else: continue polling if not enough new messages
# Shield ensures CancelledError from llm_action_task itself is caught by the outer try/except
# After the monitoring loop (either task finished, or we decided to cancel it)
# Await the task properly to get its result or handle its exception/cancellation
action_final_status_in_history = "unknown"
try:
await llm_action_task # This will re-raise CancelledError if we cancelled it, or other exceptions
# If no exception, it means the task completed.
# actions.handle_action updates done_action, so we check its status.
if conversation_instance.conversation_info and conversation_instance.conversation_info.done_action:
# Check if done_action is not empty
if conversation_instance.conversation_info.done_action:
action_final_status_in_history = conversation_instance.conversation_info.done_action[
-1
].get("status", "unknown")
if action_final_status_in_history in ["done", "done_no_reply"]:
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务最终成功完成 (status: {action_final_status_in_history})。"
)
conversation_instance.consecutive_llm_action_failures = 0
llm_action_completed_successfully = True
else:
logger.warning(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务完成但未成功 (status: {action_final_status_in_history})。"
)
if not interrupted_by_new_messages:
conversation_instance.consecutive_llm_action_failures += 1
except asyncio.CancelledError:
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务最终确认被取消。"
)
if not interrupted_by_new_messages:
conversation_instance.consecutive_llm_action_failures += 1
logger.warning(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM任务因外部原因取消连续失败次数: {conversation_instance.consecutive_llm_action_failures}"
)
else: # interrupted_by_new_messages is True
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM任务因新消息被内部逻辑取消不计为LLM失败。"
)
except Exception as e_llm_final:
logger.error(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务执行时发生最终错误: {e_llm_final}"
)
logger.error(traceback.format_exc())
conversation_instance.state = ConversationState.ERROR
if not interrupted_by_new_messages:
conversation_instance.consecutive_llm_action_failures += 1
# --- Post LLM Action Task Handling ---
if not llm_action_completed_successfully:
if conversation_instance.consecutive_llm_action_failures >= MAX_CONSECUTIVE_LLM_ACTION_FAILURES:
logger.error(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM相关动作连续失败或被取消 {conversation_instance.consecutive_llm_action_failures} 次。将强制等待并重置计数器。"
)
action = "wait" # Force action to wait
reason = f"LLM连续失败{conversation_instance.consecutive_llm_action_failures}次,强制等待"
conversation_instance.consecutive_llm_action_failures = 0
if conversation_instance.conversation_info:
conversation_instance.conversation_info.last_successful_reply_action = None
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) 执行强制等待动作...")
await actions.handle_action(
conversation_instance,
action,
reason,
conversation_instance.observation_info,
conversation_instance.conversation_info,
)
_force_reflect_and_act_next_iter = False
conversation_instance.state = ConversationState.ANALYZING
await asyncio.sleep(1)
continue
else:
conversation_instance.state = ConversationState.ANALYZING
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作中断/失败准备重新规划。Interrupted by new msgs: {interrupted_by_new_messages}, Consecutive LLM Failures: {conversation_instance.consecutive_llm_action_failures}"
)
await asyncio.sleep(0.1)
continue
else:
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) 执行非LLM类动作 '{action}'...")
conversation_instance.consecutive_llm_action_failures = 0
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) 重置 consecutive_llm_action_failures due to non-LLM action."
)
if conversation_instance.conversation_info:
conversation_instance.conversation_info.other_new_messages_during_planning_count = (
other_new_msg_count_action_planning
)
await actions.handle_action(
conversation_instance,
action,
reason,
conversation_instance.observation_info,
conversation_instance.conversation_info,
)
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) 非LLM类动作 '{action}' 完成。")
# 检查是否需要反思
last_action_record = {}
if conversation_instance.conversation_info and conversation_instance.conversation_info.done_action:
last_action_record = conversation_instance.conversation_info.done_action[-1]
if conversation_instance.conversation_info.done_action:
last_action_record = conversation_instance.conversation_info.done_action[-1]
if (
last_action_record.get("action") == "send_new_message"
and last_action_record.get("status") == "done_no_reply"
):
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) 检测到需反思,设置标志。")
force_reflect_and_act = True
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) 检测到 ReplyGenerator 决定不发送消息,下一轮将强制反思。"
)
_force_reflect_and_act_next_iter = True
# 检查结束条件
goal_ended: bool = False
if (
conversation_instance.conversation_info
@ -286,7 +422,9 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
last_action_record_for_end_check = {}
if conversation_instance.conversation_info and conversation_instance.conversation_info.done_action:
last_action_record_for_end_check = conversation_instance.conversation_info.done_action[-1]
if conversation_instance.conversation_info.done_action:
last_action_record_for_end_check = conversation_instance.conversation_info.done_action[-1]
action_ended: bool = (
last_action_record_for_end_check.get("action") in ["end_conversation", "say_goodbye"]
and last_action_record_for_end_check.get("status") == "done"
@ -294,12 +432,12 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
if goal_ended or action_ended:
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) 检测到结束条件,停止循环。")
await conversation_instance.stop() # 调用 Conversation 的 stop
continue # 虽然会 break但 continue 更明确
await conversation_instance.stop()
continue
except asyncio.CancelledError:
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) PFC 主循环任务被取消。")
await conversation_instance.stop() # 调用 Conversation 的 stop
await conversation_instance.stop()
break
except Exception as loop_err:
logger.error(f"[私聊][{conversation_instance.private_name}] (Loop) PFC 主循环出错: {loop_err}")
@ -307,7 +445,6 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
conversation_instance.state = ConversationState.ERROR
await asyncio.sleep(5)
# 控制循环频率
loop_duration = time.time() - loop_iter_start_time
min_loop_interval = 0.1
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) 循环迭代耗时: {loop_duration:.3f} 秒。")

View File

@ -114,8 +114,6 @@ class ObservationInfo:
"""初始化 ObservationInfo"""
self.private_name: str = private_name
# 新增:发信人信息
self.sender_name: Optional[str] = None
self.sender_user_id: Optional[str] = None # 存储为字符串
self.sender_platform: Optional[str] = None
@ -232,23 +230,20 @@ class ObservationInfo:
if user_info:
try:
self.sender_user_id = str(user_info.user_id) # 确保是字符串
self.sender_name = user_info.user_nickname # 或者 user_info.card 如果私聊时card更准
self.sender_platform = user_info.platform
current_message_sender_id = self.sender_user_id # 用于后续逻辑
logger.debug(
f"[私聊][{self.private_name}] 更新发信人信息: ID={self.sender_user_id}, Name={self.sender_name}, Platform={self.sender_platform}"
f"[私聊][{self.private_name}] 更新发信人信息: ID={self.sender_user_id}, Name={self.private_name}, Platform={self.sender_platform}"
)
except AttributeError as e:
logger.error(f"[私聊][{self.private_name}] 从 UserInfo 对象提取信息时出错: {e}, UserInfo: {user_info}")
# 如果提取失败,将这些新字段设为 None避免使用旧数据
self.sender_user_id = None
self.sender_name = None
self.sender_platform = None
else:
logger.warning(f"[私聊][{self.private_name}] 处理消息更新时缺少有效的 UserInfo, message_id: {message_id}")
# 如果没有 UserInfo也将这些新字段设为 None
self.sender_user_id = None
self.sender_name = None
self.sender_platform = None
# --- 新增/修改结束 ---

View File

@ -3,7 +3,7 @@ from src.common.logger import get_module_logger
from ..models.utils_model import LLMRequest
from ...config.config import global_config
from .chat_observer import ChatObserver
from .pfc_utils import get_items_from_json
from .pfc_utils import get_items_from_json, build_chat_history_text
from src.individuality.individuality import Individuality
from .conversation_info import ConversationInfo
from .observation_info import ObservationInfo
@ -86,21 +86,7 @@ class GoalAnalyzer:
goals_str = f"目标:{goal},产生该对话目标的原因:{reasoning}\n"
# 获取聊天历史记录
chat_history_text = observation_info.chat_history_str
if observation_info.new_messages_count > 0:
new_messages_list = observation_info.unprocessed_messages
new_messages_str = await build_readable_messages(
new_messages_list,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
chat_history_text += f"\n--- 以下是 {observation_info.new_messages_count} 条新消息 ---\n{new_messages_str}"
else:
chat_history_text += "\n--- 以上均为已读消息,未读消息均已处理完毕 ---\n"
# await observation_info.clear_unprocessed_messages()
chat_history_text = await build_chat_history_text(observation_info, self.private_name)
persona_text = f"你的名字是{self.name}{self.personality_info}"
# 构建action历史文本

View File

@ -1,85 +0,0 @@
from typing import List, Tuple
from src.common.logger import get_module_logger
from src.plugins.memory_system.Hippocampus import HippocampusManager
from ..models.utils_model import LLMRequest
from ...config.config import global_config
from ..chat.message import Message
from ..knowledge.knowledge_lib import qa_manager
from ..utils.chat_message_builder import build_readable_messages
logger = get_module_logger("knowledge_fetcher")
class KnowledgeFetcher:
"""知识调取器"""
def __init__(self, private_name: str):
self.llm = LLMRequest(
model=global_config.llm_normal,
temperature=global_config.llm_normal["temp"],
max_tokens=1000,
request_type="knowledge_fetch",
)
self.private_name = private_name
def _lpmm_get_knowledge(self, query: str) -> str:
"""获取相关知识
Args:
query: 查询内容
Returns:
str: 构造好的,带相关度的知识
"""
logger.debug(f"[私聊][{self.private_name}]正在从LPMM知识库中获取知识")
try:
knowledge_info = qa_manager.get_knowledge(query)
logger.debug(f"[私聊][{self.private_name}]LPMM知识库查询结果: {knowledge_info:150}")
return knowledge_info
except Exception as e:
logger.error(f"[私聊][{self.private_name}]LPMM知识库搜索工具执行失败: {str(e)}")
return "未找到匹配的知识"
async def fetch(self, query: str, chat_history: List[Message]) -> Tuple[str, str]:
"""获取相关知识
Args:
query: 查询内容
chat_history: 聊天历史
Returns:
Tuple[str, str]: (获取的知识, 知识来源)
"""
# 构建查询上下文
chat_history_text = await build_readable_messages(
chat_history,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
# 从记忆中获取相关知识
related_memory = await HippocampusManager.get_instance().get_memory_from_text(
text=f"{query}\n{chat_history_text}",
max_memory_num=3,
max_memory_length=2,
max_depth=3,
fast_retrieval=False,
)
knowledge_text = ""
sources_text = "无记忆匹配" # 默认值
if related_memory:
sources = []
for memory in related_memory:
knowledge_text += memory[1] + "\n"
sources.append(f"记忆片段{memory[0]}")
knowledge_text = knowledge_text.strip()
sources_text = "".join(sources)
knowledge_text += "\n现在有以下**知识**可供参考:\n "
knowledge_text += self._lpmm_get_knowledge(query)
knowledge_text += "\n请记住这些**知识**,并根据**知识**回答问题。\n"
return knowledge_text or "未找到相关知识", sources_text or "无记忆匹配"

View File

@ -71,9 +71,7 @@ class PfcEmotionUpdater:
)
current_mood_text_from_manager = self.mood_mng.current_mood.text # 从 MoodManager 获取当前情绪文本
sender_name_for_prompt = getattr(observation_info, "sender_name", "对方")
if not sender_name_for_prompt:
sender_name_for_prompt = "对方"
sender_name_for_prompt = self.private_name
relationship_text_for_prompt = getattr(
conversation_info, "relationship_text", "关系一般。"
) # 从 ConversationInfo 获取关系文本

View File

@ -8,6 +8,9 @@ from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder #
from src.plugins.chat.chat_stream import ChatStream
from ..person_info.person_info import person_info_manager
import math
from src.plugins.utils.chat_message_builder import build_readable_messages
from .observation_info import ObservationInfo
from src.config.config import global_config
logger = get_logger("pfc_utils")
@ -339,3 +342,43 @@ async def adjust_relationship_value_nonlinear(old_value: float, raw_adjustment:
value = 0
return value
async def build_chat_history_text(observation_info: ObservationInfo, private_name: str) -> str:
"""构建聊天历史记录文本 (包含未处理消息)"""
chat_history_text = ""
try:
if hasattr(observation_info, "chat_history_str") and observation_info.chat_history_str:
chat_history_text = observation_info.chat_history_str
elif hasattr(observation_info, "chat_history") and observation_info.chat_history:
history_slice = observation_info.chat_history[-20:]
chat_history_text = await build_readable_messages(
history_slice, replace_bot_name=True, merge_messages=False, timestamp_mode="relative", read_mark=0.0
)
else:
chat_history_text = "还没有聊天记录。\n"
unread_count = getattr(observation_info, "new_messages_count", 0)
unread_messages = getattr(observation_info, "unprocessed_messages", [])
if unread_count > 0 and unread_messages:
bot_qq_str = str(global_config.BOT_QQ)
other_unread_messages = [
msg for msg in unread_messages if msg.get("user_info", {}).get("user_id") != bot_qq_str
]
other_unread_count = len(other_unread_messages)
if other_unread_count > 0:
new_messages_str = await build_readable_messages(
other_unread_messages,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
chat_history_text += f"\n{new_messages_str}\n------\n"
except AttributeError as e:
logger.warning(f"[私聊][{private_name}] 构建聊天记录文本时属性错误: {e}")
chat_history_text = "[获取聊天记录时出错]\n"
except Exception as e:
logger.error(f"[私聊][{private_name}] 处理聊天记录时发生未知错误: {e}")
chat_history_text = "[处理聊天记录时出错]\n"
return chat_history_text

View File

@ -1,4 +1,5 @@
import random
from .pfc_utils import retrieve_contextual_info
from src.common.logger_manager import get_logger
@ -9,7 +10,7 @@ from .reply_checker import ReplyChecker
from src.individuality.individuality import Individuality
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
from src.plugins.utils.chat_message_builder import build_readable_messages
from .pfc_utils import build_chat_history_text
logger = get_logger("reply_generator")
@ -103,8 +104,6 @@ PROMPT_SEND_NEW_MESSAGE = """
{last_rejection_info}
{spam_warning_info}
请根据上述信息判断你是否要继续发一条新消息例如对之前消息的补充深入话题或追问等等如果你觉得要发送该消息应该
1. 符合对话目标""的角度发言不要自己与自己对话
2. 符合你的性格特征和身份细节
@ -216,25 +215,9 @@ class ReplyGenerator:
else:
goals_str = "- 目前没有明确对话目标\n"
chat_history_text = observation_info.chat_history_str
if observation_info.new_messages_count > 0 and observation_info.unprocessed_messages:
new_messages_list = observation_info.unprocessed_messages
new_messages_str = await build_readable_messages(
new_messages_list,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
chat_history_text += f"\n--- 以下是 {observation_info.new_messages_count} 条新消息 ---\n{new_messages_str}"
elif not chat_history_text:
chat_history_text = "还没有聊天记录。"
else:
chat_history_text += "\n--- 以上均为已读消息,未读消息均已处理完毕 ---\n"
chat_history_text = await build_chat_history_text(observation_info, self.private_name)
sender_name_str = getattr(observation_info, "sender_name", "对方")
if not sender_name_str:
sender_name_str = "对方"
sender_name_str = self.private_name
relationship_text_str = getattr(conversation_info, "relationship_text", "你们还不熟悉。")
current_emotion_text_str = getattr(conversation_info, "current_emotion_text", "心情平静。")
@ -280,14 +263,14 @@ class ReplyGenerator:
)
# 新增:构建刷屏警告信息 for PROMPT_SEND_NEW_MESSAGE
spam_warning_message = ""
if action_type == "send_new_message": # 只在 send_new_message 时构建刷屏警告
if conversation_info.my_message_count > 5:
spam_warning_message = f"⚠️【警告】**你已连续发送{str(conversation_info.my_message_count)}条消息!请谨慎考虑是否继续发送!以免刷屏对造成对方困扰!**"
elif conversation_info.my_message_count > 2:
spam_warning_message = f"💬【提示】**你已连续发送{str(conversation_info.my_message_count)}条消息。如果非必要,请避免连续发送,以免给对方造成困扰。**"
if spam_warning_message:
spam_warning_message = f"\n{spam_warning_message}\n"
# spam_warning_message = ""
# if action_type == "send_new_message": # 只在 send_new_message 时构建刷屏警告
# if conversation_info.my_message_count > 5:
# spam_warning_message = f"⚠️【警告】**你已连续发送{str(conversation_info.my_message_count)}条消息!请谨慎考虑是否继续发送!以免刷屏对造成对方困扰!**"
# elif conversation_info.my_message_count > 2:
# spam_warning_message = f"💬【提示】**你已连续发送{str(conversation_info.my_message_count)}条消息。如果非必要,请避免连续发送,以免给对方造成困扰。**"
# if spam_warning_message:
# spam_warning_message = f"\n{spam_warning_message}\n"
# --- 选择 Prompt ---
if action_type == "send_new_message":
@ -326,7 +309,7 @@ class ReplyGenerator:
if action_type == "send_new_message":
current_format_params = base_format_params.copy()
current_format_params["spam_warning_info"] = spam_warning_message
# current_format_params["spam_warning_info"] = spam_warning_message
prompt = prompt_template.format(**current_format_params)
elif action_type == "say_goodbye":
farewell_params = {

View File

@ -202,12 +202,13 @@ talk_allowed_private = [] # 可以回复消息的QQ号
pfc_chatting = false # 是否启用PFC聊天该功能仅作用于私聊与回复模式独立
api_polling_max_retries = 3
enable_pfc_reply_checker = true # 是否启用 PFC 的回复检查器
pfc_message_buffer_size = 2 # PFC 聊天消息缓冲数量,有利于使聊天节奏更加紧凑流畅,请根据实际 LLM 响应速度进行调整默认2条
[idle_conversation]
enable_idle_conversation = false # 是否启用 pfc 主动发言
[idle_chat]
enable_idle_chat = false # 是否启用 pfc 主动发言
idle_check_interval = 10 # 检查间隔10分钟检查一次
min_idle_time = 7200 # 最短无活动时间2小时 (7200秒)
max_idle_time = 18000 # 最长无活动时间5小时 (18000秒)
min_cooldown = 7200 # 最短冷却时间2小时 (7200秒)
max_cooldown = 18000 # 最长冷却时间5小时 (18000秒)
#下面的模型若使用硅基流动则不需要更改使用ds官方则改成.env自定义的宏使用自定义模型则选择定位相似的模型自己填写
#推理模型