mirror of https://github.com/Mai-with-u/MaiBot.git
尝试修复
parent
41ad926da8
commit
2bc9eece98
|
|
@ -22,6 +22,7 @@ if TIME_ZONE is None:
|
|||
logger.error(f"配置的时区 '{configured_tz}' 无效,将使用默认时区 'Asia/Shanghai'")
|
||||
TIME_ZONE = tz.gettz("Asia/Shanghai")
|
||||
|
||||
MAX_CONSECUTIVE_LLM_ACTION_FAILURES = 3 # 可配置的最大LLM连续失败次数
|
||||
|
||||
async def run_conversation_loop(conversation_instance: "Conversation"):
|
||||
"""
|
||||
|
|
@ -40,7 +41,7 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
|
|||
current_force_reflect_and_act = _force_reflect_and_act_next_iter
|
||||
_force_reflect_and_act_next_iter = False
|
||||
|
||||
logger.debug(f"[私聊][{conversation_instance.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f}), force_reflect_next_iter: {current_force_reflect_and_act}")
|
||||
logger.debug(f"[私聊][{conversation_instance.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f}), force_reflect_next_iter: {current_force_reflect_and_act}, consecutive_llm_failures: {conversation_instance.consecutive_llm_action_failures}")
|
||||
|
||||
try:
|
||||
global TIME_ZONE
|
||||
|
|
@ -197,6 +198,7 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
|
|||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
|
||||
# --- LLM Action Handling with Shield and Failure Count ---
|
||||
if action in ["direct_reply", "send_new_message"]:
|
||||
logger.debug(
|
||||
f"[私聊][{conversation_instance.private_name}] (Loop) 动作 '{action}' 需要LLM生成,进入监控执行模式..."
|
||||
|
|
@ -216,15 +218,20 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
|
|||
)
|
||||
)
|
||||
|
||||
interrupted_during_llm = False
|
||||
llm_task_cancelled_by_us = False
|
||||
interrupted_by_new_messages = False
|
||||
llm_action_completed_successfully = False
|
||||
action_outcome_processed = False # Flag to ensure we process outcome only once
|
||||
|
||||
while not llm_action_task.done():
|
||||
while not llm_action_task.done() and not action_outcome_processed:
|
||||
try:
|
||||
await asyncio.wait_for(llm_action_task, timeout=1.5)
|
||||
# Shield the task so wait_for timeout doesn't cancel it directly
|
||||
await asyncio.wait_for(asyncio.shield(llm_action_task), timeout=1.5)
|
||||
# If wait_for completes without timeout, the shielded task is done (or errored/cancelled by itself)
|
||||
action_outcome_processed = True # Outcome will be processed outside this loop
|
||||
except asyncio.TimeoutError:
|
||||
# Shielded task didn't finish in 1.5s. This is our chance to check messages.
|
||||
current_time_for_check = time.time()
|
||||
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) LLM Monitor Timeout. llm_call_start_time: {llm_call_start_time:.2f}, current_check_time: {current_time_for_check:.2f}")
|
||||
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) LLM Monitor polling. llm_call_start_time: {llm_call_start_time:.2f}, current_check_time: {current_time_for_check:.2f}. Task still running, checking for new messages.")
|
||||
|
||||
current_unprocessed_messages_during_llm = getattr(conversation_instance.observation_info, "unprocessed_messages", [])
|
||||
other_new_messages_this_check: List[Dict[str, Any]] = []
|
||||
|
|
@ -237,7 +244,6 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
|
|||
is_new_enough = msg_time_llm and msg_time_llm >= llm_call_start_time
|
||||
is_other_sender = sender_id_llm != conversation_instance.bot_qq_str
|
||||
|
||||
# *** Fix for ValueError ***
|
||||
time_str_for_log = f"{msg_time_llm:.2f}" if msg_time_llm is not None else "N/A"
|
||||
logger.debug(f" - Msg ID: {msg_llm.get('message_id')}, Time: {time_str_for_log}, Sender: {sender_id_llm}. New enough? {is_new_enough}. Other sender? {is_other_sender}.")
|
||||
|
||||
|
|
@ -248,33 +254,88 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
|
|||
|
||||
if len(other_new_messages_this_check) > 2:
|
||||
logger.info(
|
||||
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 执行期间收到 {len(other_new_messages_this_check)} 条来自他人的新消息,发起中断。"
|
||||
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 执行期间收到 {len(other_new_messages_this_check)} 条来自他人的新消息,将取消LLM任务。"
|
||||
)
|
||||
if not llm_action_task.done():
|
||||
llm_action_task.cancel()
|
||||
llm_task_cancelled_by_us = True
|
||||
break
|
||||
if not llm_action_task.done(): # Check again before cancelling
|
||||
llm_action_task.cancel() # Now we explicitly cancel the original task
|
||||
interrupted_by_new_messages = True
|
||||
action_outcome_processed = True # We've made a decision, exit monitoring
|
||||
# else: continue polling if not enough new messages
|
||||
# Shield ensures CancelledError from llm_action_task itself is caught by the outer try/except
|
||||
|
||||
# After the monitoring loop (either task finished, or we decided to cancel it)
|
||||
# Await the task properly to get its result or handle its exception/cancellation
|
||||
action_final_status_in_history = "unknown"
|
||||
try:
|
||||
await llm_action_task
|
||||
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务最终完成 (未被取消或未发生错误)。")
|
||||
await llm_action_task # This will re-raise CancelledError if we cancelled it, or other exceptions
|
||||
|
||||
# If no exception, it means the task completed.
|
||||
# actions.handle_action updates done_action, so we check its status.
|
||||
if conversation_instance.conversation_info and conversation_instance.conversation_info.done_action:
|
||||
# Check if done_action is not empty
|
||||
if conversation_instance.conversation_info.done_action:
|
||||
action_final_status_in_history = conversation_instance.conversation_info.done_action[-1].get("status", "unknown")
|
||||
|
||||
if action_final_status_in_history in ["done", "done_no_reply"]:
|
||||
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务最终成功完成 (status: {action_final_status_in_history})。")
|
||||
conversation_instance.consecutive_llm_action_failures = 0
|
||||
llm_action_completed_successfully = True
|
||||
else:
|
||||
logger.warning(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务完成但未成功 (status: {action_final_status_in_history})。")
|
||||
if not interrupted_by_new_messages:
|
||||
conversation_instance.consecutive_llm_action_failures += 1
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务最终确认被取消。")
|
||||
interrupted_during_llm = True
|
||||
if not interrupted_by_new_messages:
|
||||
conversation_instance.consecutive_llm_action_failures += 1
|
||||
logger.warning(f"[私聊][{conversation_instance.private_name}] (Loop) LLM任务因外部原因取消,连续失败次数: {conversation_instance.consecutive_llm_action_failures}")
|
||||
else: # interrupted_by_new_messages is True
|
||||
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) LLM任务因新消息被内部逻辑取消,不计为LLM失败。")
|
||||
|
||||
except Exception as e_llm_final:
|
||||
logger.error(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务执行时发生最终错误: {e_llm_final}")
|
||||
logger.error(traceback.format_exc())
|
||||
interrupted_during_llm = True
|
||||
conversation_instance.state = ConversationState.ERROR
|
||||
|
||||
if interrupted_during_llm:
|
||||
conversation_instance.state = ConversationState.ANALYZING
|
||||
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作中断,准备重新规划。llm_task_cancelled_by_us: {llm_task_cancelled_by_us}")
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
else:
|
||||
conversation_instance.state = ConversationState.ERROR
|
||||
if not interrupted_by_new_messages:
|
||||
conversation_instance.consecutive_llm_action_failures += 1
|
||||
|
||||
# --- Post LLM Action Task Handling ---
|
||||
if not llm_action_completed_successfully:
|
||||
if conversation_instance.consecutive_llm_action_failures >= MAX_CONSECUTIVE_LLM_ACTION_FAILURES:
|
||||
logger.error(f"[私聊][{conversation_instance.private_name}] (Loop) LLM相关动作连续失败或被取消 {conversation_instance.consecutive_llm_action_failures} 次。将强制等待并重置计数器。")
|
||||
|
||||
action = "wait" # Force action to wait
|
||||
reason = f"LLM连续失败{conversation_instance.consecutive_llm_action_failures}次,强制等待"
|
||||
conversation_instance.consecutive_llm_action_failures = 0
|
||||
|
||||
if conversation_instance.conversation_info:
|
||||
conversation_instance.conversation_info.last_successful_reply_action = None
|
||||
|
||||
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) 执行强制等待动作...")
|
||||
await actions.handle_action(
|
||||
conversation_instance,
|
||||
action,
|
||||
reason,
|
||||
conversation_instance.observation_info,
|
||||
conversation_instance.conversation_info,
|
||||
)
|
||||
_force_reflect_and_act_next_iter = False
|
||||
conversation_instance.state = ConversationState.ANALYZING
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
else:
|
||||
conversation_instance.state = ConversationState.ANALYZING
|
||||
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作中断/失败,准备重新规划。Interrupted by new msgs: {interrupted_by_new_messages}, Consecutive LLM Failures: {conversation_instance.consecutive_llm_action_failures}")
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
else:
|
||||
logger.debug(
|
||||
f"[私聊][{conversation_instance.private_name}] (Loop) 执行非LLM类动作 '{action}'..."
|
||||
)
|
||||
conversation_instance.consecutive_llm_action_failures = 0
|
||||
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) 重置 consecutive_llm_action_failures due to non-LLM action.")
|
||||
|
||||
if conversation_instance.conversation_info:
|
||||
conversation_instance.conversation_info.other_new_messages_during_planning_count = other_new_msg_count_action_planning
|
||||
|
||||
|
|
@ -289,7 +350,8 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
|
|||
|
||||
last_action_record = {}
|
||||
if conversation_instance.conversation_info and conversation_instance.conversation_info.done_action:
|
||||
last_action_record = conversation_instance.conversation_info.done_action[-1]
|
||||
if conversation_instance.conversation_info.done_action:
|
||||
last_action_record = conversation_instance.conversation_info.done_action[-1]
|
||||
|
||||
if (
|
||||
last_action_record.get("action") == "send_new_message"
|
||||
|
|
@ -298,7 +360,6 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
|
|||
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) 检测到 ReplyGenerator 决定不发送消息,下一轮将强制反思。")
|
||||
_force_reflect_and_act_next_iter = True
|
||||
|
||||
|
||||
goal_ended: bool = False
|
||||
if (
|
||||
conversation_instance.conversation_info
|
||||
|
|
@ -316,7 +377,9 @@ async def run_conversation_loop(conversation_instance: "Conversation"):
|
|||
|
||||
last_action_record_for_end_check = {}
|
||||
if conversation_instance.conversation_info and conversation_instance.conversation_info.done_action:
|
||||
last_action_record_for_end_check = conversation_instance.conversation_info.done_action[-1]
|
||||
if conversation_instance.conversation_info.done_action:
|
||||
last_action_record_for_end_check = conversation_instance.conversation_info.done_action[-1]
|
||||
|
||||
action_ended: bool = (
|
||||
last_action_record_for_end_check.get("action") in ["end_conversation", "say_goodbye"]
|
||||
and last_action_record_for_end_check.get("status") == "done"
|
||||
|
|
|
|||
Loading…
Reference in New Issue