From f89c5521dcb9ba66206626cffb675dd49f699d96 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Tue, 16 Sep 2025 14:24:44 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9B=E6=9B=B4=E6=94=B9=E4=BA=86talk=5F?= =?UTF-8?q?value=E7=9A=84=E5=90=AB=E4=B9=89=EF=BC=8C=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E7=A7=81=E8=81=8A=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- changelogs/changelog.md | 2 +- src/chat/heart_flow/brain_chat.py | 630 ++++++++++++++++++++++++++ src/chat/heart_flow/heartFC_chat.py | 213 ++++----- src/chat/heart_flow/heartflow.py | 22 +- src/chat/planner_actions/planner.py | 6 +- src/chat/replyer/default_generator.py | 2 +- src/config/official_configs.py | 3 - template/bot_config_template.toml | 2 +- 8 files changed, 752 insertions(+), 128 deletions(-) create mode 100644 src/chat/heart_flow/brain_chat.py diff --git a/changelogs/changelog.md b/changelogs/changelog.md index a2126d98..89355677 100644 --- a/changelogs/changelog.md +++ b/changelogs/changelog.md @@ -16,7 +16,7 @@ - 添加多种发送类型 - 优化识图token限制 - 为空回复添加重试机制 - +- 加入brainchat模式,为私聊支持做准备 ## [0.10.2] - 2025-8-31 diff --git a/src/chat/heart_flow/brain_chat.py b/src/chat/heart_flow/brain_chat.py new file mode 100644 index 00000000..457737fc --- /dev/null +++ b/src/chat/heart_flow/brain_chat.py @@ -0,0 +1,630 @@ +import asyncio +import time +import traceback +import random +from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING +from rich.traceback import install + +from src.config.config import global_config +from src.common.logger import get_logger +from src.common.data_models.info_data_model import ActionPlannerInfo +from src.common.data_models.message_data_model import ReplyContentType +from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager +from src.chat.utils.prompt_builder import global_prompt_manager +from src.chat.utils.timer_calculator import Timer +from src.chat.planner_actions.planner import ActionPlanner +from src.chat.planner_actions.action_modifier import ActionModifier +from src.chat.planner_actions.action_manager import ActionManager +from src.chat.heart_flow.hfc_utils import CycleDetail +from src.chat.heart_flow.hfc_utils import send_typing, stop_typing +from src.chat.express.expression_learner import expression_learner_manager +from src.person_info.person_info import Person +from src.plugin_system.base.component_types import EventType, ActionInfo +from src.plugin_system.core import events_manager +from src.plugin_system.apis import generator_api, send_api, message_api, database_api +from src.mais4u.mai_think import mai_thinking_manager +from src.mais4u.s4u_config import s4u_config +from src.chat.utils.chat_message_builder import ( + build_readable_messages_with_id, + get_raw_msg_before_timestamp_with_chat, +) + +if TYPE_CHECKING: + from src.common.data_models.database_data_model import DatabaseMessages + from src.common.data_models.message_data_model import ReplySetModel + + +ERROR_LOOP_INFO = { + "loop_plan_info": { + "action_result": { + "action_type": "error", + "action_data": {}, + "reasoning": "循环处理失败", + }, + }, + "loop_action_info": { + "action_taken": False, + "reply_text": "", + "command": "", + "taken_time": time.time(), + }, +} + + +install(extra_lines=3) + +# 注释:原来的动作修改超时常量已移除,因为改为顺序执行 + +logger = get_logger("bc") # Logger Name Changed + + +class BrainChatting: + """ + 管理一个连续的私聊Brain Chat循环 + 用于在特定聊天流中生成回复。 + """ + + def __init__(self, chat_id: str): + """ + BrainChatting 初始化函数 + + 参数: + chat_id: 聊天流唯一标识符(如stream_id) + on_stop_focus_chat: 当收到stop_focus_chat命令时调用的回调函数 + performance_version: 性能记录版本号,用于区分不同启动版本 + """ + # 基础属性 + self.stream_id: str = chat_id # 聊天流ID + self.chat_stream: ChatStream = get_chat_manager().get_stream(self.stream_id) # type: ignore + if not self.chat_stream: + raise ValueError(f"无法找到聊天流: {self.stream_id}") + self.log_prefix = f"[{get_chat_manager().get_stream_name(self.stream_id) or self.stream_id}]" + + self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id) + + self.action_manager = ActionManager() + self.action_planner = ActionPlanner(chat_id=self.stream_id, action_manager=self.action_manager) + self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.stream_id) + + # 循环控制内部状态 + self.running: bool = False + self._loop_task: Optional[asyncio.Task] = None # 主循环任务 + + # 添加循环信息管理相关的属性 + self.history_loop: List[CycleDetail] = [] + self._cycle_counter = 0 + self._current_cycle_detail: CycleDetail = None # type: ignore + + self.last_read_time = time.time() - 2 + + self.talk_threshold = global_config.chat.talk_value + + self.no_reply_until_call = False + + async def start(self): + """检查是否需要启动主循环,如果未激活则启动。""" + + # 如果循环已经激活,直接返回 + if self.running: + logger.debug(f"{self.log_prefix} BrainChatting 已激活,无需重复启动") + return + + try: + # 标记为活动状态,防止重复启动 + self.running = True + + self._loop_task = asyncio.create_task(self._main_chat_loop()) + self._loop_task.add_done_callback(self._handle_loop_completion) + logger.info(f"{self.log_prefix} BrainChatting 启动完成") + + except Exception as e: + # 启动失败时重置状态 + self.running = False + self._loop_task = None + logger.error(f"{self.log_prefix} BrainChatting 启动失败: {e}") + raise + + def _handle_loop_completion(self, task: asyncio.Task): + """当 _hfc_loop 任务完成时执行的回调。""" + try: + if exception := task.exception(): + logger.error(f"{self.log_prefix} BrainChatting: 脱离了聊天(异常): {exception}") + logger.error(traceback.format_exc()) # Log full traceback for exceptions + else: + logger.info(f"{self.log_prefix} BrainChatting: 脱离了聊天 (外部停止)") + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} BrainChatting: 结束了聊天") + + def start_cycle(self) -> Tuple[Dict[str, float], str]: + self._cycle_counter += 1 + self._current_cycle_detail = CycleDetail(self._cycle_counter) + self._current_cycle_detail.thinking_id = f"tid{str(round(time.time(), 2))}" + cycle_timers = {} + return cycle_timers, self._current_cycle_detail.thinking_id + + def end_cycle(self, loop_info, cycle_timers): + self._current_cycle_detail.set_loop_info(loop_info) + self.history_loop.append(self._current_cycle_detail) + self._current_cycle_detail.timers = cycle_timers + self._current_cycle_detail.end_time = time.time() + + def print_cycle_info(self, cycle_timers): + # 记录循环信息和计时器结果 + timer_strings = [] + for name, elapsed in cycle_timers.items(): + formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" + timer_strings.append(f"{name}: {formatted_time}") + + logger.info( + f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," + f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒" # type: ignore + + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") + ) + + async def _loopbody(self): # sourcery skip: hoist-if-from-if + recent_messages_list = message_api.get_messages_by_time_in_chat( + chat_id=self.stream_id, + start_time=self.last_read_time, + end_time=time.time(), + limit=20, + limit_mode="latest", + filter_mai=True, + filter_command=True, + ) + + if len(recent_messages_list) >= 1: + # !处理no_reply_until_call逻辑 + if self.no_reply_until_call: + for message in recent_messages_list: + if ( + message.is_mentioned + or message.is_at + or len(recent_messages_list) >= 8 + or time.time() - self.last_read_time > 600 + ): + self.no_reply_until_call = False + break + # 没有提到,继续保持沉默 + if self.no_reply_until_call: + # logger.info(f"{self.log_prefix} 没有提到,继续保持沉默") + await asyncio.sleep(1) + return True + + self.last_read_time = time.time() + + await self._observe( + recent_messages_list=recent_messages_list + ) + else: + # Normal模式:消息数量不足,等待 + await asyncio.sleep(0.2) + return True + return True + + async def _send_and_store_reply( + self, + response_set: "ReplySetModel", + action_message: "DatabaseMessages", + cycle_timers: Dict[str, float], + thinking_id, + actions, + selected_expressions: Optional[List[int]] = None, + ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: + with Timer("回复发送", cycle_timers): + reply_text = await self._send_response( + reply_set=response_set, + message_data=action_message, + selected_expressions=selected_expressions, + ) + + # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 + platform = action_message.chat_info.platform + if platform is None: + platform = getattr(self.chat_stream, "platform", "unknown") + + person = Person(platform=platform, user_id=action_message.user_info.user_id) + person_name = person.person_name + action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" + + await database_api.store_action_info( + chat_stream=self.chat_stream, + action_build_into_prompt=False, + action_prompt_display=action_prompt_display, + action_done=True, + thinking_id=thinking_id, + action_data={"reply_text": reply_text}, + action_name="reply", + ) + + # 构建循环信息 + loop_info: Dict[str, Any] = { + "loop_plan_info": { + "action_result": actions, + }, + "loop_action_info": { + "action_taken": True, + "reply_text": reply_text, + "command": "", + "taken_time": time.time(), + }, + } + + return loop_info, reply_text, cycle_timers + + async def _observe( + self, # interest_value: float = 0.0, + recent_messages_list: Optional[List["DatabaseMessages"]] = None + ) -> bool: # sourcery skip: merge-else-if-into-elif, remove-redundant-if + if recent_messages_list is None: + recent_messages_list = [] + reply_text = "" # 初始化reply_text变量,避免UnboundLocalError + + if s4u_config.enable_s4u: + await send_typing() + + async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + await self.expression_learner.trigger_learning_for_chat() + + cycle_timers, thinking_id = self.start_cycle() + logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考") + + # 第一步:动作检查 + available_actions: Dict[str, ActionInfo] = {} + try: + await self.action_modifier.modify_actions() + available_actions = self.action_manager.get_using_actions() + except Exception as e: + logger.error(f"{self.log_prefix} 动作修改失败: {e}") + + # 执行planner + is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() + + message_list_before_now = get_raw_msg_before_timestamp_with_chat( + chat_id=self.stream_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.6), + ) + chat_content_block, message_id_list = build_readable_messages_with_id( + messages=message_list_before_now, + timestamp_mode="normal_no_YMD", + read_mark=self.action_planner.last_obs_time_mark, + truncate=True, + show_actions=True, + ) + + prompt_info = await self.action_planner.build_planner_prompt( + is_group_chat=is_group_chat, + chat_target_info=chat_target_info, + current_available_actions=available_actions, + chat_content_block=chat_content_block, + message_id_list=message_id_list, + interest=global_config.personality.interest, + ) + continue_flag, modified_message = await events_manager.handle_mai_events( + EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id + ) + if not continue_flag: + return False + if modified_message and modified_message._modify_flags.modify_llm_prompt: + prompt_info = (modified_message.llm_prompt, prompt_info[1]) + + with Timer("规划器", cycle_timers): + action_to_use_info, _ = await self.action_planner.plan( + loop_start_time=self.last_read_time, + available_actions=available_actions, + ) + + has_reply = False + for action in action_to_use_info: + if action.action_type == "reply": + has_reply = True + break + + if not has_reply: + action_to_use_info.append( + ActionPlannerInfo( + action_type="reply", + reasoning="进行回复", + action_data={}, + action_message=recent_messages_list[0], + available_actions=available_actions, + ) + ) + + # 3. 并行执行所有动作 + action_tasks = [ + asyncio.create_task( + self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) + ) + for action in action_to_use_info + ] + + # 并行执行所有任务 + results = await asyncio.gather(*action_tasks, return_exceptions=True) + + # 处理执行结果 + reply_loop_info = None + reply_text_from_reply = "" + action_success = False + action_reply_text = "" + + for result in results: + if isinstance(result, BaseException): + logger.error(f"{self.log_prefix} 动作执行异常: {result}") + continue + + if result["action_type"] != "reply": + action_success = result["success"] + action_reply_text = result["reply_text"] + elif result["action_type"] == "reply": + if result["success"]: + reply_loop_info = result["loop_info"] + reply_text_from_reply = result["reply_text"] + else: + logger.warning(f"{self.log_prefix} 回复动作执行失败") + + # 构建最终的循环信息 + if reply_loop_info: + # 如果有回复信息,使用回复的loop_info作为基础 + loop_info = reply_loop_info + # 更新动作执行信息 + loop_info["loop_action_info"].update( + { + "action_taken": action_success, + "taken_time": time.time(), + } + ) + reply_text = reply_text_from_reply + else: + # 没有回复信息,构建纯动作的loop_info + loop_info = { + "loop_plan_info": { + "action_result": action_to_use_info, + }, + "loop_action_info": { + "action_taken": action_success, + "reply_text": action_reply_text, + "taken_time": time.time(), + }, + } + reply_text = action_reply_text + + self.end_cycle(loop_info, cycle_timers) + self.print_cycle_info(cycle_timers) + + """S4U内容,暂时保留""" + if s4u_config.enable_s4u: + await stop_typing() + await mai_thinking_manager.get_mai_think(self.stream_id).do_think_after_response(reply_text) + """S4U内容,暂时保留""" + + return True + + async def _main_chat_loop(self): + """主循环,持续进行计划并可能回复消息,直到被外部取消。""" + try: + while self.running: + # 主循环 + success = await self._loopbody() + await asyncio.sleep(0.1) + if not success: + break + except asyncio.CancelledError: + # 设置了关闭标志位后被取消是正常流程 + logger.info(f"{self.log_prefix} 麦麦已关闭聊天") + except Exception: + logger.error(f"{self.log_prefix} 麦麦聊天意外错误,将于3s后尝试重新启动") + print(traceback.format_exc()) + await asyncio.sleep(3) + self._loop_task = asyncio.create_task(self._main_chat_loop()) + logger.error(f"{self.log_prefix} 结束了当前聊天循环") + + async def _handle_action( + self, + action: str, + reasoning: str, + action_data: dict, + cycle_timers: Dict[str, float], + thinking_id: str, + action_message: Optional["DatabaseMessages"] = None, + ) -> tuple[bool, str, str]: + """ + 处理规划动作,使用动作工厂创建相应的动作处理器 + + 参数: + action: 动作类型 + reasoning: 决策理由 + action_data: 动作数据,包含不同动作需要的参数 + cycle_timers: 计时器字典 + thinking_id: 思考ID + + 返回: + tuple[bool, str, str]: (是否执行了动作, 思考消息ID, 命令) + """ + try: + # 使用工厂创建动作处理器实例 + try: + action_handler = self.action_manager.create_action( + action_name=action, + action_data=action_data, + reasoning=reasoning, + cycle_timers=cycle_timers, + thinking_id=thinking_id, + chat_stream=self.chat_stream, + log_prefix=self.log_prefix, + action_message=action_message, + ) + except Exception as e: + logger.error(f"{self.log_prefix} 创建动作处理器时出错: {e}") + traceback.print_exc() + return False, "", "" + + if not action_handler: + logger.warning(f"{self.log_prefix} 未能创建动作处理器: {action}") + return False, "", "" + + # 处理动作并获取结果 + result = await action_handler.execute() + success, action_text = result + command = "" + + return success, action_text, command + + except Exception as e: + logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") + traceback.print_exc() + return False, "", "" + + async def _send_response( + self, + reply_set: "ReplySetModel", + message_data: "DatabaseMessages", + selected_expressions: Optional[List[int]] = None, + ) -> str: + new_message_count = message_api.count_new_messages( + chat_id=self.chat_stream.stream_id, start_time=self.last_read_time, end_time=time.time() + ) + + need_reply = new_message_count >= random.randint(2, 4) + + if need_reply: + logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复") + + reply_text = "" + first_replied = False + for reply_content in reply_set.reply_data: + if reply_content.content_type != ReplyContentType.TEXT: + continue + data: str = reply_content.content # type: ignore + if not first_replied: + await send_api.text_to_stream( + text=data, + stream_id=self.chat_stream.stream_id, + reply_message=message_data, + set_reply=need_reply, + typing=False, + selected_expressions=selected_expressions, + ) + first_replied = True + else: + await send_api.text_to_stream( + text=data, + stream_id=self.chat_stream.stream_id, + reply_message=message_data, + set_reply=False, + typing=True, + selected_expressions=selected_expressions, + ) + reply_text += data + + return reply_text + + async def _execute_action( + self, + action_planner_info: ActionPlannerInfo, + chosen_action_plan_infos: List[ActionPlannerInfo], + thinking_id: str, + available_actions: Dict[str, ActionInfo], + cycle_timers: Dict[str, float], + ): + """执行单个动作的通用函数""" + try: + with Timer(f"动作{action_planner_info.action_type}", cycle_timers): + + if action_planner_info.action_type == "no_reply": + # 直接处理no_action逻辑,不再通过动作系统 + reason = action_planner_info.reasoning or "选择不回复" + # logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") + + # 存储no_action信息到数据库 + await database_api.store_action_info( + chat_stream=self.chat_stream, + action_build_into_prompt=False, + action_prompt_display=reason, + action_done=True, + thinking_id=thinking_id, + action_data={"reason": reason}, + action_name="no_action", + ) + return {"action_type": "no_action", "success": True, "reply_text": "", "command": ""} + + elif action_planner_info.action_type == "wait_time": + action_planner_info.action_data = action_planner_info.action_data or {} + logger.info(f"{self.log_prefix} 等待{action_planner_info.action_data['time']}秒后回复") + await asyncio.sleep(action_planner_info.action_data["time"]) + return {"action_type": "wait_time", "success": True, "reply_text": "", "command": ""} + + elif action_planner_info.action_type == "no_reply_until_call": + logger.info(f"{self.log_prefix} 保持沉默,直到有人直接叫的名字") + self.no_reply_until_call = True + return {"action_type": "no_reply_until_call", "success": True, "reply_text": "", "command": ""} + + elif action_planner_info.action_type == "reply": + try: + success, llm_response = await generator_api.generate_reply( + chat_stream=self.chat_stream, + reply_message=action_planner_info.action_message, + available_actions=available_actions, + chosen_actions=chosen_action_plan_infos, + reply_reason=action_planner_info.reasoning or "", + enable_tool=global_config.tool.enable_tool, + request_type="replyer", + from_plugin=False, + ) + + if not success or not llm_response or not llm_response.reply_set: + if action_planner_info.action_message: + logger.info(f"对 {action_planner_info.action_message.processed_plain_text} 的回复生成失败") + else: + logger.info("回复生成失败") + return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} + + except asyncio.CancelledError: + logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消") + return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} + response_set = llm_response.reply_set + selected_expressions = llm_response.selected_expressions + loop_info, reply_text, _ = await self._send_and_store_reply( + response_set=response_set, + action_message=action_planner_info.action_message, # type: ignore + cycle_timers=cycle_timers, + thinking_id=thinking_id, + actions=chosen_action_plan_infos, + selected_expressions=selected_expressions, + ) + return { + "action_type": "reply", + "success": True, + "reply_text": reply_text, + "loop_info": loop_info, + } + + # 其他动作 + else: + # 执行普通动作 + with Timer("动作执行", cycle_timers): + success, reply_text, command = await self._handle_action( + action_planner_info.action_type, + action_planner_info.reasoning or "", + action_planner_info.action_data or {}, + cycle_timers, + thinking_id, + action_planner_info.action_message, + ) + return { + "action_type": action_planner_info.action_type, + "success": success, + "reply_text": reply_text, + "command": command, + } + + except Exception as e: + logger.error(f"{self.log_prefix} 执行动作时出错: {e}") + logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}") + return { + "action_type": action_planner_info.action_type, + "success": False, + "reply_text": "", + "loop_info": None, + "error": str(e), + } diff --git a/src/chat/heart_flow/heartFC_chat.py b/src/chat/heart_flow/heartFC_chat.py index 9aab694b..287a07ba 100644 --- a/src/chat/heart_flow/heartFC_chat.py +++ b/src/chat/heart_flow/heartFC_chat.py @@ -50,16 +50,6 @@ ERROR_LOOP_INFO = { }, } -# ?什么时候发言: -# 1.聊天频率较低:与过去该时段发言频率进行比较 -# 2.感兴趣的话题:暂时使用Emb计算 -# 3.感兴趣的人:认识次数 -# 4.直接提及 - -# 什么时候不发言: -# 1.敏感话题:判断较难 -# 2.发言频率太高:近时判断,例如发言频率> 1/人数 *2 -# 3.明确被拒绝:planner判断 install(extra_lines=3) @@ -106,7 +96,7 @@ class HeartFChatting: self._cycle_counter = 0 self._current_cycle_detail: CycleDetail = None # type: ignore - self.last_read_time = time.time() - 10 + self.last_read_time = time.time() - 2 self.talk_threshold = global_config.chat.talk_value @@ -172,16 +162,6 @@ class HeartFChatting: + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") ) - def get_talk_threshold(self): - talk_value = global_config.chat.talk_value - # 处理talk_value:取整数部分和小数部分 - base_value = int(talk_value) - probability = talk_value - base_value - # 根据概率决定是否+1 - think_len = base_value + 1 if random.random() < probability else base_value - self.talk_threshold = think_len - logger.info(f"{self.log_prefix} 思考频率阈值: {self.talk_threshold}") - async def _loopbody(self): # sourcery skip: hoist-if-from-if recent_messages_list = message_api.get_messages_by_time_in_chat( chat_id=self.stream_id, @@ -193,7 +173,7 @@ class HeartFChatting: filter_command=True, ) - if len(recent_messages_list) >= self.talk_threshold: + if len(recent_messages_list) >= 1: # !处理no_reply_until_call逻辑 if self.no_reply_until_call: for message in recent_messages_list: @@ -212,10 +192,21 @@ class HeartFChatting: return True self.last_read_time = time.time() + + # !此处使at或者提及必定回复 + metioned_message = None + for message in recent_messages_list: + if (message.is_mentioned or message.is_at) and global_config.chat.mentioned_bot_reply: + metioned_message = message + + # *控制频率用 + if not metioned_message: + if random.random() > global_config.chat.talk_value: + return True + await self._observe( - recent_messages_list=recent_messages_list, + recent_messages_list=recent_messages_list,force_reply_message=metioned_message ) - self.get_talk_threshold() else: # Normal模式:消息数量不足,等待 await asyncio.sleep(0.2) @@ -274,7 +265,7 @@ class HeartFChatting: async def _observe( self, # interest_value: float = 0.0, - recent_messages_list: Optional[List["DatabaseMessages"]] = None, + recent_messages_list: Optional[List["DatabaseMessages"]] = None, force_reply_message:"DatabaseMessages" = None ) -> bool: # sourcery skip: merge-else-if-into-elif, remove-redundant-if if recent_messages_list is None: recent_messages_list = [] @@ -335,25 +326,19 @@ class HeartFChatting: available_actions=available_actions, ) - # !此处使at或者提及必定回复 - metioned_message = None - for message in recent_messages_list: - if (message.is_mentioned or message.is_at) and global_config.chat.mentioned_bot_reply: - metioned_message = message - has_reply = False for action in action_to_use_info: if action.action_type == "reply": has_reply = True break - if not has_reply and metioned_message: + if not has_reply and force_reply_message: action_to_use_info.append( ActionPlannerInfo( action_type="reply", reasoning="有人提到了你,进行回复", action_data={}, - action_message=metioned_message, + action_message=force_reply_message, available_actions=available_actions, ) ) @@ -556,92 +541,94 @@ class HeartFChatting: ): """执行单个动作的通用函数""" try: - if action_planner_info.action_type == "no_reply": - # 直接处理no_action逻辑,不再通过动作系统 - reason = action_planner_info.reasoning or "选择不回复" - logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") + with Timer(f"动作{action_planner_info.action_type}", cycle_timers): + + if action_planner_info.action_type == "no_reply": + # 直接处理no_action逻辑,不再通过动作系统 + reason = action_planner_info.reasoning or "选择不回复" + # logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") - # 存储no_action信息到数据库 - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=reason, - action_done=True, - thinking_id=thinking_id, - action_data={"reason": reason}, - action_name="no_action", - ) - return {"action_type": "no_action", "success": True, "reply_text": "", "command": ""} - - elif action_planner_info.action_type == "wait_time": - action_planner_info.action_data = action_planner_info.action_data or {} - logger.info(f"{self.log_prefix} 等待{action_planner_info.action_data['time']}秒后回复") - await asyncio.sleep(action_planner_info.action_data["time"]) - return {"action_type": "wait_time", "success": True, "reply_text": "", "command": ""} - - elif action_planner_info.action_type == "no_reply_until_call": - logger.info(f"{self.log_prefix} 保持沉默,直到有人直接叫的名字") - self.no_reply_until_call = True - return {"action_type": "no_reply_until_call", "success": True, "reply_text": "", "command": ""} - - elif action_planner_info.action_type == "reply": - try: - success, llm_response = await generator_api.generate_reply( + # 存储no_action信息到数据库 + await database_api.store_action_info( chat_stream=self.chat_stream, - reply_message=action_planner_info.action_message, - available_actions=available_actions, - chosen_actions=chosen_action_plan_infos, - reply_reason=action_planner_info.reasoning or "", - enable_tool=global_config.tool.enable_tool, - request_type="replyer", - from_plugin=False, + action_build_into_prompt=False, + action_prompt_display=reason, + action_done=True, + thinking_id=thinking_id, + action_data={"reason": reason}, + action_name="no_action", ) + return {"action_type": "no_action", "success": True, "reply_text": "", "command": ""} - if not success or not llm_response or not llm_response.reply_set: - if action_planner_info.action_message: - logger.info(f"对 {action_planner_info.action_message.processed_plain_text} 的回复生成失败") - else: - logger.info("回复生成失败") + elif action_planner_info.action_type == "wait_time": + action_planner_info.action_data = action_planner_info.action_data or {} + logger.info(f"{self.log_prefix} 等待{action_planner_info.action_data['time']}秒后回复") + await asyncio.sleep(action_planner_info.action_data["time"]) + return {"action_type": "wait_time", "success": True, "reply_text": "", "command": ""} + + elif action_planner_info.action_type == "no_reply_until_call": + logger.info(f"{self.log_prefix} 保持沉默,直到有人直接叫的名字") + self.no_reply_until_call = True + return {"action_type": "no_reply_until_call", "success": True, "reply_text": "", "command": ""} + + elif action_planner_info.action_type == "reply": + try: + success, llm_response = await generator_api.generate_reply( + chat_stream=self.chat_stream, + reply_message=action_planner_info.action_message, + available_actions=available_actions, + chosen_actions=chosen_action_plan_infos, + reply_reason=action_planner_info.reasoning or "", + enable_tool=global_config.tool.enable_tool, + request_type="replyer", + from_plugin=False, + ) + + if not success or not llm_response or not llm_response.reply_set: + if action_planner_info.action_message: + logger.info(f"对 {action_planner_info.action_message.processed_plain_text} 的回复生成失败") + else: + logger.info("回复生成失败") + return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} + + except asyncio.CancelledError: + logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消") return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} - - except asyncio.CancelledError: - logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消") - return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} - response_set = llm_response.reply_set - selected_expressions = llm_response.selected_expressions - loop_info, reply_text, _ = await self._send_and_store_reply( - response_set=response_set, - action_message=action_planner_info.action_message, # type: ignore - cycle_timers=cycle_timers, - thinking_id=thinking_id, - actions=chosen_action_plan_infos, - selected_expressions=selected_expressions, - ) - return { - "action_type": "reply", - "success": True, - "reply_text": reply_text, - "loop_info": loop_info, - } - - # 其他动作 - else: - # 执行普通动作 - with Timer("动作执行", cycle_timers): - success, reply_text, command = await self._handle_action( - action_planner_info.action_type, - action_planner_info.reasoning or "", - action_planner_info.action_data or {}, - cycle_timers, - thinking_id, - action_planner_info.action_message, + response_set = llm_response.reply_set + selected_expressions = llm_response.selected_expressions + loop_info, reply_text, _ = await self._send_and_store_reply( + response_set=response_set, + action_message=action_planner_info.action_message, # type: ignore + cycle_timers=cycle_timers, + thinking_id=thinking_id, + actions=chosen_action_plan_infos, + selected_expressions=selected_expressions, ) - return { - "action_type": action_planner_info.action_type, - "success": success, - "reply_text": reply_text, - "command": command, - } + return { + "action_type": "reply", + "success": True, + "reply_text": reply_text, + "loop_info": loop_info, + } + + # 其他动作 + else: + # 执行普通动作 + with Timer("动作执行", cycle_timers): + success, reply_text, command = await self._handle_action( + action_planner_info.action_type, + action_planner_info.reasoning or "", + action_planner_info.action_data or {}, + cycle_timers, + thinking_id, + action_planner_info.action_message, + ) + return { + "action_type": action_planner_info.action_type, + "success": success, + "reply_text": reply_text, + "command": command, + } except Exception as e: logger.error(f"{self.log_prefix} 执行动作时出错: {e}") diff --git a/src/chat/heart_flow/heartflow.py b/src/chat/heart_flow/heartflow.py index 7354b9ac..6f2c8459 100644 --- a/src/chat/heart_flow/heartflow.py +++ b/src/chat/heart_flow/heartflow.py @@ -1,8 +1,11 @@ import traceback from typing import Any, Optional, Dict +from src.chat.message_receive.chat_stream import get_chat_manager from src.common.logger import get_logger from src.chat.heart_flow.heartFC_chat import HeartFChatting +from src.chat.heart_flow.brain_chat import BrainChatting +from src.chat.message_receive.chat_stream import ChatStream logger = get_logger("heartflow") @@ -11,19 +14,26 @@ class Heartflow: """主心流协调器,负责初始化并协调聊天""" def __init__(self): - self.heartflow_chat_list: Dict[Any, HeartFChatting] = {} + self.heartflow_chat_list: Dict[Any, HeartFChatting | BrainChatting] = {} - async def get_or_create_heartflow_chat(self, chat_id: Any) -> Optional[HeartFChatting]: + async def get_or_create_heartflow_chat(self, chat_id: Any) -> Optional[HeartFChatting | BrainChatting]: """获取或创建一个新的HeartFChatting实例""" try: if chat_id in self.heartflow_chat_list: if chat := self.heartflow_chat_list.get(chat_id): return chat else: - new_chat = HeartFChatting(chat_id=chat_id) - await new_chat.start() - self.heartflow_chat_list[chat_id] = new_chat - return new_chat + chat_stream: ChatStream = get_chat_manager().get_stream(chat_id) + if chat_stream.group_info: + new_chat = HeartFChatting(chat_id=chat_id) + await new_chat.start() + self.heartflow_chat_list[chat_id] = new_chat + return new_chat + else: + new_chat = BrainChatting(chat_id=chat_id) + await new_chat.start() + self.heartflow_chat_list[chat_id] = new_chat + return new_chat except Exception as e: logger.error(f"创建心流聊天 {chat_id} 失败: {e}", exc_info=True) traceback.print_exc() diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index 6940da7b..e7e20735 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -177,7 +177,7 @@ class ActionPlanner: target_message = message_id_list[-1][1] else: target_message = message_id_list[-1][1] - logger.info(f"{self.log_prefix}动作'{action}'缺少target_message_id,使用最新消息作为target_message") + logger.debug(f"{self.log_prefix}动作'{action}'缺少target_message_id,使用最新消息作为target_message") # 验证action是否可用 available_action_names = [action_name for action_name, _ in current_available_actions] @@ -263,7 +263,7 @@ class ActionPlanner: # 应用激活类型过滤 filtered_actions = self._filter_actions_by_activation_type(available_actions, chat_content_block_short) - logger.info(f"{self.log_prefix}过滤后有{len(filtered_actions)}个可用动作") + logger.debug(f"{self.log_prefix}过滤后有{len(filtered_actions)}个可用动作") # 构建包含所有动作的提示词 prompt, message_id_list = await self.build_planner_prompt( @@ -483,7 +483,7 @@ class ActionPlanner: if llm_content: try: if json_objects := self._extract_json_from_markdown(llm_content): - logger.info(f"{self.log_prefix}从响应中提取到{len(json_objects)}个JSON对象") + logger.debug(f"{self.log_prefix}从响应中提取到{len(json_objects)}个JSON对象") filtered_actions_list = list(filtered_actions.items()) for json_obj in json_objects: actions.extend(self._parse_single_action(json_obj, message_id_list, filtered_actions_list)) diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 832767d6..28cea041 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -278,7 +278,7 @@ class DefaultReplyer: expression_habits_title = "" if style_habits_str.strip(): expression_habits_title = ( - "在回复时,你可以参考以下的语言习惯,当情景合适就使用,但不要生硬使用,以合理的方式结合到你的回复中:" + "在回复时,你可以参考以下的语言习惯,不要生硬使用:" ) expression_habits_block += f"{style_habits_str}\n" diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 4566ec9f..7f1a7127 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -456,9 +456,6 @@ class ExperimentalConfig(ConfigBase): enable_friend_chat: bool = False """是否启用好友聊天""" - pfc_chatting: bool = False - """是否启用PFC""" - @dataclass class MaimMessageConfig(ConfigBase): diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 36471bdb..033c86b6 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -181,4 +181,4 @@ key_file = "" # SSL密钥文件路径,仅在use_wss=true时有效 enable = true [experimental] #实验性功能 -enable_friend_chat = false # 是否启用好友聊天 \ No newline at end of file +none = false # 暂无 \ No newline at end of file