From 5be6221a3cd6a96e92d907fd2048664cfd9a35dc Mon Sep 17 00:00:00 2001 From: 2829798842 <2829798842@qq.com> Date: Wed, 28 May 2025 20:01:17 +0800 Subject: [PATCH] Update normal_chat.py --- src/chat/normal_chat/normal_chat.py | 791 ++++++++++++++++------------ 1 file changed, 460 insertions(+), 331 deletions(-) diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 911c98b5..67bc1ab4 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -1,364 +1,493 @@ -import json # <--- 确保导入 json +import asyncio +import statistics # 导入 statistics 模块 +import time import traceback -from typing import List, Dict, Any, Optional -from rich.traceback import install -from src.llm_models.utils_model import LLMRequest -from src.config.config import global_config -from src.chat.focus_chat.info.info_base import InfoBase -from src.chat.focus_chat.info.obs_info import ObsInfo -from src.chat.focus_chat.info.cycle_info import CycleInfo -from src.chat.focus_chat.info.mind_info import MindInfo -from src.chat.focus_chat.info.action_info import ActionInfo -from src.chat.focus_chat.info.structured_info import StructuredInfo -from src.chat.focus_chat.info.self_info import SelfInfo +from random import random +from typing import List, Optional # 导入 Optional + +from maim_message import UserInfo, Seg + from src.common.logger_manager import get_logger -from src.chat.utils.prompt_builder import Prompt, global_prompt_manager -from src.individuality.individuality import individuality -from src.chat.focus_chat.planners.action_manager import ActionManager +from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info +from src.manager.mood_manager import mood_manager +from src.chat.message_receive.chat_stream import ChatStream, chat_manager +from src.person_info.relationship_manager import relationship_manager +from src.chat.utils.info_catcher import info_catcher_manager +from src.chat.utils.timer_calculator import Timer +from src.chat.utils.prompt_builder import global_prompt_manager +from .normal_chat_generator import NormalChatGenerator +from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet +from src.chat.message_receive.message_sender import message_manager +from src.chat.utils.utils_image import image_path_to_base64 +from src.chat.emoji_system.emoji_manager import emoji_manager +from src.chat.normal_chat.willing.willing_manager import willing_manager +from src.config.config import global_config -logger = get_logger("planner") - -install(extra_lines=3) +logger = get_logger("normal_chat") -def init_prompt(): - Prompt( - """ -Your self-awareness is: -{self_info_block} +class NormalChat: + def __init__(self, chat_stream: ChatStream, interest_dict: dict = None, on_switch_to_focus_callback=None): + """初始化 NormalChat 实例。只进行同步操作。""" -{extra_info_block} + self.chat_stream = chat_stream + self.stream_id = chat_stream.stream_id + self.stream_name = chat_manager.get_stream_name(self.stream_id) or self.stream_id + + # Interest dict + self.interest_dict = interest_dict + + self.is_group_chat: bool = False + self.chat_target_info: Optional[dict] = None + + # Other sync initializations + self.gpt = NormalChatGenerator() + self.mood_manager = mood_manager + self.start_time = time.time() + self._chat_task: Optional[asyncio.Task] = None + self._initialized = False # Track initialization status + + # 记录最近的回复内容,每项包含: {time, user_message, response, is_mentioned, is_reference_reply} + self.recent_replies = [] + self.max_replies_history = 20 # 最多保存最近20条回复记录 + + # 添加回调函数,用于在满足条件时通知切换到focus_chat模式 + self.on_switch_to_focus_callback = on_switch_to_focus_callback + + self._disabled = False # 增加停用标志 -You need to decide how to participate in the conversation based on the following information -These information may conflict, please integrate these information, and choose the most suitable action: + async def initialize(self): + """异步初始化,获取聊天类型和目标信息。""" + if self._initialized: + return -{chat_content_block} + self.is_group_chat, self.chat_target_info = await get_chat_type_and_target_info(self.stream_id) + self.stream_name = chat_manager.get_stream_name(self.stream_id) or self.stream_id + self._initialized = True + logger.info(f"[{self.stream_name}] NormalChat 实例 initialize 完成 (异步部分)。") -{mind_info_block} -{cycle_info_block} + # 改为实例方法 + async def _create_thinking_message(self, message: MessageRecv, timestamp: Optional[float] = None) -> str: + """创建思考消息""" + messageinfo = message.message_info -IMPORTANT: The following tool call information has the highest priority and should be heavily weighted in your decision-making: -{structured_info_block} - -Please analyze the conversation content and new messages you see, refer to the conversation plan, and choose the appropriate action: - -{action_options_text} - -You must choose one from the available actions above and explain why. -Your decision must be output in strict JSON format, and only contain JSON content, no other text or explanation. - -Please output your decision JSON in the following format: -{{ - "action": "action_name", - "reasoning": "Your decision reason", - "parameter1": "parameter1 value", - "parameter2": "parameter2 value", - "parameter3": "parameter3 value", - ... -}} - -Please output your decision JSON: """, - "planner_prompt", - ) - - Prompt( - """ -action_name: {action_name} - 描述:{action_description} - 参数: -{action_parameters} - 动作要求: -{action_require}""", - "action_prompt", - ) - - -class ActionPlanner: - def __init__(self, log_prefix: str, action_manager: ActionManager): - self.log_prefix = log_prefix - # LLM规划器配置 - self.planner_llm = LLMRequest( - model=global_config.model.focus_planner, - max_tokens=1000, - request_type="action_planning", # 用于动作规划 + bot_user_info = UserInfo( + user_id=global_config.bot.qq_account, + user_nickname=global_config.bot.nickname, + platform=messageinfo.platform, ) - self.action_manager = action_manager + thinking_time_point = round(time.time(), 2) + thinking_id = "mt" + str(thinking_time_point) + thinking_message = MessageThinking( + message_id=thinking_id, + chat_stream=self.chat_stream, + bot_user_info=bot_user_info, + reply=message, + thinking_start_time=thinking_time_point, + timestamp=timestamp if timestamp is not None else None, + ) - async def plan(self, all_plan_info: List[InfoBase], cycle_timers: dict) -> Dict[str, Any]: - """ - 规划器 (Planner): 使用LLM根据上下文决定做出什么动作。 + await message_manager.add_message(thinking_message) + return thinking_id - 参数: - all_plan_info: 所有计划信息 - cycle_timers: 计时器字典 - """ + # 改为实例方法 + async def _add_messages_to_manager( + self, message: MessageRecv, response_set: List[str], thinking_id + ) -> Optional[MessageSending]: + """发送回复消息""" + container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id + thinking_message = None - action = "no_reply" # 默认动作 - reasoning = "规划器初始化默认" - action_data = {} - - # 初始化所有可能用到的变量,避免UnboundLocalError - observed_messages = [] - observed_messages_str = "" - current_mind = "" - cycle_info = "" - self_info = "" - is_group_chat = True # 默认为群聊 - prompt = "未生成prompt" # 初始化prompt变量 + for msg in container.messages[:]: + if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: + thinking_message = msg + container.messages.remove(msg) + break - try: - # 获取观察信息 - extra_info: list[str] = [] - _structured_info_list = [] + if not thinking_message: + logger.warning(f"[{self.stream_name}] 未找到对应的思考消息 {thinking_id},可能已超时被移除") + return None - # 首先处理动作变更 - for info in all_plan_info: - if isinstance(info, ActionInfo) and info.has_changes(): - add_actions = info.get_add_actions() - remove_actions = info.get_remove_actions() - reason = info.get_reason() + thinking_start_time = thinking_message.thinking_start_time + message_set = MessageSet(self.chat_stream, thinking_id) # 使用 self.chat_stream - # 处理动作的增加 - for action_name in add_actions: - if action_name in self.action_manager.get_registered_actions(): - self.action_manager.add_action_to_using(action_name) - logger.debug(f"{self.log_prefix}添加动作: {action_name}, 原因: {reason}") - - # 处理动作的移除 - for action_name in remove_actions: - self.action_manager.remove_action_from_using(action_name) - logger.debug(f"{self.log_prefix}移除动作: {action_name}, 原因: {reason}") - - # 如果当前选择的动作被移除了,更新为no_reply - if action in remove_actions: - action = "no_reply" - reasoning = f"之前选择的动作{action}已被移除,原因: {reason}" - - # 继续处理其他信息 - for info in all_plan_info: - if isinstance(info, ObsInfo): - observed_messages = info.get_talking_message() - observed_messages_str = info.get_talking_message_str_truncate() - chat_type = info.get_chat_type() - is_group_chat = chat_type == "group" - elif isinstance(info, MindInfo): - current_mind = info.get_current_mind() - elif isinstance(info, CycleInfo): - cycle_info = info.get_observe_info() - elif isinstance(info, SelfInfo): - self_info = info.get_processed_info() - elif isinstance(info, StructuredInfo): - _structured_info = info.get_data() - # 收集工具调用的结构化信息 - if _structured_info and isinstance(_structured_info, dict): - # StructuredInfo 的数据结构是 {tool_type: tool_content} - for tool_type, tool_content in _structured_info.items(): - if tool_content: # 确保内容不为空 - _structured_info_list.append(f"{tool_type}: {str(tool_content)}") - elif not isinstance(info, ActionInfo): # 跳过已处理的ActionInfo - extra_info.append(info.get_processed_info()) - - # 获取当前可用的动作 - current_available_actions = self.action_manager.get_using_actions() - - # 如果没有可用动作,直接返回no_reply - if not current_available_actions: - logger.warning(f"{self.log_prefix}没有可用的动作,将使用no_reply") - action = "no_reply" - reasoning = "没有可用的动作" - return { - "action_result": {"action_type": action, "action_data": action_data, "reasoning": reasoning}, - "current_mind": current_mind, - "observed_messages": observed_messages, - } - - # --- 构建提示词 (调用修改后的 PromptBuilder 方法) --- - # 构建工具信息块 - structured_info_block_str = "\n".join(_structured_info_list) - if structured_info_block_str: - structured_info_block_str = f"The following is basic information returned by tool calls. Please use this information as the basis for subsequent decision-making and actions:\n{structured_info_block_str}" - else: - structured_info_block_str = "No tool information available for reference." - - prompt = await self.build_planner_prompt( - self_info_block=self_info, - is_group_chat=is_group_chat, # <-- Pass HFC state - chat_target_info=None, - observed_messages_str=observed_messages_str, # <-- Pass local variable - current_mind=current_mind, # <-- Pass argument - structured_info_block=structured_info_block_str, - current_available_actions=current_available_actions, # <-- Pass determined actions - cycle_info=cycle_info, # <-- Pass cycle info - extra_info=extra_info, + mark_head = False + first_bot_msg = None + for msg in response_set: + if global_config.experimental.debug_show_chat_mode: + msg += "ⁿ" + message_segment = Seg(type="text", data=msg) + bot_message = MessageSending( + message_id=thinking_id, + chat_stream=self.chat_stream, # 使用 self.chat_stream + bot_user_info=UserInfo( + user_id=global_config.bot.qq_account, + user_nickname=global_config.bot.nickname, + platform=message.message_info.platform, + ), + sender_info=message.message_info.user_info, + message_segment=message_segment, + reply=message, + is_head=not mark_head, + is_emoji=False, + thinking_start_time=thinking_start_time, + apply_set_reply_logic=True, ) + if not mark_head: + mark_head = True + first_bot_msg = bot_message + message_set.add_message(bot_message) - # --- 调用 LLM (普通文本生成) --- - llm_content = None - try: - llm_content, _, _ = await self.planner_llm.generate_response(prompt=prompt) - logger.debug(f"{self.log_prefix}[Planner] LLM 原始 JSON 响应 (预期): {llm_content}") - except Exception as req_e: - logger.error(f"{self.log_prefix}[Planner] LLM 请求执行失败: {req_e}") - reasoning = f"LLM 请求失败,你的模型出现问题: {req_e}" - action = "no_reply" + await message_manager.add_message(message_set) - if llm_content: - try: - # 尝试去除可能的 markdown 代码块标记 - cleaned_content = ( - llm_content.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip() - ) - if not cleaned_content: - raise json.JSONDecodeError("Cleaned content is empty", cleaned_content, 0) - parsed_json = json.loads(cleaned_content) + return first_bot_msg - # 提取决策,提供默认值 - extracted_action = parsed_json.get("action", "no_reply") - extracted_reasoning = parsed_json.get("reasoning", "LLM未提供理由") + # 改为实例方法 + async def _handle_emoji(self, message: MessageRecv, response: str): + """处理表情包""" + if random() < global_config.normal_chat.emoji_chance: + emoji_raw = await emoji_manager.get_emoji_for_text(response) + if emoji_raw: + emoji_path, description = emoji_raw + emoji_cq = image_path_to_base64(emoji_path) - # 将所有其他属性添加到action_data - action_data = {} - for key, value in parsed_json.items(): - if key not in ["action", "reasoning"]: - action_data[key] = value + thinking_time_point = round(message.message_info.time, 2) - # 对于reply动作不需要额外处理,因为相关字段已经在上面的循环中添加到action_data + message_segment = Seg(type="emoji", data=emoji_cq) + bot_message = MessageSending( + message_id="mt" + str(thinking_time_point), + chat_stream=self.chat_stream, # 使用 self.chat_stream + bot_user_info=UserInfo( + user_id=global_config.bot.qq_account, + user_nickname=global_config.bot.nickname, + platform=message.message_info.platform, + ), + sender_info=message.message_info.user_info, + message_segment=message_segment, + reply=message, + is_head=False, + is_emoji=True, + apply_set_reply_logic=True, + ) + await message_manager.add_message(bot_message) - if extracted_action not in current_available_actions: - logger.warning( - f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{extracted_action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'" + # 改为实例方法 (虽然它只用 message.chat_stream, 但逻辑上属于实例) + async def _update_relationship(self, message: MessageRecv, response_set): + """更新关系情绪""" + ori_response = ",".join(response_set) + stance, emotion = await self.gpt._get_emotion_tags(ori_response, message.processed_plain_text) + user_info = message.message_info.user_info + platform = user_info.platform + await relationship_manager.calculate_update_relationship_value( + user_info, + platform, + label=emotion, + stance=stance, # 使用 self.chat_stream + ) + self.mood_manager.update_mood_from_emotion(emotion, global_config.mood.mood_intensity_factor) + + async def _reply_interested_message(self) -> None: + """ + 后台任务方法,轮询当前实例关联chat的兴趣消息 + 通常由start_monitoring_interest()启动 + """ + while True: + async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + await asyncio.sleep(0.5) # 每秒检查一次 + # 检查任务是否已被取消 + if self._chat_task is None or self._chat_task.cancelled(): + logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出") + break + + items_to_process = list(self.interest_dict.items()) + if not items_to_process: + continue + + # 处理每条兴趣消息 + for msg_id, (message, interest_value, is_mentioned) in items_to_process: + try: + # 处理消息 + await self.normal_response( + message=message, + is_mentioned=is_mentioned, + interested_rate=interest_value, + rewind_response=False, ) - action = "no_reply" - reasoning = f"LLM 返回了当前不可用的动作 '{extracted_action}' (可用: {list(current_available_actions.keys())})。原始理由: {extracted_reasoning}" - else: - # 动作有效且可用 - action = extracted_action - reasoning = extracted_reasoning + except Exception as e: + logger.error(f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}\n{traceback.format_exc()}") + finally: + self.interest_dict.pop(msg_id, None) - except Exception as json_e: - logger.warning( - f"{self.log_prefix}解析LLM响应JSON失败,模型返回不标准: {json_e}. LLM原始输出: '{llm_content}'" - ) - reasoning = f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_reply'." - action = "no_reply" - - except Exception as outer_e: - logger.error(f"{self.log_prefix}Planner 处理过程中发生意外错误,规划失败,将执行 no_reply: {outer_e}") - traceback.print_exc() - action = "no_reply" - reasoning = f"Planner 内部处理错误: {outer_e}" - - logger.debug( - f"{self.log_prefix}规划器Prompt:\n{prompt}\n\n决策动作:{action},\n动作信息: '{action_data}'\n理由: {reasoning}" - ) - - # 恢复原始动作集 - self.action_manager.restore_actions() - logger.debug( - f"{self.log_prefix}恢复了原始动作集, 当前可用: {list(self.action_manager.get_using_actions().keys())}" - ) - - action_result = {"action_type": action, "action_data": action_data, "reasoning": reasoning} - - plan_result = { - "action_result": action_result, - "current_mind": current_mind, - "observed_messages": observed_messages, - } - - return plan_result - - async def build_planner_prompt( - self, - self_info_block: str, - is_group_chat: bool, # Now passed as argument - chat_target_info: Optional[dict], # Now passed as argument - observed_messages_str: str, - current_mind: Optional[str], - structured_info_block: str, - current_available_actions: Dict[str, ActionInfo], - cycle_info: Optional[str], - extra_info: list[str], - ) -> str: - """构建 Planner LLM 的提示词 (获取模板并填充数据)""" - try: - # --- Determine chat context --- - chat_context_description = "你现在正在一个群聊中" - chat_target_name = None # Only relevant for private - if not is_group_chat and chat_target_info: - chat_target_name = ( - chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方" - ) - chat_context_description = f"你正在和 {chat_target_name} 私聊" - - chat_content_block = "" - if observed_messages_str: - chat_content_block = f"聊天记录:\n{observed_messages_str}" - else: - chat_content_block = "你还未开始聊天" - - mind_info_block = "" - if current_mind: - mind_info_block = f"对聊天的规划:{current_mind}" - else: - mind_info_block = "你刚参与聊天" - - personality_block = individuality.get_prompt(x_person=2, level=2) - - action_options_block = "" - for using_actions_name, using_actions_info in current_available_actions.items(): - # print(using_actions_name) - # print(using_actions_info) - # print(using_actions_info["parameters"]) - # print(using_actions_info["require"]) - # print(using_actions_info["description"]) - - using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt") - - param_text = "" - for param_name, param_description in using_actions_info["parameters"].items(): - param_text += f" {param_name}: {param_description}\n" - - require_text = "" - for require_item in using_actions_info["require"]: - require_text += f" - {require_item}\n" - - using_action_prompt = using_action_prompt.format( - action_name=using_actions_name, - action_description=using_actions_info["description"], - action_parameters=param_text, - action_require=require_text, - ) - - action_options_block += using_action_prompt - - extra_info_block = "\n".join(extra_info) # 将局部变量名从 extra_info_block_str 改为 extra_info_block - if extra_info_block: # 使用新的变量名进行检查 - extra_info_block = f"The following is some additional information. Please read the following content to make a decision:\n{extra_info_block}\nEnd of additional information." # 使用新的变量名进行格式化 - else: - extra_info_block = "No additional information available." # 使用新的变量名进行赋值 - - planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt") - prompt = planner_prompt_template.format( - self_info_block=self_info_block, - # bot_name=global_config.bot.nickname, - prompt_personality=personality_block, - chat_context_description=chat_context_description, - chat_content_block=chat_content_block, - mind_info_block=mind_info_block, - structured_info_block=structured_info_block, - cycle_info_block=cycle_info, - action_options_text=action_options_block, - extra_info_block=extra_info_block, # 确保这里传递的是修改后的变量名 + # 改为实例方法, 移除 chat 参数 + async def normal_response( + self, message: MessageRecv, is_mentioned: bool, interested_rate: float, rewind_response: bool = False + ) -> None: + # 新增:如果已停用,直接返回 + if self._disabled: + logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。") + return + # 检查收到的消息是否属于当前实例处理的 chat stream + if message.chat_stream.stream_id != self.stream_id: + logger.error( + f"[{self.stream_name}] normal_response 收到不匹配的消息 (来自 {message.chat_stream.stream_id}),预期 {self.stream_id}。已忽略。" ) - return prompt + return + timing_results = {} + + reply_probability = 1.0 if is_mentioned else 0.0 # 如果被提及,基础概率为1,否则需要意愿判断 + + # 意愿管理器:设置当前message信息 + + willing_manager.setup(message, self.chat_stream, is_mentioned, interested_rate) + + # 获取回复概率 + is_willing = False + # 仅在未被提及或基础概率不为1时查询意愿概率 + if reply_probability < 1: # 简化逻辑,如果未提及 (reply_probability 为 0),则获取意愿概率 + is_willing = True + reply_probability = await willing_manager.get_reply_probability(message.message_info.message_id) + + if message.message_info.additional_config: + if "maimcore_reply_probability_gain" in message.message_info.additional_config.keys(): + reply_probability += message.message_info.additional_config["maimcore_reply_probability_gain"] + reply_probability = min(max(reply_probability, 0), 1) # 确保概率在 0-1 之间 + + # 打印消息信息 + mes_name = self.chat_stream.group_info.group_name if self.chat_stream.group_info else "私聊" + current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time)) + # 使用 self.stream_id + willing_log = f"[回复意愿:{await willing_manager.get_willing(self.stream_id):.2f}]" if is_willing else "" + logger.info( + f"[{current_time}][{mes_name}]" + f"{message.message_info.user_info.user_nickname}:" # 使用 self.chat_stream + f"{message.processed_plain_text}{willing_log}[概率:{reply_probability * 100:.1f}%]" + ) + do_reply = False + response_set = None # 初始化 response_set + if random() < reply_probability: + do_reply = True + + # 回复前处理 + await willing_manager.before_generate_reply_handle(message.message_info.message_id) + + with Timer("创建思考消息", timing_results): + if rewind_response: + thinking_id = await self._create_thinking_message(message, message.message_info.time) + else: + thinking_id = await self._create_thinking_message(message) + + logger.debug(f"[{self.stream_name}] 创建捕捉器,thinking_id:{thinking_id}") + + info_catcher = info_catcher_manager.get_info_catcher(thinking_id) + info_catcher.catch_decide_to_response(message) + + try: + with Timer("生成回复", timing_results): + response_set = await self.gpt.generate_response( + message=message, + thinking_id=thinking_id, + ) + + info_catcher.catch_after_generate_response(timing_results["生成回复"]) + except Exception as e: + logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}") + response_set = None # 确保出错时 response_set 为 None + + if not response_set: + logger.info(f"[{self.stream_name}] 模型未生成回复内容") + # 如果模型未生成回复,移除思考消息 + container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id + for msg in container.messages[:]: + if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: + container.messages.remove(msg) + logger.debug(f"[{self.stream_name}] 已移除未产生回复的思考消息 {thinking_id}") + break + # 需要在此处也调用 not_reply_handle 和 delete 吗? + # 如果是因为模型没回复,也算是一种 "未回复" + await willing_manager.not_reply_handle(message.message_info.message_id) + willing_manager.delete(message.message_info.message_id) + return # 不执行后续步骤 + + logger.info(f"[{self.stream_name}] 回复内容: {response_set}") + + if self._disabled: + logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。") + return + + # 发送回复 (不再需要传入 chat) + with Timer("消息发送", timing_results): + first_bot_msg = await self._add_messages_to_manager(message, response_set, thinking_id) + + # 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况) + if first_bot_msg: + info_catcher.catch_after_response(timing_results["消息发送"], response_set, first_bot_msg) + + # 记录回复信息到最近回复列表中 + reply_info = { + "time": time.time(), + "user_message": message.processed_plain_text, + "user_info": { + "user_id": message.message_info.user_info.user_id, + "user_nickname": message.message_info.user_info.user_nickname, + }, + "response": response_set, + "is_mentioned": is_mentioned, + "is_reference_reply": message.reply is not None, # 判断是否为引用回复 + "timing": {k: round(v, 2) for k, v in timing_results.items()}, + } + self.recent_replies.append(reply_info) + # 保持最近回复历史在限定数量内 + if len(self.recent_replies) > self.max_replies_history: + self.recent_replies = self.recent_replies[-self.max_replies_history :] + + # 检查是否需要切换到focus模式 + await self._check_switch_to_focus() + + else: + logger.warning(f"[{self.stream_name}] 思考消息 {thinking_id} 在发送前丢失,无法记录 info_catcher") + + info_catcher.done_catch() + + # 处理表情包 (不再需要传入 chat) + with Timer("处理表情包", timing_results): + await self._handle_emoji(message, response_set[0]) + + # 更新关系情绪 (不再需要传入 chat) + with Timer("关系更新", timing_results): + await self._update_relationship(message, response_set) + + # 回复后处理 + await willing_manager.after_generate_reply_handle(message.message_info.message_id) + + # 输出性能计时结果 + if do_reply and response_set: # 确保 response_set 不是 None + timing_str = " | ".join([f"{step}: {duration:.2f}秒" for step, duration in timing_results.items()]) + trigger_msg = message.processed_plain_text + response_msg = " ".join(response_set) + logger.info( + f"[{self.stream_name}] 触发消息: {trigger_msg[:20]}... | 推理消息: {response_msg[:20]}... | 性能计时: {timing_str}" + ) + elif not do_reply: + # 不回复处理 + await willing_manager.not_reply_handle(message.message_info.message_id) + + # 意愿管理器:注销当前message信息 (无论是否回复,只要处理过就删除) + willing_manager.delete(message.message_info.message_id) + + # 改为实例方法, 移除 chat 参数 + + async def start_chat(self): + """先进行异步初始化,然后启动聊天任务。""" + if not self._initialized: + await self.initialize() # Ensure initialized before starting tasks + + self._disabled = False # 启动时重置停用标志 + + if self._chat_task is None or self._chat_task.done(): + logger.info(f"[{self.stream_name}] 开始处理兴趣消息...") + polling_task = asyncio.create_task(self._reply_interested_message()) + polling_task.add_done_callback(lambda t: self._handle_task_completion(t)) + self._chat_task = polling_task + else: + logger.info(f"[{self.stream_name}] 聊天轮询任务已在运行中。") + + def _handle_task_completion(self, task: asyncio.Task): + """任务完成回调处理""" + if task is not self._chat_task: + logger.warning(f"[{self.stream_name}] 收到未知任务回调") + return + try: + if exc := task.exception(): + logger.error(f"[{self.stream_name}] 任务异常: {exc}") + traceback.print_exc() + except asyncio.CancelledError: + logger.debug(f"[{self.stream_name}] 任务已取消") except Exception as e: - logger.error(f"构建 Planner 提示词时出错: {e}") - logger.error(traceback.format_exc()) - return "构建 Planner Prompt 时出错" + logger.error(f"[{self.stream_name}] 回调处理错误: {e}") + finally: + if self._chat_task is task: + self._chat_task = None + logger.debug(f"[{self.stream_name}] 任务清理完成") + # 改为实例方法, 移除 stream_id 参数 + async def stop_chat(self): + """停止当前实例的兴趣监控任务。""" + self._disabled = True # 停止时设置停用标志 + if self._chat_task and not self._chat_task.done(): + task = self._chat_task + logger.debug(f"[{self.stream_name}] 尝试取消normal聊天任务。") + task.cancel() + try: + await task # 等待任务响应取消 + except asyncio.CancelledError: + logger.info(f"[{self.stream_name}] 结束一般聊天模式。") + except Exception as e: + # 回调函数 _handle_task_completion 会处理异常日志 + logger.warning(f"[{self.stream_name}] 等待监控任务取消时捕获到异常 (可能已在回调中记录): {e}") + finally: + # 确保任务状态更新,即使等待出错 (回调函数也会尝试更新) + if self._chat_task is task: + self._chat_task = None -init_prompt() + # 清理所有未处理的思考消息 + try: + container = await message_manager.get_container(self.stream_id) + if container: + # 查找并移除所有 MessageThinking 类型的消息 + thinking_messages = [msg for msg in container.messages[:] if isinstance(msg, MessageThinking)] + if thinking_messages: + for msg in thinking_messages: + container.messages.remove(msg) + logger.info(f"[{self.stream_name}] 清理了 {len(thinking_messages)} 条未处理的思考消息。") + except Exception as e: + logger.error(f"[{self.stream_name}] 清理思考消息时出错: {e}") + traceback.print_exc() + + # 获取最近回复记录的方法 + def get_recent_replies(self, limit: int = 10) -> List[dict]: + """获取最近的回复记录 + + Args: + limit: 最大返回数量,默认10条 + + Returns: + List[dict]: 最近的回复记录列表,每项包含: + time: 回复时间戳 + user_message: 用户消息内容 + user_info: 用户信息(user_id, user_nickname) + response: 回复内容 + is_mentioned: 是否被提及(@) + is_reference_reply: 是否为引用回复 + timing: 各阶段耗时 + """ + # 返回最近的limit条记录,按时间倒序排列 + return sorted(self.recent_replies[-limit:], key=lambda x: x["time"], reverse=True) + + async def _check_switch_to_focus(self) -> None: + """检查是否满足切换到focus模式的条件""" + if not self.on_switch_to_focus_callback: + return # 如果没有设置回调函数,直接返回 + current_time = time.time() + + time_threshold = 120 / global_config.chat.auto_focus_threshold + reply_threshold = 6 * global_config.chat.auto_focus_threshold + + one_minute_ago = current_time - time_threshold + + # 统计1分钟内的回复数量 + recent_reply_count = sum(1 for reply in self.recent_replies if reply["time"] > one_minute_ago) + # print(111111111111111333333333333333333333333331111111111111111111111111111111111) + # print(recent_reply_count) + # 如果1分钟内回复数量大于8,触发切换到focus模式 + if recent_reply_count > reply_threshold: + logger.info( + f"[{self.stream_name}] 检测到1分钟内回复数量({recent_reply_count})大于{reply_threshold},触发切换到focus模式" + ) + try: + # 调用回调函数通知上层切换到focus模式 + await self.on_switch_to_focus_callback() + except Exception as e: + logger.error(f"[{self.stream_name}] 触发切换到focus模式时出错: {e}\n{traceback.format_exc()}")