Update normal_chat.py

pull/998/head
2829798842 2025-05-28 20:00:28 +08:00 committed by GitHub
parent db22882dd9
commit d767cea165
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 326 additions and 455 deletions

View File

@ -1,493 +1,364 @@
import asyncio
# import statistics # 导入 statistics 模块
import time
import json # <--- 确保导入 json
import traceback
from random import random
from typing import List, Optional # 导入 Optional
from maim_message import UserInfo, Seg
from src.common.logger_manager import get_logger
from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info
from src.manager.mood_manager import mood_manager
from src.chat.message_receive.chat_stream import ChatStream, chat_manager
from src.person_info.relationship_manager import relationship_manager
from src.chat.utils.info_catcher import info_catcher_manager
from src.chat.utils.timer_calculator import Timer
from src.chat.utils.prompt_builder import global_prompt_manager
from .normal_chat_generator import NormalChatGenerator
from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet
from src.chat.message_receive.message_sender import message_manager
from src.chat.utils.utils_image import image_path_to_base64
from src.chat.emoji_system.emoji_manager import emoji_manager
from src.chat.normal_chat.willing.willing_manager import willing_manager
from typing import List, Dict, Any, Optional
from rich.traceback import install
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config
from src.chat.focus_chat.info.info_base import InfoBase
from src.chat.focus_chat.info.obs_info import ObsInfo
from src.chat.focus_chat.info.cycle_info import CycleInfo
from src.chat.focus_chat.info.mind_info import MindInfo
from src.chat.focus_chat.info.action_info import ActionInfo
from src.chat.focus_chat.info.structured_info import StructuredInfo
from src.chat.focus_chat.info.self_info import SelfInfo
from src.common.logger_manager import get_logger
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.individuality.individuality import individuality
from src.chat.focus_chat.planners.action_manager import ActionManager
logger = get_logger("normal_chat")
logger = get_logger("planner")
install(extra_lines=3)
class NormalChat:
def __init__(self, chat_stream: ChatStream, interest_dict: dict = None, on_switch_to_focus_callback=None):
"""初始化 NormalChat 实例。只进行同步操作。"""
self.chat_stream = chat_stream
self.stream_id = chat_stream.stream_id
self.stream_name = chat_manager.get_stream_name(self.stream_id) or self.stream_id
# Interest dict
self.interest_dict = interest_dict
self.is_group_chat: bool = False
self.chat_target_info: Optional[dict] = None
# Other sync initializations
self.gpt = NormalChatGenerator()
self.mood_manager = mood_manager
self.start_time = time.time()
self._chat_task: Optional[asyncio.Task] = None
self._initialized = False # Track initialization status
# 记录最近的回复内容,每项包含: {time, user_message, response, is_mentioned, is_reference_reply}
self.recent_replies = []
self.max_replies_history = 20 # 最多保存最近20条回复记录
# 添加回调函数用于在满足条件时通知切换到focus_chat模式
self.on_switch_to_focus_callback = on_switch_to_focus_callback
self._disabled = False # 增加停用标志
async def initialize(self):
"""异步初始化,获取聊天类型和目标信息。"""
if self._initialized:
return
self.is_group_chat, self.chat_target_info = await get_chat_type_and_target_info(self.stream_id)
self.stream_name = chat_manager.get_stream_name(self.stream_id) or self.stream_id
self._initialized = True
logger.info(f"[{self.stream_name}] NormalChat 实例 initialize 完成 (异步部分)。")
# 改为实例方法
async def _create_thinking_message(self, message: MessageRecv, timestamp: Optional[float] = None) -> str:
"""创建思考消息"""
messageinfo = message.message_info
bot_user_info = UserInfo(
user_id=global_config.bot.qq_account,
user_nickname=global_config.bot.nickname,
platform=messageinfo.platform,
)
thinking_time_point = round(time.time(), 2)
thinking_id = "mt" + str(thinking_time_point)
thinking_message = MessageThinking(
message_id=thinking_id,
chat_stream=self.chat_stream,
bot_user_info=bot_user_info,
reply=message,
thinking_start_time=thinking_time_point,
timestamp=timestamp if timestamp is not None else None,
)
await message_manager.add_message(thinking_message)
return thinking_id
# 改为实例方法
async def _add_messages_to_manager(
self, message: MessageRecv, response_set: List[str], thinking_id
) -> Optional[MessageSending]:
"""发送回复消息"""
container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id
thinking_message = None
for msg in container.messages[:]:
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
thinking_message = msg
container.messages.remove(msg)
break
if not thinking_message:
logger.warning(f"[{self.stream_name}] 未找到对应的思考消息 {thinking_id},可能已超时被移除")
return None
thinking_start_time = thinking_message.thinking_start_time
message_set = MessageSet(self.chat_stream, thinking_id) # 使用 self.chat_stream
mark_head = False
first_bot_msg = None
for msg in response_set:
if global_config.experimental.debug_show_chat_mode:
msg += ""
message_segment = Seg(type="text", data=msg)
bot_message = MessageSending(
message_id=thinking_id,
chat_stream=self.chat_stream, # 使用 self.chat_stream
bot_user_info=UserInfo(
user_id=global_config.bot.qq_account,
user_nickname=global_config.bot.nickname,
platform=message.message_info.platform,
),
sender_info=message.message_info.user_info,
message_segment=message_segment,
reply=message,
is_head=not mark_head,
is_emoji=False,
thinking_start_time=thinking_start_time,
apply_set_reply_logic=True,
)
if not mark_head:
mark_head = True
first_bot_msg = bot_message
message_set.add_message(bot_message)
await message_manager.add_message(message_set)
return first_bot_msg
# 改为实例方法
async def _handle_emoji(self, message: MessageRecv, response: str):
"""处理表情包"""
if random() < global_config.normal_chat.emoji_chance:
emoji_raw = await emoji_manager.get_emoji_for_text(response)
if emoji_raw:
emoji_path, description = emoji_raw
emoji_cq = image_path_to_base64(emoji_path)
thinking_time_point = round(message.message_info.time, 2)
message_segment = Seg(type="emoji", data=emoji_cq)
bot_message = MessageSending(
message_id="mt" + str(thinking_time_point),
chat_stream=self.chat_stream, # 使用 self.chat_stream
bot_user_info=UserInfo(
user_id=global_config.bot.qq_account,
user_nickname=global_config.bot.nickname,
platform=message.message_info.platform,
),
sender_info=message.message_info.user_info,
message_segment=message_segment,
reply=message,
is_head=False,
is_emoji=True,
apply_set_reply_logic=True,
)
await message_manager.add_message(bot_message)
# 改为实例方法 (虽然它只用 message.chat_stream, 但逻辑上属于实例)
async def _update_relationship(self, message: MessageRecv, response_set):
"""更新关系情绪"""
ori_response = ",".join(response_set)
stance, emotion = await self.gpt._get_emotion_tags(ori_response, message.processed_plain_text)
user_info = message.message_info.user_info
platform = user_info.platform
await relationship_manager.calculate_update_relationship_value(
user_info,
platform,
label=emotion,
stance=stance, # 使用 self.chat_stream
)
self.mood_manager.update_mood_from_emotion(emotion, global_config.mood.mood_intensity_factor)
async def _reply_interested_message(self) -> None:
def init_prompt():
Prompt(
"""
后台任务方法轮询当前实例关联chat的兴趣消息
通常由start_monitoring_interest()启动
Your self-awareness is:
{self_info_block}
{extra_info_block}
You need to decide how to participate in the conversation based on the following information
These information may conflict, please integrate these information, and choose the most suitable action:
{chat_content_block}
{mind_info_block}
{cycle_info_block}
IMPORTANT: The following tool call information has the highest priority and should be heavily weighted in your decision-making:
{structured_info_block}
Please analyze the conversation content and new messages you see, refer to the conversation plan, and choose the appropriate action:
{action_options_text}
You must choose one from the available actions above and explain why.
Your decision must be output in strict JSON format, and only contain JSON content, no other text or explanation.
Please output your decision JSON in the following format:
{{
"action": "action_name",
"reasoning": "Your decision reason",
"parameter1": "parameter1 value",
"parameter2": "parameter2 value",
"parameter3": "parameter3 value",
...
}}
Please output your decision JSON: """,
"planner_prompt",
)
Prompt(
"""
while True:
async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
await asyncio.sleep(0.5) # 每秒检查一次
# 检查任务是否已被取消
if self._chat_task is None or self._chat_task.cancelled():
logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出")
break
action_name: {action_name}
描述{action_description}
参数
{action_parameters}
动作要求
{action_require}""",
"action_prompt",
)
items_to_process = list(self.interest_dict.items())
if not items_to_process:
continue
# 处理每条兴趣消息
for msg_id, (message, interest_value, is_mentioned) in items_to_process:
try:
# 处理消息
await self.normal_response(
message=message,
is_mentioned=is_mentioned,
interested_rate=interest_value,
rewind_response=False,
)
except Exception as e:
logger.error(f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}\n{traceback.format_exc()}")
finally:
self.interest_dict.pop(msg_id, None)
# 改为实例方法, 移除 chat 参数
async def normal_response(
self, message: MessageRecv, is_mentioned: bool, interested_rate: float, rewind_response: bool = False
) -> None:
# 新增:如果已停用,直接返回
if self._disabled:
logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。")
return
# 检查收到的消息是否属于当前实例处理的 chat stream
if message.chat_stream.stream_id != self.stream_id:
logger.error(
f"[{self.stream_name}] normal_response 收到不匹配的消息 (来自 {message.chat_stream.stream_id}),预期 {self.stream_id}。已忽略。"
)
return
timing_results = {}
reply_probability = 1.0 if is_mentioned else 0.0 # 如果被提及基础概率为1否则需要意愿判断
# 意愿管理器设置当前message信息
willing_manager.setup(message, self.chat_stream, is_mentioned, interested_rate)
# 获取回复概率
is_willing = False
# 仅在未被提及或基础概率不为1时查询意愿概率
if reply_probability < 1: # 简化逻辑,如果未提及 (reply_probability 为 0),则获取意愿概率
is_willing = True
reply_probability = await willing_manager.get_reply_probability(message.message_info.message_id)
if message.message_info.additional_config:
if "maimcore_reply_probability_gain" in message.message_info.additional_config.keys():
reply_probability += message.message_info.additional_config["maimcore_reply_probability_gain"]
reply_probability = min(max(reply_probability, 0), 1) # 确保概率在 0-1 之间
# 打印消息信息
mes_name = self.chat_stream.group_info.group_name if self.chat_stream.group_info else "私聊"
current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time))
# 使用 self.stream_id
willing_log = f"[回复意愿:{await willing_manager.get_willing(self.stream_id):.2f}]" if is_willing else ""
logger.info(
f"[{current_time}][{mes_name}]"
f"{message.message_info.user_info.user_nickname}:" # 使用 self.chat_stream
f"{message.processed_plain_text}{willing_log}[概率:{reply_probability * 100:.1f}%]"
class ActionPlanner:
def __init__(self, log_prefix: str, action_manager: ActionManager):
self.log_prefix = log_prefix
# LLM规划器配置
self.planner_llm = LLMRequest(
model=global_config.model.focus_planner,
max_tokens=1000,
request_type="action_planning", # 用于动作规划
)
do_reply = False
response_set = None # 初始化 response_set
if random() < reply_probability:
do_reply = True
# 回复前处理
await willing_manager.before_generate_reply_handle(message.message_info.message_id)
self.action_manager = action_manager
with Timer("创建思考消息", timing_results):
if rewind_response:
thinking_id = await self._create_thinking_message(message, message.message_info.time)
else:
thinking_id = await self._create_thinking_message(message)
async def plan(self, all_plan_info: List[InfoBase], cycle_timers: dict) -> Dict[str, Any]:
"""
规划器 (Planner): 使用LLM根据上下文决定做出什么动作
logger.debug(f"[{self.stream_name}] 创建捕捉器thinking_id:{thinking_id}")
参数:
all_plan_info: 所有计划信息
cycle_timers: 计时器字典
"""
info_catcher = info_catcher_manager.get_info_catcher(thinking_id)
info_catcher.catch_decide_to_response(message)
action = "no_reply" # 默认动作
reasoning = "规划器初始化默认"
action_data = {}
# 初始化所有可能用到的变量避免UnboundLocalError
observed_messages = []
observed_messages_str = ""
current_mind = ""
cycle_info = ""
self_info = ""
is_group_chat = True # 默认为群聊
prompt = "未生成prompt" # 初始化prompt变量
try:
with Timer("生成回复", timing_results):
response_set = await self.gpt.generate_response(
message=message,
thinking_id=thinking_id,
)
try:
# 获取观察信息
extra_info: list[str] = []
_structured_info_list = []
info_catcher.catch_after_generate_response(timing_results["生成回复"])
except Exception as e:
logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}")
response_set = None # 确保出错时 response_set 为 None
# 首先处理动作变更
for info in all_plan_info:
if isinstance(info, ActionInfo) and info.has_changes():
add_actions = info.get_add_actions()
remove_actions = info.get_remove_actions()
reason = info.get_reason()
if not response_set:
logger.info(f"[{self.stream_name}] 模型未生成回复内容")
# 如果模型未生成回复,移除思考消息
container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id
for msg in container.messages[:]:
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
container.messages.remove(msg)
logger.debug(f"[{self.stream_name}] 已移除未产生回复的思考消息 {thinking_id}")
break
# 需要在此处也调用 not_reply_handle 和 delete 吗?
# 如果是因为模型没回复,也算是一种 "未回复"
await willing_manager.not_reply_handle(message.message_info.message_id)
willing_manager.delete(message.message_info.message_id)
return # 不执行后续步骤
# 处理动作的增加
for action_name in add_actions:
if action_name in self.action_manager.get_registered_actions():
self.action_manager.add_action_to_using(action_name)
logger.debug(f"{self.log_prefix}添加动作: {action_name}, 原因: {reason}")
logger.info(f"[{self.stream_name}] 回复内容: {response_set}")
if self._disabled:
logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。")
return
# 处理动作的移除
for action_name in remove_actions:
self.action_manager.remove_action_from_using(action_name)
logger.debug(f"{self.log_prefix}移除动作: {action_name}, 原因: {reason}")
# 发送回复 (不再需要传入 chat)
with Timer("消息发送", timing_results):
first_bot_msg = await self._add_messages_to_manager(message, response_set, thinking_id)
# 如果当前选择的动作被移除了更新为no_reply
if action in remove_actions:
action = "no_reply"
reasoning = f"之前选择的动作{action}已被移除,原因: {reason}"
# 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况)
if first_bot_msg:
info_catcher.catch_after_response(timing_results["消息发送"], response_set, first_bot_msg)
# 继续处理其他信息
for info in all_plan_info:
if isinstance(info, ObsInfo):
observed_messages = info.get_talking_message()
observed_messages_str = info.get_talking_message_str_truncate()
chat_type = info.get_chat_type()
is_group_chat = chat_type == "group"
elif isinstance(info, MindInfo):
current_mind = info.get_current_mind()
elif isinstance(info, CycleInfo):
cycle_info = info.get_observe_info()
elif isinstance(info, SelfInfo):
self_info = info.get_processed_info()
elif isinstance(info, StructuredInfo):
_structured_info = info.get_data()
# 收集工具调用的结构化信息
if _structured_info and isinstance(_structured_info, dict):
# StructuredInfo 的数据结构是 {tool_type: tool_content}
for tool_type, tool_content in _structured_info.items():
if tool_content: # 确保内容不为空
_structured_info_list.append(f"{tool_type}: {str(tool_content)}")
elif not isinstance(info, ActionInfo): # 跳过已处理的ActionInfo
extra_info.append(info.get_processed_info())
# 记录回复信息到最近回复列表中
reply_info = {
"time": time.time(),
"user_message": message.processed_plain_text,
"user_info": {
"user_id": message.message_info.user_info.user_id,
"user_nickname": message.message_info.user_info.user_nickname,
},
"response": response_set,
"is_mentioned": is_mentioned,
"is_reference_reply": message.reply is not None, # 判断是否为引用回复
"timing": {k: round(v, 2) for k, v in timing_results.items()},
# 获取当前可用的动作
current_available_actions = self.action_manager.get_using_actions()
# 如果没有可用动作直接返回no_reply
if not current_available_actions:
logger.warning(f"{self.log_prefix}没有可用的动作将使用no_reply")
action = "no_reply"
reasoning = "没有可用的动作"
return {
"action_result": {"action_type": action, "action_data": action_data, "reasoning": reasoning},
"current_mind": current_mind,
"observed_messages": observed_messages,
}
self.recent_replies.append(reply_info)
# 保持最近回复历史在限定数量内
if len(self.recent_replies) > self.max_replies_history:
self.recent_replies = self.recent_replies[-self.max_replies_history :]
# 检查是否需要切换到focus模式
await self._check_switch_to_focus()
# --- 构建提示词 (调用修改后的 PromptBuilder 方法) ---
# 构建工具信息块
structured_info_block_str = "\n".join(_structured_info_list)
if structured_info_block_str:
structured_info_block_str = f"The following is basic information returned by tool calls. Please use this information as the basis for subsequent decision-making and actions:\n{structured_info_block_str}"
else:
logger.warning(f"[{self.stream_name}] 思考消息 {thinking_id} 在发送前丢失,无法记录 info_catcher")
structured_info_block_str = "No tool information available for reference."
info_catcher.done_catch()
# 处理表情包 (不再需要传入 chat)
with Timer("处理表情包", timing_results):
await self._handle_emoji(message, response_set[0])
# 更新关系情绪 (不再需要传入 chat)
with Timer("关系更新", timing_results):
await self._update_relationship(message, response_set)
# 回复后处理
await willing_manager.after_generate_reply_handle(message.message_info.message_id)
# 输出性能计时结果
if do_reply and response_set: # 确保 response_set 不是 None
timing_str = " | ".join([f"{step}: {duration:.2f}" for step, duration in timing_results.items()])
trigger_msg = message.processed_plain_text
response_msg = " ".join(response_set)
logger.info(
f"[{self.stream_name}] 触发消息: {trigger_msg[:20]}... | 推理消息: {response_msg[:20]}... | 性能计时: {timing_str}"
prompt = await self.build_planner_prompt(
self_info_block=self_info,
is_group_chat=is_group_chat, # <-- Pass HFC state
chat_target_info=None,
observed_messages_str=observed_messages_str, # <-- Pass local variable
current_mind=current_mind, # <-- Pass argument
structured_info_block=structured_info_block_str,
current_available_actions=current_available_actions, # <-- Pass determined actions
cycle_info=cycle_info, # <-- Pass cycle info
extra_info=extra_info,
)
elif not do_reply:
# 不回复处理
await willing_manager.not_reply_handle(message.message_info.message_id)
# 意愿管理器注销当前message信息 (无论是否回复,只要处理过就删除)
willing_manager.delete(message.message_info.message_id)
# 改为实例方法, 移除 chat 参数
async def start_chat(self):
"""先进行异步初始化,然后启动聊天任务。"""
if not self._initialized:
await self.initialize() # Ensure initialized before starting tasks
self._disabled = False # 启动时重置停用标志
if self._chat_task is None or self._chat_task.done():
logger.info(f"[{self.stream_name}] 开始处理兴趣消息...")
polling_task = asyncio.create_task(self._reply_interested_message())
polling_task.add_done_callback(lambda t: self._handle_task_completion(t))
self._chat_task = polling_task
else:
logger.info(f"[{self.stream_name}] 聊天轮询任务已在运行中。")
def _handle_task_completion(self, task: asyncio.Task):
"""任务完成回调处理"""
if task is not self._chat_task:
logger.warning(f"[{self.stream_name}] 收到未知任务回调")
return
try:
if exc := task.exception():
logger.error(f"[{self.stream_name}] 任务异常: {exc}")
traceback.print_exc()
except asyncio.CancelledError:
logger.debug(f"[{self.stream_name}] 任务已取消")
except Exception as e:
logger.error(f"[{self.stream_name}] 回调处理错误: {e}")
finally:
if self._chat_task is task:
self._chat_task = None
logger.debug(f"[{self.stream_name}] 任务清理完成")
# 改为实例方法, 移除 stream_id 参数
async def stop_chat(self):
"""停止当前实例的兴趣监控任务。"""
self._disabled = True # 停止时设置停用标志
if self._chat_task and not self._chat_task.done():
task = self._chat_task
logger.debug(f"[{self.stream_name}] 尝试取消normal聊天任务。")
task.cancel()
# --- 调用 LLM (普通文本生成) ---
llm_content = None
try:
await task # 等待任务响应取消
except asyncio.CancelledError:
logger.info(f"[{self.stream_name}] 结束一般聊天模式。")
except Exception as e:
# 回调函数 _handle_task_completion 会处理异常日志
logger.warning(f"[{self.stream_name}] 等待监控任务取消时捕获到异常 (可能已在回调中记录): {e}")
finally:
# 确保任务状态更新,即使等待出错 (回调函数也会尝试更新)
if self._chat_task is task:
self._chat_task = None
llm_content, _, _ = await self.planner_llm.generate_response(prompt=prompt)
logger.debug(f"{self.log_prefix}[Planner] LLM 原始 JSON 响应 (预期): {llm_content}")
except Exception as req_e:
logger.error(f"{self.log_prefix}[Planner] LLM 请求执行失败: {req_e}")
reasoning = f"LLM 请求失败,你的模型出现问题: {req_e}"
action = "no_reply"
# 清理所有未处理的思考消息
try:
container = await message_manager.get_container(self.stream_id)
if container:
# 查找并移除所有 MessageThinking 类型的消息
thinking_messages = [msg for msg in container.messages[:] if isinstance(msg, MessageThinking)]
if thinking_messages:
for msg in thinking_messages:
container.messages.remove(msg)
logger.info(f"[{self.stream_name}] 清理了 {len(thinking_messages)} 条未处理的思考消息。")
except Exception as e:
logger.error(f"[{self.stream_name}] 清理思考消息时出错: {e}")
if llm_content:
try:
# 尝试去除可能的 markdown 代码块标记
cleaned_content = (
llm_content.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
)
if not cleaned_content:
raise json.JSONDecodeError("Cleaned content is empty", cleaned_content, 0)
parsed_json = json.loads(cleaned_content)
# 提取决策,提供默认值
extracted_action = parsed_json.get("action", "no_reply")
extracted_reasoning = parsed_json.get("reasoning", "LLM未提供理由")
# 将所有其他属性添加到action_data
action_data = {}
for key, value in parsed_json.items():
if key not in ["action", "reasoning"]:
action_data[key] = value
# 对于reply动作不需要额外处理因为相关字段已经在上面的循环中添加到action_data
if extracted_action not in current_available_actions:
logger.warning(
f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{extracted_action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'"
)
action = "no_reply"
reasoning = f"LLM 返回了当前不可用的动作 '{extracted_action}' (可用: {list(current_available_actions.keys())})。原始理由: {extracted_reasoning}"
else:
# 动作有效且可用
action = extracted_action
reasoning = extracted_reasoning
except Exception as json_e:
logger.warning(
f"{self.log_prefix}解析LLM响应JSON失败模型返回不标准: {json_e}. LLM原始输出: '{llm_content}'"
)
reasoning = f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_reply'."
action = "no_reply"
except Exception as outer_e:
logger.error(f"{self.log_prefix}Planner 处理过程中发生意外错误,规划失败,将执行 no_reply: {outer_e}")
traceback.print_exc()
action = "no_reply"
reasoning = f"Planner 内部处理错误: {outer_e}"
# 获取最近回复记录的方法
def get_recent_replies(self, limit: int = 10) -> List[dict]:
"""获取最近的回复记录
logger.debug(
f"{self.log_prefix}规划器Prompt:\n{prompt}\n\n决策动作:{action},\n动作信息: '{action_data}'\n理由: {reasoning}"
)
Args:
limit: 最大返回数量默认10条
# 恢复原始动作集
self.action_manager.restore_actions()
logger.debug(
f"{self.log_prefix}恢复了原始动作集, 当前可用: {list(self.action_manager.get_using_actions().keys())}"
)
Returns:
List[dict]: 最近的回复记录列表每项包含
time: 回复时间戳
user_message: 用户消息内容
user_info: 用户信息(user_id, user_nickname)
response: 回复内容
is_mentioned: 是否被提及(@)
is_reference_reply: 是否为引用回复
timing: 各阶段耗时
"""
# 返回最近的limit条记录按时间倒序排列
return sorted(self.recent_replies[-limit:], key=lambda x: x["time"], reverse=True)
action_result = {"action_type": action, "action_data": action_data, "reasoning": reasoning}
async def _check_switch_to_focus(self) -> None:
"""检查是否满足切换到focus模式的条件"""
if not self.on_switch_to_focus_callback:
return # 如果没有设置回调函数,直接返回
current_time = time.time()
plan_result = {
"action_result": action_result,
"current_mind": current_mind,
"observed_messages": observed_messages,
}
time_threshold = 120 / global_config.chat.auto_focus_threshold
reply_threshold = 6 * global_config.chat.auto_focus_threshold
return plan_result
one_minute_ago = current_time - time_threshold
async def build_planner_prompt(
self,
self_info_block: str,
is_group_chat: bool, # Now passed as argument
chat_target_info: Optional[dict], # Now passed as argument
observed_messages_str: str,
current_mind: Optional[str],
structured_info_block: str,
current_available_actions: Dict[str, ActionInfo],
cycle_info: Optional[str],
extra_info: list[str],
) -> str:
"""构建 Planner LLM 的提示词 (获取模板并填充数据)"""
try:
# --- Determine chat context ---
chat_context_description = "你现在正在一个群聊中"
chat_target_name = None # Only relevant for private
if not is_group_chat and chat_target_info:
chat_target_name = (
chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方"
)
chat_context_description = f"你正在和 {chat_target_name} 私聊"
# 统计1分钟内的回复数量
recent_reply_count = sum(1 for reply in self.recent_replies if reply["time"] > one_minute_ago)
# print(111111111111111333333333333333333333333331111111111111111111111111111111111)
# print(recent_reply_count)
# 如果1分钟内回复数量大于8触发切换到focus模式
if recent_reply_count > reply_threshold:
logger.info(
f"[{self.stream_name}] 检测到1分钟内回复数量({recent_reply_count})大于{reply_threshold}触发切换到focus模式"
chat_content_block = ""
if observed_messages_str:
chat_content_block = f"聊天记录:\n{observed_messages_str}"
else:
chat_content_block = "你还未开始聊天"
mind_info_block = ""
if current_mind:
mind_info_block = f"对聊天的规划:{current_mind}"
else:
mind_info_block = "你刚参与聊天"
personality_block = individuality.get_prompt(x_person=2, level=2)
action_options_block = ""
for using_actions_name, using_actions_info in current_available_actions.items():
# print(using_actions_name)
# print(using_actions_info)
# print(using_actions_info["parameters"])
# print(using_actions_info["require"])
# print(using_actions_info["description"])
using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
param_text = ""
for param_name, param_description in using_actions_info["parameters"].items():
param_text += f" {param_name}: {param_description}\n"
require_text = ""
for require_item in using_actions_info["require"]:
require_text += f" - {require_item}\n"
using_action_prompt = using_action_prompt.format(
action_name=using_actions_name,
action_description=using_actions_info["description"],
action_parameters=param_text,
action_require=require_text,
)
action_options_block += using_action_prompt
extra_info_block = "\n".join(extra_info) # 将局部变量名从 extra_info_block_str 改为 extra_info_block
if extra_info_block: # 使用新的变量名进行检查
extra_info_block = f"The following is some additional information. Please read the following content to make a decision:\n{extra_info_block}\nEnd of additional information." # 使用新的变量名进行格式化
else:
extra_info_block = "No additional information available." # 使用新的变量名进行赋值
planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt")
prompt = planner_prompt_template.format(
self_info_block=self_info_block,
# bot_name=global_config.bot.nickname,
prompt_personality=personality_block,
chat_context_description=chat_context_description,
chat_content_block=chat_content_block,
mind_info_block=mind_info_block,
structured_info_block=structured_info_block,
cycle_info_block=cycle_info,
action_options_text=action_options_block,
extra_info_block=extra_info_block, # 确保这里传递的是修改后的变量名
)
try:
# 调用回调函数通知上层切换到focus模式
await self.on_switch_to_focus_callback()
except Exception as e:
logger.error(f"[{self.stream_name}] 触发切换到focus模式时出错: {e}\n{traceback.format_exc()}")
return prompt
except Exception as e:
logger.error(f"构建 Planner 提示词时出错: {e}")
logger.error(traceback.format_exc())
return "构建 Planner Prompt 时出错"
init_prompt()