From fb1b520e6873b8e86ec8b4074495a46bed21fb3a Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sun, 9 Nov 2025 13:59:51 +0800 Subject: [PATCH] =?UTF-8?q?better=EF=BC=9A=E4=BC=98=E5=8C=96=E7=9B=B4?= =?UTF-8?q?=E6=8E=A5=E6=8F=90=E5=8F=8A=E6=97=B6=E7=9A=84=E5=9B=9E=E5=A4=8D?= =?UTF-8?q?=E9=80=9F=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/heart_flow/heartFC_chat.py | 266 ++++++++++++++++++++-------- 1 file changed, 192 insertions(+), 74 deletions(-) diff --git a/src/chat/heart_flow/heartFC_chat.py b/src/chat/heart_flow/heartFC_chat.py index 99e55122..7f8940d4 100644 --- a/src/chat/heart_flow/heartFC_chat.py +++ b/src/chat/heart_flow/heartFC_chat.py @@ -32,6 +32,7 @@ from src.chat.utils.chat_message_builder import ( build_readable_messages_with_id, get_raw_msg_before_timestamp_with_chat, ) +from src.chat.utils.chat_history_summarizer import ChatHistorySummarizer if TYPE_CHECKING: from src.common.data_models.database_data_model import DatabaseMessages @@ -110,6 +111,8 @@ class HeartFChatting: self.question_probability_multiplier = 1 self.questioned = False + # 聊天内容概括器 + self.chat_history_summarizer = ChatHistorySummarizer(chat_id=self.stream_id) async def start(self): """检查是否需要启动主循环,如果未激活则启动。""" @@ -125,6 +128,10 @@ class HeartFChatting: self._loop_task = asyncio.create_task(self._main_chat_loop()) self._loop_task.add_done_callback(self._handle_loop_completion) + + # 启动聊天内容概括器的后台定期检查循环 + await self.chat_history_summarizer.start() + logger.info(f"{self.log_prefix} HeartFChatting 启动完成") except Exception as e: @@ -195,28 +202,30 @@ class HeartFChatting: question_probability = question_probability * global_config.chat.get_auto_chat_value(self.stream_id) * self.question_probability_multiplier + #暂时禁用 + # print(f"{self.log_prefix} questioned: {self.questioned},len: {len(global_conflict_tracker.get_questions_by_chat_id(self.stream_id))}") - if question_probability > 0 and not self.questioned and len(global_conflict_tracker.get_questions_by_chat_id(self.stream_id)) == 0: #长久没有回复,可以试试主动发言,提问概率随着时间增加 - # logger.info(f"{self.log_prefix} 长久没有回复,可以试试主动发言,概率: {question_probability}") - if random.random() < question_probability: # 30%概率主动发言 - try: - self.questioned = True - self.last_active_time = time.time() - # print(f"{self.log_prefix} 长久没有回复,可以试试主动发言,开始生成问题") - logger.info(f"{self.log_prefix} 长久没有回复,可以试试主动发言,开始生成问题") - cycle_timers, thinking_id = self.start_cycle() - question_maker = QuestionMaker(self.stream_id) - question, context,conflict_context = await question_maker.make_question() - if question: - logger.info(f"{self.log_prefix} 问题: {question}") - await global_conflict_tracker.track_conflict(question, conflict_context, True, self.stream_id) - await self._lift_question_reply(question,context,thinking_id) - else: - logger.info(f"{self.log_prefix} 无问题") - # self.end_cycle(cycle_timers, thinking_id) - except Exception as e: - logger.error(f"{self.log_prefix} 主动提问失败: {e}") - print(traceback.format_exc()) + # if question_probability > 0 and not self.questioned and len(global_conflict_tracker.get_questions_by_chat_id(self.stream_id)) == 0: #长久没有回复,可以试试主动发言,提问概率随着时间增加 + # # logger.info(f"{self.log_prefix} 长久没有回复,可以试试主动发言,概率: {question_probability}") + # if random.random() < question_probability: # 30%概率主动发言 + # try: + # self.questioned = True + # self.last_active_time = time.time() + # # print(f"{self.log_prefix} 长久没有回复,可以试试主动发言,开始生成问题") + # logger.info(f"{self.log_prefix} 长久没有回复,可以试试主动发言,开始生成问题") + # cycle_timers, thinking_id = self.start_cycle() + # question_maker = QuestionMaker(self.stream_id) + # question, context,conflict_context = await question_maker.make_question() + # if question: + # logger.info(f"{self.log_prefix} 问题: {question}") + # await global_conflict_tracker.track_conflict(question, conflict_context, True, self.stream_id) + # await self._lift_question_reply(question,context,thinking_id) + # else: + # logger.info(f"{self.log_prefix} 无问题") + # # self.end_cycle(cycle_timers, thinking_id) + # except Exception as e: + # logger.error(f"{self.log_prefix} 主动提问失败: {e}") + # print(traceback.format_exc()) if len(recent_messages_list) >= 1: @@ -318,6 +327,87 @@ class HeartFChatting: return loop_info, reply_text, cycle_timers + async def _run_planner_without_reply( + self, + available_actions: Dict[str, ActionInfo], + cycle_timers: Dict[str, float], + ) -> List[ActionPlannerInfo]: + """执行planner,但不包含reply动作(用于并行执行场景)""" + try: + with Timer("规划器", cycle_timers): + action_to_use_info = await self.action_planner.plan( + loop_start_time=self.last_read_time, + available_actions=available_actions, + ) + # 过滤掉reply动作 + return [action for action in action_to_use_info if action.action_type != "reply"] + except Exception as e: + logger.error(f"{self.log_prefix} Planner执行失败: {e}") + traceback.print_exc() + return [] + + async def _generate_mentioned_reply( + self, + force_reply_message: "DatabaseMessages", + thinking_id: str, + cycle_timers: Dict[str, float], + available_actions: Dict[str, ActionInfo], + ) -> Dict[str, Any]: + """当被提及时,独立生成回复的任务""" + try: + self.questioned = False + reason = "有人提到了你,进行回复" + + await database_api.store_action_info( + chat_stream=self.chat_stream, + action_build_into_prompt=False, + action_prompt_display=reason, + action_done=True, + thinking_id=thinking_id, + action_data={}, + action_name="reply", + action_reasoning=reason, + ) + + with Timer("提及回复生成", cycle_timers): + success, llm_response = await generator_api.generate_reply( + chat_stream=self.chat_stream, + reply_message=force_reply_message, + available_actions=available_actions, + chosen_actions=[], # 独立回复,不依赖planner的动作 + reply_reason=reason, + enable_tool=global_config.tool.enable_tool, + request_type="replyer", + from_plugin=False, + reply_time_point=self.last_read_time, + ) + + if not success or not llm_response or not llm_response.reply_set: + logger.warning(f"{self.log_prefix} 提及回复生成失败") + return {"action_type": "reply", "success": False, "result": "提及回复生成失败", "loop_info": None} + + response_set = llm_response.reply_set + selected_expressions = llm_response.selected_expressions + loop_info, reply_text, _ = await self._send_and_store_reply( + response_set=response_set, + action_message=force_reply_message, + cycle_timers=cycle_timers, + thinking_id=thinking_id, + actions=[], # 独立回复,不依赖planner的动作 + selected_expressions=selected_expressions, + ) + self.last_active_time = time.time() + return { + "action_type": "reply", + "success": True, + "result": f"你回复内容{reply_text}", + "loop_info": loop_info, + } + except Exception as e: + logger.error(f"{self.log_prefix} 提及回复生成异常: {e}") + traceback.print_exc() + return {"action_type": "reply", "success": False, "result": f"提及回复生成异常: {e}", "loop_info": None} + async def _observe( self, # interest_value: float = 0.0, recent_messages_list: Optional[List["DatabaseMessages"]] = None, @@ -332,13 +422,15 @@ class HeartFChatting: async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): asyncio.create_task(self.expression_learner.trigger_learning_for_chat()) - asyncio.create_task(global_memory_chest.build_running_content(chat_id=self.stream_id)) asyncio.create_task(frequency_control_manager.get_or_create_frequency_control(self.stream_id).trigger_frequency_adjust()) # 添加curious检测任务 - 检测聊天记录中的矛盾、冲突或需要提问的内容 - asyncio.create_task(check_and_make_question(self.stream_id)) + # asyncio.create_task(check_and_make_question(self.stream_id)) # 添加jargon提取任务 - 提取聊天中的黑话/俚语并入库(内部自行取消息并带冷却) asyncio.create_task(extract_and_store_jargon(self.stream_id)) + # 添加聊天内容概括任务 - 累积、打包和压缩聊天记录 + # 注意:后台循环已在start()中启动,这里作为额外触发点,在有思考时立即处理 + asyncio.create_task(self.chat_history_summarizer.process()) cycle_timers, thinking_id = self.start_cycle() @@ -352,66 +444,88 @@ class HeartFChatting: except Exception as e: logger.error(f"{self.log_prefix} 动作修改失败: {e}") - # 执行planner - is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() - - message_list_before_now = get_raw_msg_before_timestamp_with_chat( - chat_id=self.stream_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.6), - ) - chat_content_block, message_id_list = build_readable_messages_with_id( - messages=message_list_before_now, - timestamp_mode="normal_no_YMD", - read_mark=self.action_planner.last_obs_time_mark, - truncate=True, - show_actions=True, - ) - - prompt_info = await self.action_planner.build_planner_prompt( - is_group_chat=is_group_chat, - chat_target_info=chat_target_info, - current_available_actions=available_actions, - chat_content_block=chat_content_block, - message_id_list=message_id_list, - interest=global_config.personality.interest, - ) - continue_flag, modified_message = await events_manager.handle_mai_events( - EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id - ) - if not continue_flag: - return False - if modified_message and modified_message._modify_flags.modify_llm_prompt: - prompt_info = (modified_message.llm_prompt, prompt_info[1]) - - with Timer("规划器", cycle_timers): - action_to_use_info = await self.action_planner.plan( - loop_start_time=self.last_read_time, - available_actions=available_actions, + # 如果被提及,让回复生成和planner并行执行 + if force_reply_message: + logger.info(f"{self.log_prefix} 检测到提及,回复生成与planner并行执行") + + # 并行执行planner和回复生成 + planner_task = asyncio.create_task( + self._run_planner_without_reply( + available_actions=available_actions, + cycle_timers=cycle_timers, + ) ) - - has_reply = False - for action in action_to_use_info: - if action.action_type == "reply": - has_reply = True - break - - if not has_reply and force_reply_message: - action_to_use_info.append( - ActionPlannerInfo( - action_type="reply", - reasoning="有人提到了你,进行回复", - action_data={}, - action_message=force_reply_message, + reply_task = asyncio.create_task( + self._generate_mentioned_reply( + force_reply_message=force_reply_message, + thinking_id=thinking_id, + cycle_timers=cycle_timers, available_actions=available_actions, ) ) + # 等待两个任务完成 + planner_result, reply_result = await asyncio.gather(planner_task, reply_task, return_exceptions=True) + + # 处理planner结果 + if isinstance(planner_result, BaseException): + logger.error(f"{self.log_prefix} Planner执行异常: {planner_result}") + action_to_use_info = [] + else: + action_to_use_info = planner_result + + # 处理回复结果 + if isinstance(reply_result, BaseException): + logger.error(f"{self.log_prefix} 回复生成异常: {reply_result}") + reply_result = {"action_type": "reply", "success": False, "result": "回复生成异常", "loop_info": None} + else: + # 正常流程:只执行planner + is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() + + message_list_before_now = get_raw_msg_before_timestamp_with_chat( + chat_id=self.stream_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.6), + ) + chat_content_block, message_id_list = build_readable_messages_with_id( + messages=message_list_before_now, + timestamp_mode="normal_no_YMD", + read_mark=self.action_planner.last_obs_time_mark, + truncate=True, + show_actions=True, + ) + + prompt_info = await self.action_planner.build_planner_prompt( + is_group_chat=is_group_chat, + chat_target_info=chat_target_info, + current_available_actions=available_actions, + chat_content_block=chat_content_block, + message_id_list=message_id_list, + interest=global_config.personality.interest, + ) + continue_flag, modified_message = await events_manager.handle_mai_events( + EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id + ) + if not continue_flag: + return False + if modified_message and modified_message._modify_flags.modify_llm_prompt: + prompt_info = (modified_message.llm_prompt, prompt_info[1]) + + with Timer("规划器", cycle_timers): + action_to_use_info = await self.action_planner.plan( + loop_start_time=self.last_read_time, + available_actions=available_actions, + ) + reply_result = None + + # 过滤掉planner返回的reply动作(如果存在) + action_to_use_info = [action for action in action_to_use_info if action.action_type != "reply"] + logger.info( f"{self.log_prefix} 决定执行{len(action_to_use_info)}个动作: {' '.join([a.action_type for a in action_to_use_info])}" ) - # 3. 并行执行所有动作 + # 3. 并行执行所有动作(不包括reply,reply已经独立执行) action_tasks = [ asyncio.create_task( self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) @@ -421,6 +535,10 @@ class HeartFChatting: # 并行执行所有任务 results = await asyncio.gather(*action_tasks, return_exceptions=True) + + # 如果有独立的回复结果,添加到结果列表中 + if reply_result: + results = list(results) + [reply_result] # 处理执行结果 reply_loop_info = None