🤖 自动格式化代码 [skip ci]

pull/937/head
github-actions[bot] 2025-05-16 04:51:50 +00:00
parent b1fcc6b745
commit 484983094f
4 changed files with 387 additions and 272 deletions

View File

@ -536,7 +536,9 @@ class IdleChat:
try:
segments = Seg(type="seglist", data=[Seg(type="text", data=content)])
logger.debug(f"[私聊][{self.private_name}]准备发送主动聊天消息: {content}")
await self.message_sender.send_message(chat_stream=chat_stream, segments=segments, reply_to_message=None, content=content)
await self.message_sender.send_message(
chat_stream=chat_stream, segments=segments, reply_to_message=None, content=content
)
logger.info(f"[私聊][{self.private_name}]成功主动发起聊天: {content}")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]发送主动聊天消息失败: {str(e)}")

View File

@ -2,7 +2,7 @@ from abc import ABC, abstractmethod
from typing import Type, TYPE_CHECKING
# 从 action_handlers.py 导入具体的处理器类
from .action_handlers import ( # 调整导入路径
from .action_handlers import ( # 调整导入路径
ActionHandler,
DirectReplyHandler,
SendNewMessageHandler,
@ -17,10 +17,12 @@ from .action_handlers import ( # 调整导入路径
)
if TYPE_CHECKING:
from PFC.conversation import Conversation # 调整导入路径
from PFC.conversation import Conversation # 调整导入路径
class AbstractActionFactory(ABC):
"""抽象动作工厂接口。"""
@abstractmethod
def create_action_handler(self, action_type: str, conversation: "Conversation") -> ActionHandler:
"""
@ -35,8 +37,10 @@ class AbstractActionFactory(ABC):
"""
pass
class StandardActionFactory(AbstractActionFactory):
"""标准的动作工厂实现。"""
def create_action_handler(self, action_type: str, conversation: "Conversation") -> ActionHandler:
"""
根据动作类型创建并返回具体的动作处理器实例
@ -53,10 +57,10 @@ class StandardActionFactory(AbstractActionFactory):
"block_and_ignore": BlockAndIgnoreHandler,
"wait": WaitHandler,
}
handler_class = handler_map.get(action_type) # 获取对应的处理器类
handler_class = handler_map.get(action_type) # 获取对应的处理器类
# 如果找到对应的处理器类
if handler_class:
return handler_class(conversation) # 创建并返回处理器实例
return handler_class(conversation) # 创建并返回处理器实例
else:
# 如果未找到,返回处理未知动作的默认处理器
return UnknownActionHandler(conversation)

View File

@ -4,7 +4,7 @@ import asyncio
import traceback
import json
import random
from typing import Optional, Set, TYPE_CHECKING, List, Tuple, Dict # 确保导入 Dict
from typing import Optional, Set, TYPE_CHECKING, List, Tuple, Dict # 确保导入 Dict
from src.chat.emoji_system.emoji_manager import emoji_manager
from src.common.logger_manager import get_logger
@ -27,6 +27,7 @@ class ActionHandler(ABC):
处理动作的抽象基类
每个具体的动作处理器都应继承此类并实现 execute 方法
"""
def __init__(self, conversation: "Conversation"):
"""
初始化动作处理器
@ -44,7 +45,7 @@ class ActionHandler(ABC):
observation_info: Optional[ObservationInfo],
conversation_info: Optional[ConversationInfo],
action_start_time: float,
current_action_record: dict
current_action_record: dict,
) -> tuple[bool, str, str]:
"""
执行具体的动作逻辑
@ -89,15 +90,15 @@ class ActionHandler(ABC):
await self.conversation.direct_sender.send_message(
chat_stream=self.conversation.chat_stream,
segments=final_segments,
reply_to_message=None, # 私聊通常不引用回复
content=content_for_log # 用于发送器内部的日志记录
reply_to_message=None, # 私聊通常不引用回复
content=content_for_log, # 用于发送器内部的日志记录
)
# 注意: my_message_count 的增加现在由具体的发送逻辑(文本或表情)处理后决定
return True
except Exception as e:
self.logger.error(f"[私聊][{self.conversation.private_name}] 发送消息时失败: {str(e)}")
self.logger.error(f"[私聊][{self.conversation.private_name}] {traceback.format_exc()}")
self.conversation.state = ConversationState.ERROR # 发送失败则标记错误状态
self.conversation.state = ConversationState.ERROR # 发送失败则标记错误状态
return False
async def _update_bot_message_in_history(
@ -105,7 +106,7 @@ class ActionHandler(ABC):
send_time: float,
message_content: str,
observation_info: ObservationInfo,
message_id_prefix: str = "bot_sent_"
message_id_prefix: str = "bot_sent_",
):
"""
在机器人成功发送消息后将该消息添加到 ObservationInfo 的聊天历史中
@ -122,15 +123,17 @@ class ActionHandler(ABC):
# 构造机器人发送的消息字典
bot_message_dict: Dict[str, any] = {
"message_id": f"{message_id_prefix}{send_time:.3f}", # 使用更精确的时间戳
"message_id": f"{message_id_prefix}{send_time:.3f}", # 使用更精确的时间戳
"time": send_time,
"user_info": {
"user_id": self.conversation.bot_qq_str,
"user_nickname": global_config.BOT_NICKNAME,
"platform": self.conversation.chat_stream.platform if self.conversation.chat_stream else "unknown_platform",
"platform": self.conversation.chat_stream.platform
if self.conversation.chat_stream
else "unknown_platform",
},
"processed_plain_text": message_content, # 历史记录中的纯文本使用传入的 message_content
"detailed_plain_text": message_content, # 详细文本也使用相同内容
"processed_plain_text": message_content, # 历史记录中的纯文本使用传入的 message_content
"detailed_plain_text": message_content, # 详细文本也使用相同内容
}
observation_info.chat_history.append(bot_message_dict)
observation_info.chat_history_count = len(observation_info.chat_history)
@ -145,7 +148,7 @@ class ActionHandler(ABC):
observation_info.chat_history_count = len(observation_info.chat_history)
# 更新用于 Prompt 的历史记录字符串
history_slice_for_str = observation_info.chat_history[-30:] # 例如最近30条
history_slice_for_str = observation_info.chat_history[-30:] # 例如最近30条
try:
observation_info.chat_history_str = await build_readable_messages(
history_slice_for_str,
@ -162,8 +165,8 @@ class ActionHandler(ABC):
self,
observation_info: ObservationInfo,
conversation_info: ConversationInfo,
action_type: str, # 例如 "direct_reply", "send_memes"
event_description_for_emotion: str
action_type: str, # 例如 "direct_reply", "send_memes"
event_description_for_emotion: str,
):
"""
在成功发送一条或多条消息文本/表情处理通用的状态更新
@ -175,7 +178,7 @@ class ActionHandler(ABC):
action_type (str): 执行的动作类型用于决定追问逻辑
event_description_for_emotion (str): 用于情绪更新的事件描述
"""
current_event_time = time.time() # 获取当前时间作为事件发生时间
current_event_time = time.time() # 获取当前时间作为事件发生时间
# 更新 IdleChat 的最后消息时间
if self.conversation.idle_chat:
@ -184,7 +187,7 @@ class ActionHandler(ABC):
# 清理在本次交互完成(即此函数被调用时)之前的、来自他人的未处理消息
current_unprocessed_messages = getattr(observation_info, "unprocessed_messages", [])
message_ids_to_clear: Set[str] = set()
timestamp_before_current_interaction_completion = current_event_time - 0.001 # 确保是严格之前
timestamp_before_current_interaction_completion = current_event_time - 0.001 # 确保是严格之前
for msg in current_unprocessed_messages:
msg_time = msg.get("time")
@ -195,8 +198,8 @@ class ActionHandler(ABC):
if (
msg_id
and msg_time
and sender_id != self.conversation.bot_qq_str # 是对方的消息
and msg_time < timestamp_before_current_interaction_completion # 在本次交互完成前
and sender_id != self.conversation.bot_qq_str # 是对方的消息
and msg_time < timestamp_before_current_interaction_completion # 在本次交互完成前
):
message_ids_to_clear.add(msg_id)
@ -207,9 +210,7 @@ class ActionHandler(ABC):
await observation_info.clear_processed_messages(message_ids_to_clear)
# 更新追问状态 (last_successful_reply_action)
other_new_msg_count_during_planning = getattr(
conversation_info, "other_new_messages_during_planning_count", 0
)
other_new_msg_count_during_planning = getattr(conversation_info, "other_new_messages_during_planning_count", 0)
if action_type in ["direct_reply", "send_new_message", "send_memes"]:
if other_new_msg_count_during_planning > 0 and action_type == "direct_reply":
# 如果是直接回复,且规划期间有新消息,则下次不应追问
@ -222,10 +223,7 @@ class ActionHandler(ABC):
await self._update_relationship_and_emotion(observation_info, conversation_info, event_description_for_emotion)
async def _update_relationship_and_emotion(
self,
observation_info: ObservationInfo,
conversation_info: ConversationInfo,
event_description: str
self, observation_info: ObservationInfo, conversation_info: ConversationInfo, event_description: str
):
"""
辅助方法调用关系更新器和情绪更新器
@ -298,12 +296,13 @@ class BaseTextReplyHandler(ActionHandler):
处理基于文本的回复动作的基类包含生成-检查-重试的循环
适用于 DirectReplyHandler SendNewMessageHandler
"""
async def _generate_and_check_text_reply_loop(
self,
action_type: str, # "direct_reply" or "send_new_message"
action_type: str, # "direct_reply" or "send_new_message"
observation_info: ObservationInfo,
conversation_info: ConversationInfo,
max_attempts: int
max_attempts: int,
) -> Tuple[bool, Optional[str], str, bool, bool]:
"""
管理生成文本回复并检查其适用性的循环
@ -326,10 +325,10 @@ class BaseTextReplyHandler(ActionHandler):
如果 ReplyGenerator 决定发送则为 True否则为 False对于 direct_reply此值恒为 True
"""
reply_attempt_count = 0
is_suitable = False # 标记内容是否通过检查
generated_content_to_send: Optional[str] = None # 最终要发送的文本
final_check_reason = "未开始检查" # 最终检查原因
need_replan = False # 是否需要重新规划
is_suitable = False # 标记内容是否通过检查
generated_content_to_send: Optional[str] = None # 最终要发送的文本
final_check_reason = "未开始检查" # 最终检查原因
need_replan = False # 是否需要重新规划
# direct_reply 总是尝试发送send_new_message 的初始值取决于RG
should_send_reply_for_new_message = True if action_type == "direct_reply" else False
@ -338,7 +337,7 @@ class BaseTextReplyHandler(ActionHandler):
log_prefix = f"[私聊][{self.conversation.private_name}] 尝试生成/检查 '{action_type}' (第 {reply_attempt_count}/{max_attempts} 次)..."
self.logger.info(log_prefix)
self.conversation.state = ConversationState.GENERATING # 设置状态为生成中
self.conversation.state = ConversationState.GENERATING # 设置状态为生成中
if not self.conversation.reply_generator:
raise RuntimeError(f"ReplyGenerator 未为 {self.conversation.private_name} 初始化")
@ -347,56 +346,68 @@ class BaseTextReplyHandler(ActionHandler):
observation_info, conversation_info, action_type=action_type
)
self.logger.debug(f"{log_prefix} ReplyGenerator.generate 返回: '{raw_llm_output}'")
current_content_for_check = raw_llm_output # 当前待检查的内容
current_content_for_check = raw_llm_output # 当前待检查的内容
# 如果是 send_new_message 动作,需要解析 JSON 判断是否发送
if action_type == "send_new_message":
parsed_json = None
try:
parsed_json = json.loads(raw_llm_output)
except json.JSONDecodeError: # JSON 解析失败
except json.JSONDecodeError: # JSON 解析失败
self.logger.error(f"{log_prefix} ReplyGenerator 返回的不是有效的JSON: {raw_llm_output}")
conversation_info.last_reply_rejection_reason = "回复生成器未返回有效JSON"
conversation_info.last_rejected_reply_content = raw_llm_output
should_send_reply_for_new_message = False # 标记不发送
is_suitable = True # 决策已做出(不发送),所以认为是 "suitable" 以跳出循环
should_send_reply_for_new_message = False # 标记不发送
is_suitable = True # 决策已做出(不发送),所以认为是 "suitable" 以跳出循环
final_check_reason = "回复生成器JSON解析失败决定不发送"
generated_content_to_send = None # 明确不发送内容
break # 跳出重试循环
generated_content_to_send = None # 明确不发送内容
break # 跳出重试循环
if parsed_json: # JSON 解析成功
if parsed_json: # JSON 解析成功
send_decision = parsed_json.get("send", "no").lower()
generated_text_from_json = parsed_json.get("txt", "") # 如果不发送txt可能是"no"
generated_text_from_json = parsed_json.get("txt", "") # 如果不发送txt可能是"no"
if send_decision == "yes": # ReplyGenerator 决定发送
if send_decision == "yes": # ReplyGenerator 决定发送
should_send_reply_for_new_message = True
current_content_for_check = generated_text_from_json
self.logger.info(f"{log_prefix} ReplyGenerator 决定发送消息。内容初步为: '{current_content_for_check[:100]}...'")
else: # ReplyGenerator 决定不发送
self.logger.info(
f"{log_prefix} ReplyGenerator 决定发送消息。内容初步为: '{current_content_for_check[:100]}...'"
)
else: # ReplyGenerator 决定不发送
should_send_reply_for_new_message = False
is_suitable = True # 决策已做出(不发送)
is_suitable = True # 决策已做出(不发送)
final_check_reason = "回复生成器决定不发送"
generated_content_to_send = None
self.logger.info(f"{log_prefix} ReplyGenerator 决定不发送消息。")
break # 跳出重试循环
break # 跳出重试循环
# 检查生成的内容是否有效(适用于 direct_reply 或 send_new_message 且决定发送的情况)
if not current_content_for_check or \
current_content_for_check.startswith("抱歉") or \
current_content_for_check.strip() == "" or \
(action_type == "send_new_message" and current_content_for_check == "no" and should_send_reply_for_new_message):
if (
not current_content_for_check
or current_content_for_check.startswith("抱歉")
or current_content_for_check.strip() == ""
or (
action_type == "send_new_message"
and current_content_for_check == "no"
and should_send_reply_for_new_message
)
):
warning_msg = f"{log_prefix} 生成内容无效或为错误提示"
if action_type == "send_new_message" and current_content_for_check == "no" and should_send_reply_for_new_message:
warning_msg += " (ReplyGenerator决定发送但文本为'no')"
if (
action_type == "send_new_message"
and current_content_for_check == "no"
and should_send_reply_for_new_message
):
warning_msg += " (ReplyGenerator决定发送但文本为'no')"
self.logger.warning(warning_msg + ",将进行下一次尝试 (如果适用)。")
final_check_reason = "生成内容无效" # 更新检查原因
final_check_reason = "生成内容无效" # 更新检查原因
conversation_info.last_reply_rejection_reason = final_check_reason
conversation_info.last_rejected_reply_content = current_content_for_check
await asyncio.sleep(0.5) # 暂停后重试
continue # 进入下一次循环
await asyncio.sleep(0.5) # 暂停后重试
continue # 进入下一次循环
# --- 内容检查 ---
self.conversation.state = ConversationState.CHECKING # 设置状态为检查中
self.conversation.state = ConversationState.CHECKING # 设置状态为检查中
if not self.conversation.reply_checker:
raise RuntimeError(f"ReplyChecker 未为 {self.conversation.private_name} 初始化")
@ -414,51 +425,60 @@ class BaseTextReplyHandler(ActionHandler):
if global_config.enable_pfc_reply_checker:
self.logger.debug(f"{log_prefix} 调用 ReplyChecker 检查 (配置已启用)...")
is_suitable_check, reason_check, need_replan_check = await self.conversation.reply_checker.check(
reply=current_content_for_check, goal=current_goal_str,
chat_history=chat_history_for_check, chat_history_text=chat_history_text_for_check,
current_time_str=current_time_value_for_check, retry_count=(reply_attempt_count - 1)
reply=current_content_for_check,
goal=current_goal_str,
chat_history=chat_history_for_check,
chat_history_text=chat_history_text_for_check,
current_time_str=current_time_value_for_check,
retry_count=(reply_attempt_count - 1),
)
self.logger.info(
f"{log_prefix} ReplyChecker 结果: 合适={is_suitable_check}, 原因='{reason_check}', 需重规划={need_replan_check}"
)
else: # ReplyChecker 未启用
else: # ReplyChecker 未启用
is_suitable_check, reason_check, need_replan_check = True, "ReplyChecker 已通过配置关闭", False
self.logger.debug(f"{log_prefix} [配置关闭] ReplyChecker 已跳过,默认回复为合适。")
is_suitable = is_suitable_check # 更新内容是否合适
final_check_reason = reason_check # 更新检查原因
need_replan = need_replan_check # 更新是否需要重规划
is_suitable = is_suitable_check # 更新内容是否合适
final_check_reason = reason_check # 更新检查原因
need_replan = need_replan_check # 更新是否需要重规划
if not is_suitable: # 如果内容不合适
if not is_suitable: # 如果内容不合适
conversation_info.last_reply_rejection_reason = final_check_reason
conversation_info.last_rejected_reply_content = current_content_for_check
if final_check_reason == "机器人尝试发送重复消息" and not need_replan:
self.logger.warning(f"{log_prefix} 回复因自身重复被拒绝。将重试。")
elif not need_replan and reply_attempt_count < max_attempts: # 如果不需要重规划且还有尝试次数
elif not need_replan and reply_attempt_count < max_attempts: # 如果不需要重规划且还有尝试次数
self.logger.warning(f"{log_prefix} 回复不合适: {final_check_reason}。将重试。")
else: # 需要重规划或已达到最大尝试次数
else: # 需要重规划或已达到最大尝试次数
self.logger.warning(f"{log_prefix} 回复不合适且(需要重规划或已达最大次数): {final_check_reason}")
break # 结束循环
await asyncio.sleep(0.5) # 重试前暂停
else: # 内容合适
generated_content_to_send = current_content_for_check # 设置最终要发送的内容
conversation_info.last_reply_rejection_reason = None # 清除上次拒绝原因
conversation_info.last_rejected_reply_content = None # 清除上次拒绝内容
break # 成功,跳出循环
break # 结束循环
await asyncio.sleep(0.5) # 重试前暂停
else: # 内容合适
generated_content_to_send = current_content_for_check # 设置最终要发送的内容
conversation_info.last_reply_rejection_reason = None # 清除上次拒绝原因
conversation_info.last_rejected_reply_content = None # 清除上次拒绝内容
break # 成功,跳出循环
# 确保 send_new_message 在 RG 决定不发送时is_suitable 为 Truegenerated_content_to_send 为 None
if action_type == "send_new_message" and not should_send_reply_for_new_message:
is_suitable = True # 决策已完成(不发送)
generated_content_to_send = None # 确认不发送任何内容
is_suitable = True # 决策已完成(不发送)
generated_content_to_send = None # 确认不发送任何内容
return is_suitable, generated_content_to_send, final_check_reason, need_replan, should_send_reply_for_new_message
return (
is_suitable,
generated_content_to_send,
final_check_reason,
need_replan,
should_send_reply_for_new_message,
)
async def _process_and_send_reply_with_optional_emoji(
self,
action_type: str, # "direct_reply" or "send_new_message"
action_type: str, # "direct_reply" or "send_new_message"
observation_info: ObservationInfo,
conversation_info: ConversationInfo,
max_reply_attempts: int
max_reply_attempts: int,
) -> Tuple[bool, bool, List[str], Optional[str], bool, str, bool]:
"""
核心共享方法处理文本生成/检查获取表情并按顺序发送
@ -485,13 +505,18 @@ class BaseTextReplyHandler(ActionHandler):
full_emoji_description_if_sent: Optional[str] = None
# 1. 处理文本部分
is_suitable_text, generated_text_content, text_check_reason, need_replan_text, rg_decided_to_send_text = \
await self._generate_and_check_text_reply_loop(
action_type=action_type,
observation_info=observation_info,
conversation_info=conversation_info,
max_attempts=max_reply_attempts
)
(
is_suitable_text,
generated_text_content,
text_check_reason,
need_replan_text,
rg_decided_to_send_text,
) = await self._generate_and_check_text_reply_loop(
action_type=action_type,
observation_info=observation_info,
conversation_info=conversation_info,
max_attempts=max_reply_attempts,
)
text_to_send: Optional[str] = None
# 对于 send_new_message只有当 RG 决定发送且内容合适时才有文本
@ -503,36 +528,36 @@ class BaseTextReplyHandler(ActionHandler):
if is_suitable_text and generated_text_content:
text_to_send = generated_text_content
rg_decided_not_to_send_text = (action_type == "send_new_message" and not rg_decided_to_send_text)
rg_decided_not_to_send_text = action_type == "send_new_message" and not rg_decided_to_send_text
# 2. 处理表情部分
emoji_prepared_info: Optional[Tuple[Seg, str, str]] = None # (segment, full_description, log_description)
emoji_prepared_info: Optional[Tuple[Seg, str, str]] = None # (segment, full_description, log_description)
emoji_query = conversation_info.current_emoji_query
if emoji_query:
emoji_prepared_info = await self._fetch_and_prepare_emoji_segment(emoji_query)
# 清理查询,无论是否成功获取,避免重复使用
conversation_info.current_emoji_query = None # 重要:在这里清理
conversation_info.current_emoji_query = None # 重要:在这里清理
# 3. 决定发送顺序并发送
send_order: List[str] = []
if text_to_send and emoji_prepared_info: # 文本和表情都有
if text_to_send and emoji_prepared_info: # 文本和表情都有
send_order = ["text", "emoji"] if random.random() < 0.5 else ["emoji", "text"]
elif text_to_send: # 只有文本
elif text_to_send: # 只有文本
send_order = ["text"]
elif emoji_prepared_info: # 只有表情 (可能是 direct_reply 带表情,或 send_new_message 时 RG 不发文本但有表情)
elif emoji_prepared_info: # 只有表情 (可能是 direct_reply 带表情,或 send_new_message 时 RG 不发文本但有表情)
send_order = ["emoji"]
for item_type in send_order:
current_send_time = time.time() # 每次发送前获取精确时间
current_send_time = time.time() # 每次发送前获取精确时间
if item_type == "text" and text_to_send:
self.conversation.generated_reply = text_to_send # 用于日志和历史记录
self.conversation.generated_reply = text_to_send # 用于日志和历史记录
text_segment = Seg(type="text", data=text_to_send)
if await self._send_reply_or_segments([text_segment], text_to_send):
sent_text_successfully = True
await self._update_bot_message_in_history(current_send_time, text_to_send, observation_info)
if self.conversation.conversation_info:
self.conversation.conversation_info.current_instance_message_count +=1
self.conversation.conversation_info.my_message_count += 1 # 文本发送成功,增加计数
self.conversation.conversation_info.current_instance_message_count += 1
self.conversation.conversation_info.my_message_count += 1 # 文本发送成功,增加计数
final_reason_parts.append(f"成功发送文本 ('{text_to_send[:20]}...')")
else:
final_reason_parts.append("发送文本失败")
@ -543,15 +568,17 @@ class BaseTextReplyHandler(ActionHandler):
if await self._send_reply_or_segments([emoji_segment], log_emoji_desc):
sent_emoji_successfully = True
full_emoji_description_if_sent = full_emoji_desc
await self._update_bot_message_in_history(current_send_time, full_emoji_desc, observation_info, "bot_emoji_")
await self._update_bot_message_in_history(
current_send_time, full_emoji_desc, observation_info, "bot_emoji_"
)
if self.conversation.conversation_info:
self.conversation.conversation_info.current_instance_message_count +=1
self.conversation.conversation_info.my_message_count += 1 # 表情发送成功,增加计数
self.conversation.conversation_info.current_instance_message_count += 1
self.conversation.conversation_info.my_message_count += 1 # 表情发送成功,增加计数
final_reason_parts.append(f"成功发送表情 ({full_emoji_desc})")
else:
final_reason_parts.append("发送表情失败")
# 如果表情发送失败,但文本已成功,也应记录
if not text_to_send : # 如果只有表情且表情失败
if not text_to_send: # 如果只有表情且表情失败
break
return (
@ -560,32 +587,35 @@ class BaseTextReplyHandler(ActionHandler):
final_reason_parts,
full_emoji_description_if_sent,
need_replan_text,
text_check_reason if not is_suitable_text else "文本检查通过或未执行", # 返回文本检查失败的原因
rg_decided_not_to_send_text
text_check_reason if not is_suitable_text else "文本检查通过或未执行", # 返回文本检查失败的原因
rg_decided_not_to_send_text,
)
class DirectReplyHandler(BaseTextReplyHandler):
"""处理直接回复动作direct_reply的处理器。"""
async def execute(
self,
reason: str,
observation_info: Optional[ObservationInfo],
conversation_info: Optional[ConversationInfo],
action_start_time: float,
current_action_record: dict
current_action_record: dict,
) -> tuple[bool, str, str]:
"""
执行直接回复动作
会尝试生成文本回复并根据 current_emoji_query 发送附带表情
"""
if not observation_info or not conversation_info:
self.logger.error(f"[私聊][{self.conversation.private_name}] DirectReplyHandler: ObservationInfo 或 ConversationInfo 为空。")
self.logger.error(
f"[私聊][{self.conversation.private_name}] DirectReplyHandler: ObservationInfo 或 ConversationInfo 为空。"
)
return False, "error", "内部信息缺失,无法执行直接回复"
action_successful = False # 整体动作是否成功
final_status = "recall" # 默认最终状态
final_reason = "直接回复动作未成功执行" # 默认最终原因
action_successful = False # 整体动作是否成功
final_status = "recall" # 默认最终状态
final_reason = "直接回复动作未成功执行" # 默认最终原因
max_reply_attempts: int = getattr(global_config, "pfc_max_reply_attempts", 3)
(
@ -595,12 +625,12 @@ class DirectReplyHandler(BaseTextReplyHandler):
full_emoji_desc,
need_replan_from_text_check,
text_check_failure_reason,
_ # rg_decided_not_to_send_text, direct_reply 不关心这个
_, # rg_decided_not_to_send_text, direct_reply 不关心这个
) = await self._process_and_send_reply_with_optional_emoji(
action_type="direct_reply",
observation_info=observation_info,
conversation_info=conversation_info,
max_reply_attempts=max_reply_attempts
max_reply_attempts=max_reply_attempts,
)
# 根据发送结果决定最终状态
@ -618,15 +648,17 @@ class DirectReplyHandler(BaseTextReplyHandler):
event_desc = " ".join(event_desc_parts) if event_desc_parts else "机器人发送了消息"
await self._update_post_send_states(observation_info, conversation_info, "direct_reply", event_desc)
elif need_replan_from_text_check: # 文本检查要求重规划
elif need_replan_from_text_check: # 文本检查要求重规划
final_status = "recall"
final_reason = f"文本回复检查要求重新规划: {text_check_failure_reason}"
conversation_info.last_successful_reply_action = None # 重置追问状态
else: # 文本和表情都未能发送,或者文本检查失败且不需重规划(已达最大尝试)
conversation_info.last_successful_reply_action = None # 重置追问状态
else: # 文本和表情都未能发送,或者文本检查失败且不需重规划(已达最大尝试)
final_status = "max_checker_attempts_failed" if not need_replan_from_text_check else "recall"
final_reason = f"直接回复失败。文本检查: {text_check_failure_reason}. " + ("; ".join(reason_parts) if reason_parts else "")
final_reason = f"直接回复失败。文本检查: {text_check_failure_reason}. " + (
"; ".join(reason_parts) if reason_parts else ""
)
action_successful = False
conversation_info.last_successful_reply_action = None # 重置追问状态
conversation_info.last_successful_reply_action = None # 重置追问状态
# 清理 my_message_count (如果动作整体不成功,但部分发送了,需要调整)
if not action_successful and conversation_info:
@ -642,13 +674,14 @@ class DirectReplyHandler(BaseTextReplyHandler):
class SendNewMessageHandler(BaseTextReplyHandler):
"""处理发送新消息动作send_new_message的处理器。"""
async def execute(
self,
reason: str,
observation_info: Optional[ObservationInfo],
conversation_info: Optional[ConversationInfo],
action_start_time: float,
current_action_record: dict
current_action_record: dict,
) -> tuple[bool, str, str]:
"""
执行发送新消息动作
@ -656,12 +689,14 @@ class SendNewMessageHandler(BaseTextReplyHandler):
同时也可能根据 current_emoji_query 发送附带表情
"""
if not observation_info or not conversation_info:
self.logger.error(f"[私聊][{self.conversation.private_name}] SendNewMessageHandler: ObservationInfo 或 ConversationInfo 为空。")
self.logger.error(
f"[私聊][{self.conversation.private_name}] SendNewMessageHandler: ObservationInfo 或 ConversationInfo 为空。"
)
return False, "error", "内部信息缺失,无法执行发送新消息"
action_successful = False # 整体动作是否成功
final_status = "recall" # 默认最终状态
final_reason = "发送新消息动作未成功执行" # 默认最终原因
action_successful = False # 整体动作是否成功
final_status = "recall" # 默认最终状态
final_reason = "发送新消息动作未成功执行" # 默认最终原因
max_reply_attempts: int = getattr(global_config, "pfc_max_reply_attempts", 3)
(
@ -671,31 +706,35 @@ class SendNewMessageHandler(BaseTextReplyHandler):
full_emoji_desc,
need_replan_from_text_check,
text_check_failure_reason,
rg_decided_not_to_send_text # 重要获取RG是否决定不发文本
rg_decided_not_to_send_text, # 重要获取RG是否决定不发文本
) = await self._process_and_send_reply_with_optional_emoji(
action_type="send_new_message",
observation_info=observation_info,
conversation_info=conversation_info,
max_reply_attempts=max_reply_attempts
max_reply_attempts=max_reply_attempts,
)
# 根据发送结果和RG的决策决定最终状态
if rg_decided_not_to_send_text: # ReplyGenerator 明确决定不发送文本
if sent_emoji_successfully: # 但表情成功发送了
if rg_decided_not_to_send_text: # ReplyGenerator 明确决定不发送文本
if sent_emoji_successfully: # 但表情成功发送了
action_successful = True
final_status = "done" # 整体算完成,因为有内容发出
final_status = "done" # 整体算完成,因为有内容发出
final_reason = f"回复生成器决定不发送文本,但成功发送了附带表情 ({full_emoji_desc or '未知表情'})"
# 即使只发了表情也算一次交互可以更新post_send_states
event_desc = f"你发送了表情: '{full_emoji_desc or '未知表情'}' (文本未发送)"
await self._update_post_send_states(observation_info, conversation_info, "send_new_message", event_desc)
else: # RG不发文本表情也没发出去或失败
action_successful = True # 决策本身是成功的(决定不发)
final_status = "done_no_reply" # 标记为完成但无回复
final_reason = text_check_failure_reason if text_check_failure_reason and text_check_failure_reason != "文本检查通过或未执行" else "回复生成器决定不发送消息,且无表情或表情发送失败"
conversation_info.last_successful_reply_action = None # 因为没有文本发出
if self.conversation.conversation_info: # 确保 my_message_count 被重置
self.conversation.conversation_info.my_message_count = 0
elif sent_text_successfully or sent_emoji_successfully: # RG决定发文本或未明确反对且至少有一个发出去了
else: # RG不发文本表情也没发出去或失败
action_successful = True # 决策本身是成功的(决定不发)
final_status = "done_no_reply" # 标记为完成但无回复
final_reason = (
text_check_failure_reason
if text_check_failure_reason and text_check_failure_reason != "文本检查通过或未执行"
else "回复生成器决定不发送消息,且无表情或表情发送失败"
)
conversation_info.last_successful_reply_action = None # 因为没有文本发出
if self.conversation.conversation_info: # 确保 my_message_count 被重置
self.conversation.conversation_info.my_message_count = 0
elif sent_text_successfully or sent_emoji_successfully: # RG决定发文本或未明确反对且至少有一个发出去了
action_successful = True
final_status = "done"
final_reason = "; ".join(reason_parts) if reason_parts else "成功完成操作"
@ -708,13 +747,15 @@ class SendNewMessageHandler(BaseTextReplyHandler):
event_desc = " ".join(event_desc_parts) if event_desc_parts else "机器人发送了消息"
await self._update_post_send_states(observation_info, conversation_info, "send_new_message", event_desc)
elif need_replan_from_text_check: # 文本检查要求重规划
elif need_replan_from_text_check: # 文本检查要求重规划
final_status = "recall"
final_reason = f"文本回复检查要求重新规划: {text_check_failure_reason}"
conversation_info.last_successful_reply_action = None
else: # 文本和表情都未能发送且RG没有明确说不发文本或者文本检查失败且不需重规划
else: # 文本和表情都未能发送且RG没有明确说不发文本或者文本检查失败且不需重规划
final_status = "max_checker_attempts_failed" if not need_replan_from_text_check else "recall"
final_reason = f"发送新消息失败。文本检查: {text_check_failure_reason}. " + ("; ".join(reason_parts) if reason_parts else "")
final_reason = f"发送新消息失败。文本检查: {text_check_failure_reason}. " + (
"; ".join(reason_parts) if reason_parts else ""
)
action_successful = False
conversation_info.last_successful_reply_action = None
@ -727,27 +768,30 @@ class SendNewMessageHandler(BaseTextReplyHandler):
class SayGoodbyeHandler(ActionHandler):
"""处理发送告别语动作say_goodbye的处理器。"""
async def execute(
self,
reason: str,
observation_info: Optional[ObservationInfo],
conversation_info: Optional[ConversationInfo],
action_start_time: float,
current_action_record: dict
current_action_record: dict,
) -> tuple[bool, str, str]:
"""
执行发送告别语的动作
会生成告别文本并发送然后标记对话结束
"""
if not observation_info or not conversation_info:
self.logger.error(f"[私聊][{self.conversation.private_name}] SayGoodbyeHandler: ObservationInfo 或 ConversationInfo 为空。")
self.logger.error(
f"[私聊][{self.conversation.private_name}] SayGoodbyeHandler: ObservationInfo 或 ConversationInfo 为空。"
)
return False, "error", "内部信息缺失,无法执行告别"
action_successful = False
final_status = "recall" # 默认状态
final_reason = "告别语动作未成功执行" # 默认原因
final_status = "recall" # 默认状态
final_reason = "告别语动作未成功执行" # 默认原因
self.conversation.state = ConversationState.GENERATING # 设置状态为生成中
self.conversation.state = ConversationState.GENERATING # 设置状态为生成中
if not self.conversation.reply_generator:
raise RuntimeError(f"ReplyGenerator 未为 {self.conversation.private_name} 初始化")
@ -759,68 +803,73 @@ class SayGoodbyeHandler(ActionHandler):
f"[私聊][{self.conversation.private_name}] 动作 'say_goodbye': 生成内容: '{generated_content[:100]}...'"
)
if not generated_content or generated_content.startswith("抱歉"): # 如果生成内容无效
if not generated_content or generated_content.startswith("抱歉"): # 如果生成内容无效
self.logger.warning(
f"[私聊][{self.conversation.private_name}] 动作 'say_goodbye': 生成内容为空或为错误提示,取消发送。"
)
final_reason = "生成告别内容无效"
final_status = "done" # 即使不发送,结束对话的决策也算完成
self.conversation.should_continue = False # 标记对话结束
action_successful = True # 动作(决策结束)本身算成功
else: # 如果生成内容有效
final_status = "done" # 即使不发送,结束对话的决策也算完成
self.conversation.should_continue = False # 标记对话结束
action_successful = True # 动作(决策结束)本身算成功
else: # 如果生成内容有效
self.conversation.generated_reply = generated_content
self.conversation.state = ConversationState.SENDING # 设置状态为发送中
self.conversation.state = ConversationState.SENDING # 设置状态为发送中
text_segment = Seg(type="text", data=self.conversation.generated_reply)
send_success = await self._send_reply_or_segments([text_segment], self.conversation.generated_reply)
send_end_time = time.time()
if send_success: # 如果发送成功
if send_success: # 如果发送成功
action_successful = True
final_status = "done"
final_reason = "成功发送告别语"
self.conversation.should_continue = False # 标记对话结束
self.conversation.should_continue = False # 标记对话结束
if self.conversation.conversation_info:
self.conversation.conversation_info.current_instance_message_count +=1
self.conversation.conversation_info.my_message_count += 1 # 告别语也算一次发言
await self._update_bot_message_in_history(send_end_time, self.conversation.generated_reply, observation_info)
self.conversation.conversation_info.current_instance_message_count += 1
self.conversation.conversation_info.my_message_count += 1 # 告别语也算一次发言
await self._update_bot_message_in_history(
send_end_time, self.conversation.generated_reply, observation_info
)
event_desc = f"你发送了告别消息: '{self.conversation.generated_reply[:50]}...'"
# 注意:由于 should_continue 已设为 False后续的 idle chat 更新可能意义不大,但情绪更新仍可进行
await self._update_post_send_states(observation_info, conversation_info, "say_goodbye", event_desc)
else: # 如果发送失败
else: # 如果发送失败
final_status = "recall"
final_reason = "发送告别语失败"
action_successful = False
self.conversation.should_continue = True # 发送失败则不立即结束对话,让其自然流转
self.conversation.should_continue = True # 发送失败则不立即结束对话,让其自然流转
return action_successful, final_status, final_reason
class SendMemesHandler(ActionHandler):
"""处理发送表情包动作send_memes的处理器。"""
async def execute(
self,
reason: str,
observation_info: Optional[ObservationInfo],
conversation_info: Optional[ConversationInfo],
action_start_time: float,
current_action_record: dict
current_action_record: dict,
) -> tuple[bool, str, str]:
"""
执行发送表情包的动作
会根据 current_emoji_query 获取并发送表情
"""
if not observation_info or not conversation_info:
self.logger.error(f"[私聊][{self.conversation.private_name}] SendMemesHandler: ObservationInfo 或 ConversationInfo 为空。")
self.logger.error(
f"[私聊][{self.conversation.private_name}] SendMemesHandler: ObservationInfo 或 ConversationInfo 为空。"
)
return False, "error", "内部信息缺失,无法发送表情包"
action_successful = False
final_status = "recall" # 默认状态
final_status = "recall" # 默认状态
final_reason_prefix = "发送表情包"
final_reason = f"{final_reason_prefix}失败:未知原因" # 默认原因
self.conversation.state = ConversationState.GENERATING # 或 SENDING_MEME
final_reason = f"{final_reason_prefix}失败:未知原因" # 默认原因
self.conversation.state = ConversationState.GENERATING # 或 SENDING_MEME
emoji_query = conversation_info.current_emoji_query
if not emoji_query: # 如果没有表情查询
if not emoji_query: # 如果没有表情查询
final_reason = f"{final_reason_prefix}失败:缺少表情包查询语句"
# 此动作不依赖文本回复的追问状态,所以不修改 last_successful_reply_action
return False, "recall", final_reason
@ -830,25 +879,27 @@ class SendMemesHandler(ActionHandler):
emoji_prepared_info = await self._fetch_and_prepare_emoji_segment(emoji_query)
if emoji_prepared_info: # 如果成功获取并准备了表情
if emoji_prepared_info: # 如果成功获取并准备了表情
emoji_segment, full_emoji_description, log_emoji_description = emoji_prepared_info
send_success = await self._send_reply_or_segments([emoji_segment], log_emoji_description)
send_end_time = time.time()
if send_success: # 如果发送成功
if send_success: # 如果发送成功
action_successful = True
final_status = "done"
final_reason = f"{final_reason_prefix}成功发送 ({full_emoji_description})"
if self.conversation.conversation_info:
self.conversation.conversation_info.current_instance_message_count +=1
self.conversation.conversation_info.my_message_count += 1 # 表情也算一次发言
await self._update_bot_message_in_history(send_end_time, full_emoji_description, observation_info, "bot_meme_")
self.conversation.conversation_info.current_instance_message_count += 1
self.conversation.conversation_info.my_message_count += 1 # 表情也算一次发言
await self._update_bot_message_in_history(
send_end_time, full_emoji_description, observation_info, "bot_meme_"
)
event_desc = f"你发送了一个表情包 ({full_emoji_description})"
await self._update_post_send_states(observation_info, conversation_info, "send_memes", event_desc)
else: # 如果发送失败
else: # 如果发送失败
final_status = "recall"
final_reason = f"{final_reason_prefix}失败:发送时出错"
else: # 如果未能获取或准备表情
else: # 如果未能获取或准备表情
final_reason = f"{final_reason_prefix}失败:未找到或准备表情失败 ({emoji_query})"
# last_successful_reply_action 保持不变
@ -857,81 +908,143 @@ class SendMemesHandler(ActionHandler):
class RethinkGoalHandler(ActionHandler):
"""处理重新思考目标动作rethink_goal的处理器。"""
async def execute(self, reason: str, observation_info: Optional[ObservationInfo], conversation_info: Optional[ConversationInfo], action_start_time: float, current_action_record: dict) -> tuple[bool, str, str]:
async def execute(
self,
reason: str,
observation_info: Optional[ObservationInfo],
conversation_info: Optional[ConversationInfo],
action_start_time: float,
current_action_record: dict,
) -> tuple[bool, str, str]:
"""执行重新思考对话目标的动作。"""
if not conversation_info or not observation_info:
self.logger.error(f"[私聊][{self.conversation.private_name}] RethinkGoalHandler: ObservationInfo 或 ConversationInfo 为空。")
self.logger.error(
f"[私聊][{self.conversation.private_name}] RethinkGoalHandler: ObservationInfo 或 ConversationInfo 为空。"
)
return False, "error", "内部信息缺失,无法重新思考目标"
self.conversation.state = ConversationState.RETHINKING # 设置状态为重新思考中
self.conversation.state = ConversationState.RETHINKING # 设置状态为重新思考中
if not self.conversation.goal_analyzer:
raise RuntimeError(f"GoalAnalyzer 未为 {self.conversation.private_name} 初始化")
await self.conversation.goal_analyzer.analyze_goal(conversation_info, observation_info) # 调用目标分析器
await self.conversation.goal_analyzer.analyze_goal(conversation_info, observation_info) # 调用目标分析器
event_desc = "你重新思考了对话目标和方向"
await self._update_relationship_and_emotion(observation_info, conversation_info, event_desc) # 更新关系和情绪
await self._update_relationship_and_emotion(observation_info, conversation_info, event_desc) # 更新关系和情绪
return True, "done", "成功重新思考目标"
class ListeningHandler(ActionHandler):
"""处理倾听动作listening的处理器。"""
async def execute(self, reason: str, observation_info: Optional[ObservationInfo], conversation_info: Optional[ConversationInfo], action_start_time: float, current_action_record: dict) -> tuple[bool, str, str]:
async def execute(
self,
reason: str,
observation_info: Optional[ObservationInfo],
conversation_info: Optional[ConversationInfo],
action_start_time: float,
current_action_record: dict,
) -> tuple[bool, str, str]:
"""执行倾听对方发言的动作。"""
if not conversation_info or not observation_info:
self.logger.error(f"[私聊][{self.conversation.private_name}] ListeningHandler: ObservationInfo 或 ConversationInfo 为空。")
self.logger.error(
f"[私聊][{self.conversation.private_name}] ListeningHandler: ObservationInfo 或 ConversationInfo 为空。"
)
return False, "error", "内部信息缺失,无法执行倾听"
self.conversation.state = ConversationState.LISTENING # 设置状态为倾听中
self.conversation.state = ConversationState.LISTENING # 设置状态为倾听中
if not self.conversation.waiter:
raise RuntimeError(f"Waiter 未为 {self.conversation.private_name} 初始化")
await self.conversation.waiter.wait_listening(conversation_info) # 调用等待器的倾听方法
await self.conversation.waiter.wait_listening(conversation_info) # 调用等待器的倾听方法
event_desc = "你决定耐心倾听对方的发言"
await self._update_relationship_and_emotion(observation_info, conversation_info, event_desc) # 更新关系和情绪
await self._update_relationship_and_emotion(observation_info, conversation_info, event_desc) # 更新关系和情绪
# listening 动作完成后,状态会由新消息或超时驱动,最终回到 ANALYZING
return True, "done", "进入倾听状态"
class EndConversationHandler(ActionHandler):
"""处理结束对话动作end_conversation的处理器。"""
async def execute(self, reason: str, observation_info: Optional[ObservationInfo], conversation_info: Optional[ConversationInfo], action_start_time: float, current_action_record: dict) -> tuple[bool, str, str]:
async def execute(
self,
reason: str,
observation_info: Optional[ObservationInfo],
conversation_info: Optional[ConversationInfo],
action_start_time: float,
current_action_record: dict,
) -> tuple[bool, str, str]:
"""执行结束当前对话的动作。"""
self.logger.info(f"[私聊][{self.conversation.private_name}] 动作 'end_conversation': 收到最终结束指令,停止对话...")
self.conversation.should_continue = False # 标记对话不应继续,主循环会因此退出
self.logger.info(
f"[私聊][{self.conversation.private_name}] 动作 'end_conversation': 收到最终结束指令,停止对话..."
)
self.conversation.should_continue = False # 标记对话不应继续,主循环会因此退出
# 注意:最终的关系评估通常在 Conversation.stop() 方法中进行
return True, "done", "对话结束指令已执行"
class BlockAndIgnoreHandler(ActionHandler):
"""处理屏蔽并忽略对话动作block_and_ignore的处理器。"""
async def execute(self, reason: str, observation_info: Optional[ObservationInfo], conversation_info: Optional[ConversationInfo], action_start_time: float, current_action_record: dict) -> tuple[bool, str, str]:
async def execute(
self,
reason: str,
observation_info: Optional[ObservationInfo],
conversation_info: Optional[ConversationInfo],
action_start_time: float,
current_action_record: dict,
) -> tuple[bool, str, str]:
"""执行屏蔽并忽略当前对话一段时间的动作。"""
if not conversation_info or not observation_info: # 防御性检查
self.logger.error(f"[私聊][{self.conversation.private_name}] BlockAndIgnoreHandler: ObservationInfo 或 ConversationInfo 为空。")
if not conversation_info or not observation_info: # 防御性检查
self.logger.error(
f"[私聊][{self.conversation.private_name}] BlockAndIgnoreHandler: ObservationInfo 或 ConversationInfo 为空。"
)
return False, "error", "内部信息缺失,无法执行屏蔽"
self.logger.info(f"[私聊][{self.conversation.private_name}] 动作 'block_and_ignore': 不想再理你了...")
ignore_duration_seconds = 10 * 60 # 例如忽略10分钟可以配置
self.conversation.ignore_until_timestamp = time.time() + ignore_duration_seconds # 设置忽略截止时间
self.conversation.state = ConversationState.IGNORED # 设置状态为已忽略
ignore_duration_seconds = 10 * 60 # 例如忽略10分钟可以配置
self.conversation.ignore_until_timestamp = time.time() + ignore_duration_seconds # 设置忽略截止时间
self.conversation.state = ConversationState.IGNORED # 设置状态为已忽略
event_desc = "当前对话让你感到不适,你决定暂时不再理会对方"
await self._update_relationship_and_emotion(observation_info, conversation_info, event_desc) # 更新关系和情绪
await self._update_relationship_and_emotion(observation_info, conversation_info, event_desc) # 更新关系和情绪
# should_continue 仍为 True但主循环会检查 ignore_until_timestamp
return True, "done", f"已屏蔽并忽略对话 {ignore_duration_seconds // 60} 分钟"
class WaitHandler(ActionHandler):
"""处理等待动作wait的处理器。"""
async def execute(self, reason: str, observation_info: Optional[ObservationInfo], conversation_info: Optional[ConversationInfo], action_start_time: float, current_action_record: dict) -> tuple[bool, str, str]:
async def execute(
self,
reason: str,
observation_info: Optional[ObservationInfo],
conversation_info: Optional[ConversationInfo],
action_start_time: float,
current_action_record: dict,
) -> tuple[bool, str, str]:
"""执行等待对方回复的动作。"""
if not conversation_info or not observation_info: # 防御性检查
self.logger.error(f"[私聊][{self.conversation.private_name}] WaitHandler: ObservationInfo 或 ConversationInfo 为空。")
if not conversation_info or not observation_info: # 防御性检查
self.logger.error(
f"[私聊][{self.conversation.private_name}] WaitHandler: ObservationInfo 或 ConversationInfo 为空。"
)
return False, "error", "内部信息缺失,无法执行等待"
self.conversation.state = ConversationState.WAITING # 设置状态为等待中
self.conversation.state = ConversationState.WAITING # 设置状态为等待中
if not self.conversation.waiter:
raise RuntimeError(f"Waiter 未为 {self.conversation.private_name} 初始化")
timeout_occurred = await self.conversation.waiter.wait(conversation_info) # 调用等待器的常规等待方法
timeout_occurred = await self.conversation.waiter.wait(conversation_info) # 调用等待器的常规等待方法
event_desc = "你等待对方回复,但对方长时间没有回应" if timeout_occurred else "你选择等待对方的回复"
await self._update_relationship_and_emotion(observation_info, conversation_info, event_desc) # 更新关系和情绪
await self._update_relationship_and_emotion(observation_info, conversation_info, event_desc) # 更新关系和情绪
# wait 动作完成后,状态会由新消息或超时驱动,最终回到 ANALYZING
return True, "done", "等待动作完成"
class UnknownActionHandler(ActionHandler):
"""处理未知或无效动作的处理器。"""
async def execute(self, reason: str, observation_info: Optional[ObservationInfo], conversation_info: Optional[ConversationInfo], action_start_time: float, current_action_record: dict) -> tuple[bool, str, str]:
"""处理无法识别的动作类型。"""
action_name = current_action_record.get("action", "未知动作类型") # 从记录中获取动作名
self.logger.warning(f"[私聊][{self.conversation.private_name}] 接收到未知的动作类型: {action_name}")
return False, "recall", f"未知的动作类型: {action_name}" # 标记为需要重新规划
async def execute(
self,
reason: str,
observation_info: Optional[ObservationInfo],
conversation_info: Optional[ConversationInfo],
action_start_time: float,
current_action_record: dict,
) -> tuple[bool, str, str]:
"""处理无法识别的动作类型。"""
action_name = current_action_record.get("action", "未知动作类型") # 从记录中获取动作名
self.logger.warning(f"[私聊][{self.conversation.private_name}] 接收到未知的动作类型: {action_name}")
return False, "recall", f"未知的动作类型: {action_name}" # 标记为需要重新规划

View File

@ -5,17 +5,17 @@ import traceback
from typing import Optional, TYPE_CHECKING
from src.common.logger_manager import get_logger
from .pfc_types import ConversationState # 调整导入路径
from .observation_info import ObservationInfo # 调整导入路径
from .conversation_info import ConversationInfo # 调整导入路径
from .pfc_types import ConversationState # 调整导入路径
from .observation_info import ObservationInfo # 调整导入路径
from .conversation_info import ConversationInfo # 调整导入路径
# 导入工厂类
from .action_factory import StandardActionFactory # 调整导入路径
from .action_factory import StandardActionFactory # 调整导入路径
if TYPE_CHECKING:
from .conversation import Conversation # 调整导入路径
from .conversation import Conversation # 调整导入路径
logger = get_logger("pfc_actions") # 模块级别日志记录器
logger = get_logger("pfc_actions") # 模块级别日志记录器
async def handle_action(
@ -40,41 +40,39 @@ async def handle_action(
# 如果 conversation_info 和 done_action 存在且不为空
if conversation_info and hasattr(conversation_info, "done_action") and conversation_info.done_action:
# 更新最后一个动作记录的状态和原因
if conversation_info.done_action: # 再次检查列表是否不为空
conversation_info.done_action[-1].update(
{"status": "error", "final_reason": "ObservationInfo is None"}
)
conversation_instance.state = ConversationState.ERROR # 设置对话状态为错误
if conversation_info.done_action: # 再次检查列表是否不为空
conversation_info.done_action[-1].update({"status": "error", "final_reason": "ObservationInfo is None"})
conversation_instance.state = ConversationState.ERROR # 设置对话状态为错误
return
# 检查 conversation_info 是否为空
if not conversation_info:
logger.error(f"[私聊][{conversation_instance.private_name}] ConversationInfo 为空,无法处理动作 '{action}'")
conversation_instance.state = ConversationState.ERROR # 设置对话状态为错误
conversation_instance.state = ConversationState.ERROR # 设置对话状态为错误
return
logger.info(f"[私聊][{conversation_instance.private_name}] 开始处理动作: {action}, 原因: {reason}")
action_start_time = time.time() # 记录动作开始时间
action_start_time = time.time() # 记录动作开始时间
# 当前动作记录
current_action_record = {
"action": action, # 动作类型
"plan_reason": reason, # 规划原因
"status": "start", # 初始状态为 "start"
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 当前时间
"final_reason": None, # 最终原因,默认为 None
"action": action, # 动作类型
"plan_reason": reason, # 规划原因
"status": "start", # 初始状态为 "start"
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 当前时间
"final_reason": None, # 最终原因,默认为 None
}
# 如果 done_action 不存在或为空,则初始化
if not hasattr(conversation_info, "done_action") or conversation_info.done_action is None:
conversation_info.done_action = []
conversation_info.done_action.append(current_action_record) # 添加当前动作记录
action_index = len(conversation_info.done_action) - 1 # 获取当前动作记录的索引
conversation_info.done_action.append(current_action_record) # 添加当前动作记录
action_index = len(conversation_info.done_action) - 1 # 获取当前动作记录的索引
action_successful: bool = False # 动作是否成功,默认为 False
final_status: str = "recall" # 最终状态,默认为 "recall"
final_reason: str = "动作未成功执行" # 最终原因,默认为 "动作未成功执行"
action_successful: bool = False # 动作是否成功,默认为 False
final_status: str = "recall" # 最终状态,默认为 "recall"
final_reason: str = "动作未成功执行" # 最终原因,默认为 "动作未成功执行"
factory = StandardActionFactory() # 创建标准动作工厂实例
action_handler = factory.create_action_handler(action, conversation_instance) # 创建动作处理器
factory = StandardActionFactory() # 创建标准动作工厂实例
action_handler = factory.create_action_handler(action, conversation_instance) # 创建动作处理器
try:
# 执行动作处理器
@ -86,33 +84,33 @@ async def handle_action(
# 此部分之前位于每个 if/elif 块内部
# 如果动作不是回复类型的动作
if action not in ["direct_reply", "send_new_message", "say_goodbye", "send_memes"]:
conversation_info.last_successful_reply_action = None # 清除上次成功回复动作
conversation_info.last_reply_rejection_reason = None # 清除上次回复拒绝原因
conversation_info.last_rejected_reply_content = None # 清除上次拒绝的回复内容
conversation_info.last_successful_reply_action = None # 清除上次成功回复动作
conversation_info.last_reply_rejection_reason = None # 清除上次回复拒绝原因
conversation_info.last_rejected_reply_content = None # 清除上次拒绝的回复内容
# 如果动作不是发送表情包或发送表情包失败,则清除表情查询
if action != "send_memes" or not action_successful:
if hasattr(conversation_info, "current_emoji_query"):
conversation_info.current_emoji_query = None
except asyncio.CancelledError: # 捕获任务取消错误
except asyncio.CancelledError: # 捕获任务取消错误
logger.warning(f"[私聊][{conversation_instance.private_name}] 处理动作 '{action}' 时被取消。")
final_status = "cancelled" # 设置最终状态为 "cancelled"
final_status = "cancelled" # 设置最终状态为 "cancelled"
final_reason = "动作处理被取消"
# 如果 conversation_info 存在
if conversation_info:
conversation_info.last_successful_reply_action = None # 清除上次成功回复动作
raise # 重新抛出异常,由循环处理
except Exception as handle_err: # 捕获其他异常
conversation_info.last_successful_reply_action = None # 清除上次成功回复动作
raise # 重新抛出异常,由循环处理
except Exception as handle_err: # 捕获其他异常
logger.error(f"[私聊][{conversation_instance.private_name}] 处理动作 '{action}' 时出错: {handle_err}")
logger.error(f"[私聊][{conversation_instance.private_name}] {traceback.format_exc()}")
final_status = "error" # 设置最终状态为 "error"
final_status = "error" # 设置最终状态为 "error"
final_reason = f"处理动作时出错: {handle_err}"
conversation_instance.state = ConversationState.ERROR # 设置对话状态为错误
conversation_instance.state = ConversationState.ERROR # 设置对话状态为错误
# 如果 conversation_info 存在
if conversation_info:
conversation_info.last_successful_reply_action = None # 清除上次成功回复动作
action_successful = False # 确保动作为不成功
conversation_info.last_successful_reply_action = None # 清除上次成功回复动作
action_successful = False # 确保动作为不成功
finally:
# 更新动作历史记录
@ -130,45 +128,43 @@ async def handle_action(
final_reason = f"动作 {action} 成功完成"
# 如果是发送表情包且 current_emoji_query 存在(理想情况下从处理器获取描述)
if action == "send_memes" and conversation_info.current_emoji_query:
pass # 占位符 - 表情描述最好从处理器的执行结果中获取并用于原因
pass # 占位符 - 表情描述最好从处理器的执行结果中获取并用于原因
# 更新动作记录
conversation_info.done_action[action_index].update(
{
"status": final_status, # 最终状态
"time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 完成时间
"final_reason": final_reason, # 最终原因
"duration_ms": int((time.time() - action_start_time) * 1000), # 持续时间(毫秒)
"status": final_status, # 最终状态
"time_completed": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 完成时间
"final_reason": final_reason, # 最终原因
"duration_ms": int((time.time() - action_start_time) * 1000), # 持续时间(毫秒)
}
)
else: # 如果无法更新动作历史记录
else: # 如果无法更新动作历史记录
logger.error(
f"[私聊][{conversation_instance.private_name}] 无法更新动作历史记录done_action 无效或索引 {action_index} 超出范围。"
)
# 根据最终状态设置对话状态
if final_status in ["done", "done_no_reply", "recall"]:
conversation_instance.state = ConversationState.ANALYZING # 设置为分析中
conversation_instance.state = ConversationState.ANALYZING # 设置为分析中
elif final_status in ["error", "max_checker_attempts_failed"]:
conversation_instance.state = ConversationState.ERROR # 设置为错误
conversation_instance.state = ConversationState.ERROR # 设置为错误
# 其他状态如 LISTENING, WAITING, IGNORED, ENDED 在各自的处理器内部或由循环设置。
# 此处移至 try 块以确保即使在发生异常之前也运行
# 如果动作不是回复类型的动作
if action not in ["direct_reply", "send_new_message", "say_goodbye", "send_memes"]:
if conversation_info: # 再次检查 conversation_info 是否不为 None
conversation_info.last_successful_reply_action = None # 清除上次成功回复动作
conversation_info.last_reply_rejection_reason = None # 清除上次回复拒绝原因
conversation_info.last_rejected_reply_content = None # 清除上次拒绝的回复内容
if conversation_info: # 再次检查 conversation_info 是否不为 None
conversation_info.last_successful_reply_action = None # 清除上次成功回复动作
conversation_info.last_reply_rejection_reason = None # 清除上次回复拒绝原因
conversation_info.last_rejected_reply_content = None # 清除上次拒绝的回复内容
# 如果动作不是发送表情包或发送表情包失败
if action != "send_memes" or not action_successful:
# 如果 conversation_info 存在且有 current_emoji_query 属性
if conversation_info and hasattr(conversation_info, "current_emoji_query"):
conversation_info.current_emoji_query = None # 清除当前表情查询
conversation_info.current_emoji_query = None # 清除当前表情查询
log_final_reason_msg = final_reason if final_reason else "无明确原因" # 记录的最终原因消息
log_final_reason_msg = final_reason if final_reason else "无明确原因" # 记录的最终原因消息
# 如果最终状态为 "done",动作成功,且是直接回复或发送新消息,并且有生成的回复
if (
final_status == "done"
@ -179,7 +175,7 @@ async def handle_action(
):
log_final_reason_msg += f" (发送内容: '{conversation_instance.generated_reply[:30]}...')"
# elif final_status == "done" and action_successful and action == "send_memes":
# 表情包的日志记录在其处理器内部或通过下面的通用日志处理
# 表情包的日志记录在其处理器内部或通过下面的通用日志处理
logger.info(
f"[私聊][{conversation_instance.private_name}] 动作 '{action}' 处理完成。最终状态: {final_status}, 原因: {log_final_reason_msg}"