MaiBot/src/plugins/PFC/conversation_loop.py

457 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import time
import asyncio
import datetime
import traceback
from typing import Dict, Any, List, TYPE_CHECKING
from dateutil import tz
from src.common.logger_manager import get_logger
from src.config.config import global_config
from .pfc_types import ConversationState # 需要导入 ConversationState
from . import actions # 需要导入 actions 模块
if TYPE_CHECKING:
from .conversation import Conversation
logger = get_logger("pfc_loop")
# 时区配置
configured_tz = getattr(global_config, "TIME_ZONE", "Asia/Shanghai")
TIME_ZONE = tz.gettz(configured_tz)
if TIME_ZONE is None:
logger.error(f"配置的时区 '{configured_tz}' 无效,将使用默认时区 'Asia/Shanghai'")
TIME_ZONE = tz.gettz("Asia/Shanghai")
MAX_CONSECUTIVE_LLM_ACTION_FAILURES = 3 # 可配置的最大LLM连续失败次数
async def run_conversation_loop(conversation_instance: "Conversation"):
"""
核心的规划与行动循环 (PFC Loop)。
"""
logger.debug(f"[私聊][{conversation_instance.private_name}] 进入 run_conversation_loop 循环。")
if not conversation_instance._initialized:
logger.error(f"[私聊][{conversation_instance.private_name}] 尝试在未初始化状态下运行规划循环,退出。")
return
_force_reflect_and_act_next_iter = False
while conversation_instance.should_continue:
loop_iter_start_time = time.time()
current_force_reflect_and_act = _force_reflect_and_act_next_iter
_force_reflect_and_act_next_iter = False
logger.debug(
f"[私聊][{conversation_instance.private_name}] 开始新一轮循环迭代 ({loop_iter_start_time:.2f}), force_reflect_next_iter: {current_force_reflect_and_act}, consecutive_llm_failures: {conversation_instance.consecutive_llm_action_failures}"
)
try:
global TIME_ZONE
if TIME_ZONE is None:
configured_tz_loop = getattr(global_config, "TIME_ZONE", "Asia/Shanghai")
TIME_ZONE = tz.gettz(configured_tz_loop)
if TIME_ZONE is None:
logger.error(f"循环中: 配置的时区 '{configured_tz_loop}' 无效,将使用 'Asia/Shanghai'")
TIME_ZONE = tz.gettz("Asia/Shanghai")
current_time_dt = datetime.datetime.now(TIME_ZONE)
if conversation_instance.observation_info:
time_str = current_time_dt.strftime("%Y-%m-%d %H:%M:%S %Z%z")
conversation_instance.observation_info.current_time_str = time_str
else:
logger.warning(
f"[私聊][{conversation_instance.private_name}] ObservationInfo 未初始化,无法更新当前时间。"
)
except Exception as time_update_err:
logger.error(
f"[私聊][{conversation_instance.private_name}] 更新 ObservationInfo 当前时间时出错: {time_update_err}"
)
if (
conversation_instance.ignore_until_timestamp
and loop_iter_start_time < conversation_instance.ignore_until_timestamp
):
if conversation_instance.idle_chat and conversation_instance.idle_chat._running:
logger.debug(f"[私聊][{conversation_instance.private_name}] 对话被暂时忽略,暂停对该用户的主动聊天")
sleep_duration = min(30, conversation_instance.ignore_until_timestamp - loop_iter_start_time)
await asyncio.sleep(sleep_duration)
continue
elif (
conversation_instance.ignore_until_timestamp
and loop_iter_start_time >= conversation_instance.ignore_until_timestamp
):
logger.info(
f"[私聊][{conversation_instance.private_name}] 忽略时间已到 {conversation_instance.stream_id},准备结束对话。"
)
conversation_instance.ignore_until_timestamp = None
await conversation_instance.stop()
continue
else:
pass
try:
if conversation_instance.conversation_info and conversation_instance._initialized:
if (
conversation_instance.conversation_info.person_id
and conversation_instance.relationship_translator
and conversation_instance.person_info_mng
):
try:
numeric_relationship_value = await conversation_instance.person_info_mng.get_value(
conversation_instance.conversation_info.person_id, "relationship_value"
)
if not isinstance(numeric_relationship_value, (int, float)):
from bson.decimal128 import Decimal128
if isinstance(numeric_relationship_value, Decimal128):
numeric_relationship_value = float(numeric_relationship_value.to_decimal())
else:
numeric_relationship_value = 0.0
conversation_instance.conversation_info.relationship_text = (
await conversation_instance.relationship_translator.translate_relationship_value_to_text(
numeric_relationship_value
)
)
except Exception as e_rel:
logger.error(f"[私聊][{conversation_instance.private_name}] (Loop) 更新关系文本时出错: {e_rel}")
conversation_instance.conversation_info.relationship_text = "你们的关系是:普通。"
if conversation_instance.mood_mng:
conversation_instance.conversation_info.current_emotion_text = (
conversation_instance.mood_mng.get_mood_prompt()
)
if not all(
[
conversation_instance.action_planner,
conversation_instance.observation_info,
conversation_instance.conversation_info,
]
):
logger.error(
f"[私聊][{conversation_instance.private_name}] 核心组件未初始化无法继续规划循环。将等待5秒后重试..."
)
await asyncio.sleep(5)
continue
planning_start_time = time.time()
logger.debug(
f"[私聊][{conversation_instance.private_name}] --- (Loop) 开始规划 ({planning_start_time:.2f}) ---"
)
if conversation_instance.conversation_info:
conversation_instance.conversation_info.other_new_messages_during_planning_count = 0
action, reason = await conversation_instance.action_planner.plan(
conversation_instance.observation_info,
conversation_instance.conversation_info,
conversation_instance.conversation_info.last_successful_reply_action
if conversation_instance.conversation_info
else None,
use_reflect_prompt=current_force_reflect_and_act,
)
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) ActionPlanner.plan 完成,初步规划动作: {action}"
)
current_unprocessed_messages_after_plan = getattr(
conversation_instance.observation_info, "unprocessed_messages", []
)
new_messages_during_action_planning: List[Dict[str, Any]] = []
other_new_messages_during_action_planning: List[Dict[str, Any]] = []
for msg_ap in current_unprocessed_messages_after_plan:
msg_time_ap = msg_ap.get("time")
sender_id_info_ap = msg_ap.get("user_info", {})
sender_id_ap = str(sender_id_info_ap.get("user_id")) if sender_id_info_ap else None
if msg_time_ap and msg_time_ap >= planning_start_time:
new_messages_during_action_planning.append(msg_ap)
if sender_id_ap != conversation_instance.bot_qq_str:
other_new_messages_during_action_planning.append(msg_ap)
new_msg_count_action_planning = len(new_messages_during_action_planning)
other_new_msg_count_action_planning = len(other_new_messages_during_action_planning)
if conversation_instance.conversation_info and other_new_msg_count_action_planning > 0:
pass
should_interrupt_action_planning: bool = False
interrupt_reason_action_planning: str = ""
if action in ["wait", "listening"] and new_msg_count_action_planning > 0:
should_interrupt_action_planning = True
interrupt_reason_action_planning = f"规划 {action} 期间收到 {new_msg_count_action_planning} 条新消息"
elif other_new_msg_count_action_planning > 2:
should_interrupt_action_planning = True
interrupt_reason_action_planning = (
f"规划 {action} 期间收到 {other_new_msg_count_action_planning} 条来自他人的新消息"
)
if should_interrupt_action_planning:
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) 中断 '{action}' (在ActionPlanner.plan后),原因: {interrupt_reason_action_planning}。重新规划..."
)
cancel_record_ap = {
"action": action,
"plan_reason": reason,
"status": "cancelled_due_to_new_messages_during_action_plan",
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"final_reason": interrupt_reason_action_planning,
}
if conversation_instance.conversation_info:
if (
not hasattr(conversation_instance.conversation_info, "done_action")
or conversation_instance.conversation_info.done_action is None
):
conversation_instance.conversation_info.done_action = []
conversation_instance.conversation_info.done_action.append(cancel_record_ap)
conversation_instance.conversation_info.last_successful_reply_action = None
conversation_instance.state = ConversationState.ANALYZING
await asyncio.sleep(0.1)
continue
# --- LLM Action Handling with Shield and Failure Count ---
if action in ["direct_reply", "send_new_message"]:
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) 动作 '{action}' 需要LLM生成进入监控执行模式..."
)
llm_call_start_time = time.time()
if conversation_instance.conversation_info:
conversation_instance.conversation_info.other_new_messages_during_planning_count = (
other_new_msg_count_action_planning
)
llm_action_task = asyncio.create_task(
actions.handle_action(
conversation_instance,
action,
reason,
conversation_instance.observation_info,
conversation_instance.conversation_info,
)
)
interrupted_by_new_messages = False
llm_action_completed_successfully = False
action_outcome_processed = False # Flag to ensure we process outcome only once
while not llm_action_task.done() and not action_outcome_processed:
try:
# Shield the task so wait_for timeout doesn't cancel it directly
await asyncio.wait_for(asyncio.shield(llm_action_task), timeout=1.5)
# If wait_for completes without timeout, the shielded task is done (or errored/cancelled by itself)
action_outcome_processed = True # Outcome will be processed outside this loop
except asyncio.TimeoutError:
# Shielded task didn't finish in 1.5s. This is our chance to check messages.
current_time_for_check = time.time()
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM Monitor polling. llm_call_start_time: {llm_call_start_time:.2f}, current_check_time: {current_time_for_check:.2f}. Task still running, checking for new messages."
)
current_unprocessed_messages_during_llm = getattr(
conversation_instance.observation_info, "unprocessed_messages", []
)
other_new_messages_this_check: List[Dict[str, Any]] = []
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) Checking unprocessed_messages (count: {len(current_unprocessed_messages_during_llm)}):"
)
for msg_llm in current_unprocessed_messages_during_llm:
msg_time_llm = msg_llm.get("time")
sender_id_info_llm = msg_llm.get("user_info", {})
sender_id_llm = str(sender_id_info_llm.get("user_id")) if sender_id_info_llm else None
is_new_enough = msg_time_llm and msg_time_llm >= llm_call_start_time
is_other_sender = sender_id_llm != conversation_instance.bot_qq_str
time_str_for_log = f"{msg_time_llm:.2f}" if msg_time_llm is not None else "N/A"
logger.debug(
f" - Msg ID: {msg_llm.get('message_id')}, Time: {time_str_for_log}, Sender: {sender_id_llm}. New enough? {is_new_enough}. Other sender? {is_other_sender}."
)
if is_new_enough and is_other_sender:
other_new_messages_this_check.append(msg_llm)
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) Found {len(other_new_messages_this_check)} 'other_new_messages_this_check'."
)
if len(other_new_messages_this_check) > global_config.pfc_message_buffer_size:
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 执行期间收到 {len(other_new_messages_this_check)} 条来自他人的新消息将取消LLM任务。"
)
if not llm_action_task.done(): # Check again before cancelling
llm_action_task.cancel() # Now we explicitly cancel the original task
interrupted_by_new_messages = True
action_outcome_processed = True # We've made a decision, exit monitoring
# else: continue polling if not enough new messages
# Shield ensures CancelledError from llm_action_task itself is caught by the outer try/except
# After the monitoring loop (either task finished, or we decided to cancel it)
# Await the task properly to get its result or handle its exception/cancellation
action_final_status_in_history = "unknown"
try:
await llm_action_task # This will re-raise CancelledError if we cancelled it, or other exceptions
# If no exception, it means the task completed.
# actions.handle_action updates done_action, so we check its status.
if conversation_instance.conversation_info and conversation_instance.conversation_info.done_action:
# Check if done_action is not empty
if conversation_instance.conversation_info.done_action:
action_final_status_in_history = conversation_instance.conversation_info.done_action[
-1
].get("status", "unknown")
if action_final_status_in_history in ["done", "done_no_reply"]:
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务最终成功完成 (status: {action_final_status_in_history})。"
)
conversation_instance.consecutive_llm_action_failures = 0
llm_action_completed_successfully = True
else:
logger.warning(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务完成但未成功 (status: {action_final_status_in_history})。"
)
if not interrupted_by_new_messages:
conversation_instance.consecutive_llm_action_failures += 1
except asyncio.CancelledError:
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务最终确认被取消。"
)
if not interrupted_by_new_messages:
conversation_instance.consecutive_llm_action_failures += 1
logger.warning(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM任务因外部原因取消连续失败次数: {conversation_instance.consecutive_llm_action_failures}"
)
else: # interrupted_by_new_messages is True
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM任务因新消息被内部逻辑取消不计为LLM失败。"
)
except Exception as e_llm_final:
logger.error(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作 '{action}' 任务执行时发生最终错误: {e_llm_final}"
)
logger.error(traceback.format_exc())
conversation_instance.state = ConversationState.ERROR
if not interrupted_by_new_messages:
conversation_instance.consecutive_llm_action_failures += 1
# --- Post LLM Action Task Handling ---
if not llm_action_completed_successfully:
if conversation_instance.consecutive_llm_action_failures >= MAX_CONSECUTIVE_LLM_ACTION_FAILURES:
logger.error(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM相关动作连续失败或被取消 {conversation_instance.consecutive_llm_action_failures} 次。将强制等待并重置计数器。"
)
action = "wait" # Force action to wait
reason = f"LLM连续失败{conversation_instance.consecutive_llm_action_failures}次,强制等待"
conversation_instance.consecutive_llm_action_failures = 0
if conversation_instance.conversation_info:
conversation_instance.conversation_info.last_successful_reply_action = None
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) 执行强制等待动作...")
await actions.handle_action(
conversation_instance,
action,
reason,
conversation_instance.observation_info,
conversation_instance.conversation_info,
)
_force_reflect_and_act_next_iter = False
conversation_instance.state = ConversationState.ANALYZING
await asyncio.sleep(1)
continue
else:
conversation_instance.state = ConversationState.ANALYZING
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) LLM动作中断/失败准备重新规划。Interrupted by new msgs: {interrupted_by_new_messages}, Consecutive LLM Failures: {conversation_instance.consecutive_llm_action_failures}"
)
await asyncio.sleep(0.1)
continue
else:
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) 执行非LLM类动作 '{action}'...")
conversation_instance.consecutive_llm_action_failures = 0
logger.debug(
f"[私聊][{conversation_instance.private_name}] (Loop) 重置 consecutive_llm_action_failures due to non-LLM action."
)
if conversation_instance.conversation_info:
conversation_instance.conversation_info.other_new_messages_during_planning_count = (
other_new_msg_count_action_planning
)
await actions.handle_action(
conversation_instance,
action,
reason,
conversation_instance.observation_info,
conversation_instance.conversation_info,
)
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) 非LLM类动作 '{action}' 完成。")
last_action_record = {}
if conversation_instance.conversation_info and conversation_instance.conversation_info.done_action:
if conversation_instance.conversation_info.done_action:
last_action_record = conversation_instance.conversation_info.done_action[-1]
if (
last_action_record.get("action") == "send_new_message"
and last_action_record.get("status") == "done_no_reply"
):
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) 检测到 ReplyGenerator 决定不发送消息,下一轮将强制反思。"
)
_force_reflect_and_act_next_iter = True
goal_ended: bool = False
if (
conversation_instance.conversation_info
and hasattr(conversation_instance.conversation_info, "goal_list")
and conversation_instance.conversation_info.goal_list
):
last_goal_item = conversation_instance.conversation_info.goal_list[-1]
current_goal = (
last_goal_item.get("goal")
if isinstance(last_goal_item, dict)
else (last_goal_item if isinstance(last_goal_item, str) else None)
)
if current_goal == "结束对话":
goal_ended = True
last_action_record_for_end_check = {}
if conversation_instance.conversation_info and conversation_instance.conversation_info.done_action:
if conversation_instance.conversation_info.done_action:
last_action_record_for_end_check = conversation_instance.conversation_info.done_action[-1]
action_ended: bool = (
last_action_record_for_end_check.get("action") in ["end_conversation", "say_goodbye"]
and last_action_record_for_end_check.get("status") == "done"
)
if goal_ended or action_ended:
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) 检测到结束条件,停止循环。")
await conversation_instance.stop()
continue
except asyncio.CancelledError:
logger.info(f"[私聊][{conversation_instance.private_name}] (Loop) PFC 主循环任务被取消。")
await conversation_instance.stop()
break
except Exception as loop_err:
logger.error(f"[私聊][{conversation_instance.private_name}] (Loop) PFC 主循环出错: {loop_err}")
logger.error(f"[私聊][{conversation_instance.private_name}] (Loop) {traceback.format_exc()}")
conversation_instance.state = ConversationState.ERROR
await asyncio.sleep(5)
loop_duration = time.time() - loop_iter_start_time
min_loop_interval = 0.1
logger.debug(f"[私聊][{conversation_instance.private_name}] (Loop) 循环迭代耗时: {loop_duration:.3f} 秒。")
if loop_duration < min_loop_interval:
await asyncio.sleep(min_loop_interval - loop_duration)
logger.info(
f"[私聊][{conversation_instance.private_name}] (Loop) PFC 循环已退出 for stream_id: {conversation_instance.stream_id}"
)