better:优化直接提及时的回复速度

pull/1348/head
SengokuCola 2025-11-09 13:59:51 +08:00
parent 82004567a6
commit fb1b520e68
1 changed files with 192 additions and 74 deletions

View File

@ -32,6 +32,7 @@ from src.chat.utils.chat_message_builder import (
build_readable_messages_with_id, build_readable_messages_with_id,
get_raw_msg_before_timestamp_with_chat, get_raw_msg_before_timestamp_with_chat,
) )
from src.chat.utils.chat_history_summarizer import ChatHistorySummarizer
if TYPE_CHECKING: if TYPE_CHECKING:
from src.common.data_models.database_data_model import DatabaseMessages from src.common.data_models.database_data_model import DatabaseMessages
@ -110,6 +111,8 @@ class HeartFChatting:
self.question_probability_multiplier = 1 self.question_probability_multiplier = 1
self.questioned = False self.questioned = False
# 聊天内容概括器
self.chat_history_summarizer = ChatHistorySummarizer(chat_id=self.stream_id)
async def start(self): async def start(self):
"""检查是否需要启动主循环,如果未激活则启动。""" """检查是否需要启动主循环,如果未激活则启动。"""
@ -125,6 +128,10 @@ class HeartFChatting:
self._loop_task = asyncio.create_task(self._main_chat_loop()) self._loop_task = asyncio.create_task(self._main_chat_loop())
self._loop_task.add_done_callback(self._handle_loop_completion) self._loop_task.add_done_callback(self._handle_loop_completion)
# 启动聊天内容概括器的后台定期检查循环
await self.chat_history_summarizer.start()
logger.info(f"{self.log_prefix} HeartFChatting 启动完成") logger.info(f"{self.log_prefix} HeartFChatting 启动完成")
except Exception as e: except Exception as e:
@ -195,28 +202,30 @@ class HeartFChatting:
question_probability = question_probability * global_config.chat.get_auto_chat_value(self.stream_id) * self.question_probability_multiplier question_probability = question_probability * global_config.chat.get_auto_chat_value(self.stream_id) * self.question_probability_multiplier
#暂时禁用
# print(f"{self.log_prefix} questioned: {self.questioned},len: {len(global_conflict_tracker.get_questions_by_chat_id(self.stream_id))}") # print(f"{self.log_prefix} questioned: {self.questioned},len: {len(global_conflict_tracker.get_questions_by_chat_id(self.stream_id))}")
if question_probability > 0 and not self.questioned and len(global_conflict_tracker.get_questions_by_chat_id(self.stream_id)) == 0: #长久没有回复,可以试试主动发言,提问概率随着时间增加 # if question_probability > 0 and not self.questioned and len(global_conflict_tracker.get_questions_by_chat_id(self.stream_id)) == 0: #长久没有回复,可以试试主动发言,提问概率随着时间增加
# logger.info(f"{self.log_prefix} 长久没有回复,可以试试主动发言,概率: {question_probability}") # # logger.info(f"{self.log_prefix} 长久没有回复,可以试试主动发言,概率: {question_probability}")
if random.random() < question_probability: # 30%概率主动发言 # if random.random() < question_probability: # 30%概率主动发言
try: # try:
self.questioned = True # self.questioned = True
self.last_active_time = time.time() # self.last_active_time = time.time()
# print(f"{self.log_prefix} 长久没有回复,可以试试主动发言,开始生成问题") # # print(f"{self.log_prefix} 长久没有回复,可以试试主动发言,开始生成问题")
logger.info(f"{self.log_prefix} 长久没有回复,可以试试主动发言,开始生成问题") # logger.info(f"{self.log_prefix} 长久没有回复,可以试试主动发言,开始生成问题")
cycle_timers, thinking_id = self.start_cycle() # cycle_timers, thinking_id = self.start_cycle()
question_maker = QuestionMaker(self.stream_id) # question_maker = QuestionMaker(self.stream_id)
question, context,conflict_context = await question_maker.make_question() # question, context,conflict_context = await question_maker.make_question()
if question: # if question:
logger.info(f"{self.log_prefix} 问题: {question}") # logger.info(f"{self.log_prefix} 问题: {question}")
await global_conflict_tracker.track_conflict(question, conflict_context, True, self.stream_id) # await global_conflict_tracker.track_conflict(question, conflict_context, True, self.stream_id)
await self._lift_question_reply(question,context,thinking_id) # await self._lift_question_reply(question,context,thinking_id)
else: # else:
logger.info(f"{self.log_prefix} 无问题") # logger.info(f"{self.log_prefix} 无问题")
# self.end_cycle(cycle_timers, thinking_id) # # self.end_cycle(cycle_timers, thinking_id)
except Exception as e: # except Exception as e:
logger.error(f"{self.log_prefix} 主动提问失败: {e}") # logger.error(f"{self.log_prefix} 主动提问失败: {e}")
print(traceback.format_exc()) # print(traceback.format_exc())
if len(recent_messages_list) >= 1: if len(recent_messages_list) >= 1:
@ -318,6 +327,87 @@ class HeartFChatting:
return loop_info, reply_text, cycle_timers return loop_info, reply_text, cycle_timers
async def _run_planner_without_reply(
self,
available_actions: Dict[str, ActionInfo],
cycle_timers: Dict[str, float],
) -> List[ActionPlannerInfo]:
"""执行planner但不包含reply动作用于并行执行场景"""
try:
with Timer("规划器", cycle_timers):
action_to_use_info = await self.action_planner.plan(
loop_start_time=self.last_read_time,
available_actions=available_actions,
)
# 过滤掉reply动作
return [action for action in action_to_use_info if action.action_type != "reply"]
except Exception as e:
logger.error(f"{self.log_prefix} Planner执行失败: {e}")
traceback.print_exc()
return []
async def _generate_mentioned_reply(
self,
force_reply_message: "DatabaseMessages",
thinking_id: str,
cycle_timers: Dict[str, float],
available_actions: Dict[str, ActionInfo],
) -> Dict[str, Any]:
"""当被提及时,独立生成回复的任务"""
try:
self.questioned = False
reason = "有人提到了你,进行回复"
await database_api.store_action_info(
chat_stream=self.chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason,
action_done=True,
thinking_id=thinking_id,
action_data={},
action_name="reply",
action_reasoning=reason,
)
with Timer("提及回复生成", cycle_timers):
success, llm_response = await generator_api.generate_reply(
chat_stream=self.chat_stream,
reply_message=force_reply_message,
available_actions=available_actions,
chosen_actions=[], # 独立回复不依赖planner的动作
reply_reason=reason,
enable_tool=global_config.tool.enable_tool,
request_type="replyer",
from_plugin=False,
reply_time_point=self.last_read_time,
)
if not success or not llm_response or not llm_response.reply_set:
logger.warning(f"{self.log_prefix} 提及回复生成失败")
return {"action_type": "reply", "success": False, "result": "提及回复生成失败", "loop_info": None}
response_set = llm_response.reply_set
selected_expressions = llm_response.selected_expressions
loop_info, reply_text, _ = await self._send_and_store_reply(
response_set=response_set,
action_message=force_reply_message,
cycle_timers=cycle_timers,
thinking_id=thinking_id,
actions=[], # 独立回复不依赖planner的动作
selected_expressions=selected_expressions,
)
self.last_active_time = time.time()
return {
"action_type": "reply",
"success": True,
"result": f"你回复内容{reply_text}",
"loop_info": loop_info,
}
except Exception as e:
logger.error(f"{self.log_prefix} 提及回复生成异常: {e}")
traceback.print_exc()
return {"action_type": "reply", "success": False, "result": f"提及回复生成异常: {e}", "loop_info": None}
async def _observe( async def _observe(
self, # interest_value: float = 0.0, self, # interest_value: float = 0.0,
recent_messages_list: Optional[List["DatabaseMessages"]] = None, recent_messages_list: Optional[List["DatabaseMessages"]] = None,
@ -332,13 +422,15 @@ class HeartFChatting:
async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
asyncio.create_task(self.expression_learner.trigger_learning_for_chat()) asyncio.create_task(self.expression_learner.trigger_learning_for_chat())
asyncio.create_task(global_memory_chest.build_running_content(chat_id=self.stream_id))
asyncio.create_task(frequency_control_manager.get_or_create_frequency_control(self.stream_id).trigger_frequency_adjust()) asyncio.create_task(frequency_control_manager.get_or_create_frequency_control(self.stream_id).trigger_frequency_adjust())
# 添加curious检测任务 - 检测聊天记录中的矛盾、冲突或需要提问的内容 # 添加curious检测任务 - 检测聊天记录中的矛盾、冲突或需要提问的内容
asyncio.create_task(check_and_make_question(self.stream_id)) # asyncio.create_task(check_and_make_question(self.stream_id))
# 添加jargon提取任务 - 提取聊天中的黑话/俚语并入库(内部自行取消息并带冷却) # 添加jargon提取任务 - 提取聊天中的黑话/俚语并入库(内部自行取消息并带冷却)
asyncio.create_task(extract_and_store_jargon(self.stream_id)) asyncio.create_task(extract_and_store_jargon(self.stream_id))
# 添加聊天内容概括任务 - 累积、打包和压缩聊天记录
# 注意后台循环已在start()中启动,这里作为额外触发点,在有思考时立即处理
asyncio.create_task(self.chat_history_summarizer.process())
cycle_timers, thinking_id = self.start_cycle() cycle_timers, thinking_id = self.start_cycle()
@ -352,66 +444,88 @@ class HeartFChatting:
except Exception as e: except Exception as e:
logger.error(f"{self.log_prefix} 动作修改失败: {e}") logger.error(f"{self.log_prefix} 动作修改失败: {e}")
# 执行planner # 如果被提及让回复生成和planner并行执行
is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() if force_reply_message:
logger.info(f"{self.log_prefix} 检测到提及回复生成与planner并行执行")
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.stream_id, # 并行执行planner和回复生成
timestamp=time.time(), planner_task = asyncio.create_task(
limit=int(global_config.chat.max_context_size * 0.6), self._run_planner_without_reply(
) available_actions=available_actions,
chat_content_block, message_id_list = build_readable_messages_with_id( cycle_timers=cycle_timers,
messages=message_list_before_now, )
timestamp_mode="normal_no_YMD",
read_mark=self.action_planner.last_obs_time_mark,
truncate=True,
show_actions=True,
)
prompt_info = await self.action_planner.build_planner_prompt(
is_group_chat=is_group_chat,
chat_target_info=chat_target_info,
current_available_actions=available_actions,
chat_content_block=chat_content_block,
message_id_list=message_id_list,
interest=global_config.personality.interest,
)
continue_flag, modified_message = await events_manager.handle_mai_events(
EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id
)
if not continue_flag:
return False
if modified_message and modified_message._modify_flags.modify_llm_prompt:
prompt_info = (modified_message.llm_prompt, prompt_info[1])
with Timer("规划器", cycle_timers):
action_to_use_info = await self.action_planner.plan(
loop_start_time=self.last_read_time,
available_actions=available_actions,
) )
reply_task = asyncio.create_task(
has_reply = False self._generate_mentioned_reply(
for action in action_to_use_info: force_reply_message=force_reply_message,
if action.action_type == "reply": thinking_id=thinking_id,
has_reply = True cycle_timers=cycle_timers,
break
if not has_reply and force_reply_message:
action_to_use_info.append(
ActionPlannerInfo(
action_type="reply",
reasoning="有人提到了你,进行回复",
action_data={},
action_message=force_reply_message,
available_actions=available_actions, available_actions=available_actions,
) )
) )
# 等待两个任务完成
planner_result, reply_result = await asyncio.gather(planner_task, reply_task, return_exceptions=True)
# 处理planner结果
if isinstance(planner_result, BaseException):
logger.error(f"{self.log_prefix} Planner执行异常: {planner_result}")
action_to_use_info = []
else:
action_to_use_info = planner_result
# 处理回复结果
if isinstance(reply_result, BaseException):
logger.error(f"{self.log_prefix} 回复生成异常: {reply_result}")
reply_result = {"action_type": "reply", "success": False, "result": "回复生成异常", "loop_info": None}
else:
# 正常流程只执行planner
is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info()
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.stream_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.6),
)
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=message_list_before_now,
timestamp_mode="normal_no_YMD",
read_mark=self.action_planner.last_obs_time_mark,
truncate=True,
show_actions=True,
)
prompt_info = await self.action_planner.build_planner_prompt(
is_group_chat=is_group_chat,
chat_target_info=chat_target_info,
current_available_actions=available_actions,
chat_content_block=chat_content_block,
message_id_list=message_id_list,
interest=global_config.personality.interest,
)
continue_flag, modified_message = await events_manager.handle_mai_events(
EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id
)
if not continue_flag:
return False
if modified_message and modified_message._modify_flags.modify_llm_prompt:
prompt_info = (modified_message.llm_prompt, prompt_info[1])
with Timer("规划器", cycle_timers):
action_to_use_info = await self.action_planner.plan(
loop_start_time=self.last_read_time,
available_actions=available_actions,
)
reply_result = None
# 过滤掉planner返回的reply动作如果存在
action_to_use_info = [action for action in action_to_use_info if action.action_type != "reply"]
logger.info( logger.info(
f"{self.log_prefix} 决定执行{len(action_to_use_info)}个动作: {' '.join([a.action_type for a in action_to_use_info])}" f"{self.log_prefix} 决定执行{len(action_to_use_info)}个动作: {' '.join([a.action_type for a in action_to_use_info])}"
) )
# 3. 并行执行所有动作 # 3. 并行执行所有动作不包括replyreply已经独立执行
action_tasks = [ action_tasks = [
asyncio.create_task( asyncio.create_task(
self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers)
@ -421,6 +535,10 @@ class HeartFChatting:
# 并行执行所有任务 # 并行执行所有任务
results = await asyncio.gather(*action_tasks, return_exceptions=True) results = await asyncio.gather(*action_tasks, return_exceptions=True)
# 如果有独立的回复结果,添加到结果列表中
if reply_result:
results = list(results) + [reply_result]
# 处理执行结果 # 处理执行结果
reply_loop_info = None reply_loop_info = None