diff --git a/src/plugin_system/base/base_events_handler.py b/src/plugin_system/base/base_events_handler.py index 5118885f..80c07ac4 100644 --- a/src/plugin_system/base/base_events_handler.py +++ b/src/plugin_system/base/base_events_handler.py @@ -34,9 +34,10 @@ class BaseEventHandler(ABC): raise NotImplementedError("事件处理器必须指定 event_type") @abstractmethod - async def execute(self, message: MaiMessages) -> Tuple[bool, bool, Optional[str]]: + async def execute(self, message: MaiMessages | None) -> Tuple[bool, bool, Optional[str]]: """执行事件处理的抽象方法,子类必须实现 - + Args: + message (MaiMessages | None): 事件消息对象,当你注册的事件为ON_START和ON_STOP时message为None Returns: Tuple[bool, bool, Optional[str]]: (是否执行成功, 是否需要继续处理, 可选的返回消息) """ diff --git a/src/plugin_system/core/events_manager.py b/src/plugin_system/core/events_manager.py index a27ab9c3..1c99cb7a 100644 --- a/src/plugin_system/core/events_manager.py +++ b/src/plugin_system/core/events_manager.py @@ -62,50 +62,35 @@ class EventsManager: else: return self._transform_event_without_message(stream_id, llm_prompt, llm_response, action_usage) - return None # ON_START, ON_STOP事件没有消息体 + return None # ON_START, ON_STOP事件没有消息体 - def _execute_handler( - self, - handler: BaseEventHandler, - message: Optional[MaiMessages] - ) -> Coroutine[Any, Any, tuple[bool, bool, Any]]: - """封装了调用 handler.execute 的逻辑。""" - return handler.execute(message) if message else handler.execute() - def _dispatch_handler_task(self, handler: BaseEventHandler, message: Optional[MaiMessages]): """分发一个非阻塞(异步)的事件处理任务。""" try: - # 无论是否有 message,都统一调用 - coro = self._execute_handler(handler, message) - task = asyncio.create_task(coro) - + task = asyncio.create_task(handler.execute(message)) + task_name = f"{handler.plugin_name}-{handler.handler_name}" task.set_name(task_name) task.add_done_callback(self._task_done_callback) - + self._handler_tasks.setdefault(handler.handler_name, []).append(task) except Exception as e: logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}", exc_info=True) - async def _dispatch_intercepting_handler( - self, - handler: BaseEventHandler, - message: Optional[MaiMessages] - ) -> bool: + async def _dispatch_intercepting_handler(self, handler: BaseEventHandler, message: Optional[MaiMessages]) -> bool: """分发并等待一个阻塞(同步)的事件处理器,返回是否应继续处理。""" try: - # 统一调用 - success, continue_processing, result = await self._execute_handler(handler, message) - + success, continue_processing, result = await handler.execute(message) + if not success: logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}") else: logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}") - + return continue_processing except Exception as e: logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}", exc_info=True) - return True # 发生异常时默认不中断其他处理 + return True # 发生异常时默认不中断其他处理 async def handle_mai_events( self, @@ -122,7 +107,7 @@ class EventsManager: from src.plugin_system.core import component_registry continue_flag = True - + # 1. 准备消息 transformed_message = self._prepare_message( event_type, message, llm_prompt, llm_response, stream_id, action_usage @@ -137,9 +122,13 @@ class EventsManager: for handler in handlers: # 3. 前置检查和配置加载 - if current_stream_id and handler.handler_name in global_announcement_manager.get_disabled_chat_event_handlers(current_stream_id): + if ( + current_stream_id + and handler.handler_name + in global_announcement_manager.get_disabled_chat_event_handlers(current_stream_id) + ): continue - + # 统一加载插件配置 plugin_config = component_registry.get_plugin_config(handler.plugin_name) or {} handler.set_plugin_config(plugin_config) @@ -152,7 +141,7 @@ class EventsManager: else: # 异步执行,不阻塞 self._dispatch_handler_task(handler, transformed_message) - + return continue_flag def _insert_event_handler(self, handler_class: Type[BaseEventHandler], handler_info: EventHandlerInfo) -> bool: