diff --git a/src/chat/heart_flow/heartFC_chat.py b/src/chat/heart_flow/heartFC_chat.py index f7d730ef..3fc2029d 100644 --- a/src/chat/heart_flow/heartFC_chat.py +++ b/src/chat/heart_flow/heartFC_chat.py @@ -492,7 +492,7 @@ class HeartFChatting: return False, "", "" # 处理动作并获取结果 - result = await action_handler.handle_action() + result = await action_handler.execute() success, action_text = result command = "" diff --git a/src/plugin_system/base/base_action.py b/src/plugin_system/base/base_action.py index 3f06f929..2143e08f 100644 --- a/src/plugin_system/base/base_action.py +++ b/src/plugin_system/base/base_action.py @@ -156,65 +156,14 @@ class BaseAction(ABC): f"{self.log_prefix} 聊天信息: 类型={'群聊' if self.is_group else '私聊'}, 平台={self.platform}, 目标={self.target_id}" ) - async def wait_for_new_message(self, timeout: int = 1200) -> Tuple[bool, str]: - """等待新消息或超时 - - 在loop_start_time之后等待新消息,如果没有新消息且没有超时,就一直等待。 - 使用message_api检查self.chat_id对应的聊天中是否有新消息。 - - Args: - timeout: 超时时间(秒),默认1200秒 + @abstractmethod + async def execute(self) -> Tuple[bool, str]: + """执行Action的抽象方法,子类必须实现 Returns: - Tuple[bool, str]: (是否收到新消息, 空字符串) + Tuple[bool, str]: (是否执行成功, 回复文本) """ - try: - # 获取循环开始时间,如果没有则使用当前时间 - loop_start_time = self.action_data.get("loop_start_time", time.time()) - logger.info(f"{self.log_prefix} 开始等待新消息... (最长等待: {timeout}秒, 从时间点: {loop_start_time})") - - # 确保有有效的chat_id - if not self.chat_id: - logger.error(f"{self.log_prefix} 等待新消息失败: 没有有效的chat_id") - return False, "没有有效的chat_id" - - wait_start_time = asyncio.get_event_loop().time() - while True: - # 检查关闭标志 - # shutting_down = self.get_action_context("shutting_down", False) - # if shutting_down: - # logger.info(f"{self.log_prefix} 等待新消息时检测到关闭信号,中断等待") - # return False, "" - - # 检查新消息 - current_time = time.time() - new_message_count = message_api.count_new_messages( - chat_id=self.chat_id, start_time=loop_start_time, end_time=current_time - ) - - if new_message_count > 0: - logger.info(f"{self.log_prefix} 检测到{new_message_count}条新消息,聊天ID: {self.chat_id}") - return True, "" - - # 检查超时 - elapsed_time = asyncio.get_event_loop().time() - wait_start_time - if elapsed_time > timeout: - logger.warning(f"{self.log_prefix} 等待新消息超时({timeout}秒),聊天ID: {self.chat_id}") - return False, "" - - # 每30秒记录一次等待状态 - if int(elapsed_time) % 15 == 0 and int(elapsed_time) > 0: - logger.debug(f"{self.log_prefix} 已等待{int(elapsed_time)}秒,继续等待新消息...") - - # 短暂休眠 - await asyncio.sleep(0.5) - - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} 等待新消息被中断 (CancelledError)") - return False, "" - except Exception as e: - logger.error(f"{self.log_prefix} 等待新消息时发生错误: {e}") - return False, f"等待新消息失败: {str(e)}" + pass async def send_text( self, @@ -282,6 +231,56 @@ class BaseAction(ABC): image_base64, self.chat_id, set_reply=set_reply, reply_message=reply_message ) + async def send_command( + self, + command_name: str, + args: Optional[dict] = None, + display_message: str = "", + storage_message: bool = True, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + ) -> bool: + """发送命令消息 + + 使用stream API发送命令 + + Args: + command_name: 命令名称 + args: 命令参数 + display_message: 显示消息 + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + try: + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + + # 构造命令数据 + command_data = {"name": command_name, "args": args or {}} + + success = await send_api.command_to_stream( + command=command_data, + stream_id=self.chat_id, + storage_message=storage_message, + display_message=display_message, + set_reply=set_reply, + reply_message=reply_message, + ) + + if success: + logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") + else: + logger.error(f"{self.log_prefix} 发送命令失败: {command_name}") + + return success + + except Exception as e: + logger.error(f"{self.log_prefix} 发送命令时出错: {e}") + return False + async def send_custom( self, message_type: str, @@ -337,55 +336,65 @@ class BaseAction(ABC): action_name=self.action_name, ) - async def send_command( - self, - command_name: str, - args: Optional[dict] = None, - display_message: str = "", - storage_message: bool = True, - set_reply: bool = False, - reply_message: Optional["DatabaseMessages"] = None, - ) -> bool: - """发送命令消息 + async def wait_for_new_message(self, timeout: int = 1200) -> Tuple[bool, str]: + """等待新消息或超时 - 使用stream API发送命令 + 在loop_start_time之后等待新消息,如果没有新消息且没有超时,就一直等待。 + 使用message_api检查self.chat_id对应的聊天中是否有新消息。 Args: - command_name: 命令名称 - args: 命令参数 - display_message: 显示消息 - storage_message: 是否存储消息到数据库 + timeout: 超时时间(秒),默认1200秒 Returns: - bool: 是否发送成功 + Tuple[bool, str]: (是否收到新消息, 空字符串) """ try: + # 获取循环开始时间,如果没有则使用当前时间 + loop_start_time = self.action_data.get("loop_start_time", time.time()) + logger.info(f"{self.log_prefix} 开始等待新消息... (最长等待: {timeout}秒, 从时间点: {loop_start_time})") + + # 确保有有效的chat_id if not self.chat_id: - logger.error(f"{self.log_prefix} 缺少聊天ID") - return False + logger.error(f"{self.log_prefix} 等待新消息失败: 没有有效的chat_id") + return False, "没有有效的chat_id" - # 构造命令数据 - command_data = {"name": command_name, "args": args or {}} + wait_start_time = asyncio.get_event_loop().time() + while True: + # 检查关闭标志 + # shutting_down = self.get_action_context("shutting_down", False) + # if shutting_down: + # logger.info(f"{self.log_prefix} 等待新消息时检测到关闭信号,中断等待") + # return False, "" - success = await send_api.command_to_stream( - command=command_data, - stream_id=self.chat_id, - storage_message=storage_message, - display_message=display_message, - set_reply=set_reply, - reply_message=reply_message, - ) + # 检查新消息 + current_time = time.time() + new_message_count = message_api.count_new_messages( + chat_id=self.chat_id, start_time=loop_start_time, end_time=current_time + ) - if success: - logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") - else: - logger.error(f"{self.log_prefix} 发送命令失败: {command_name}") + if new_message_count > 0: + logger.info(f"{self.log_prefix} 检测到{new_message_count}条新消息,聊天ID: {self.chat_id}") + return True, "" - return success + # 检查超时 + elapsed_time = asyncio.get_event_loop().time() - wait_start_time + if elapsed_time > timeout: + logger.warning(f"{self.log_prefix} 等待新消息超时({timeout}秒),聊天ID: {self.chat_id}") + return False, "" + # 每30秒记录一次等待状态 + if int(elapsed_time) % 15 == 0 and int(elapsed_time) > 0: + logger.debug(f"{self.log_prefix} 已等待{int(elapsed_time)}秒,继续等待新消息...") + + # 短暂休眠 + await asyncio.sleep(0.5) + + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} 等待新消息被中断 (CancelledError)") + return False, "" except Exception as e: - logger.error(f"{self.log_prefix} 发送命令时出错: {e}") - return False + logger.error(f"{self.log_prefix} 等待新消息时发生错误: {e}") + return False, f"等待新消息失败: {str(e)}" @classmethod def get_action_info(cls) -> "ActionInfo": @@ -428,26 +437,6 @@ class BaseAction(ABC): associated_types=getattr(cls, "associated_types", []).copy(), ) - @abstractmethod - async def execute(self) -> Tuple[bool, str]: - """执行Action的抽象方法,子类必须实现 - - Returns: - Tuple[bool, str]: (是否执行成功, 回复文本) - """ - pass - - async def handle_action(self) -> Tuple[bool, str]: - """兼容旧系统的handle_action接口,委托给execute方法 - - 为了保持向后兼容性,旧系统的代码可能会调用handle_action方法。 - 此方法将调用委托给新的execute方法。 - - Returns: - Tuple[bool, str]: (是否执行成功, 回复文本) - """ - return await self.execute() - def get_config(self, key: str, default=None): """获取插件配置值,使用嵌套键访问