handler参数类型注解更改,简化调用

pull/1217/head
UnCLAS-Prommer 2025-08-22 16:59:44 +08:00
parent ba2c047bab
commit 6d82db7a8c
No known key found for this signature in database
2 changed files with 20 additions and 30 deletions

View File

@ -34,9 +34,10 @@ class BaseEventHandler(ABC):
raise NotImplementedError("事件处理器必须指定 event_type") raise NotImplementedError("事件处理器必须指定 event_type")
@abstractmethod @abstractmethod
async def execute(self, message: MaiMessages) -> Tuple[bool, bool, Optional[str]]: async def execute(self, message: MaiMessages | None) -> Tuple[bool, bool, Optional[str]]:
"""执行事件处理的抽象方法,子类必须实现 """执行事件处理的抽象方法,子类必须实现
Args:
message (MaiMessages | None): 事件消息对象当你注册的事件为ON_START和ON_STOP时message为None
Returns: Returns:
Tuple[bool, bool, Optional[str]]: (是否执行成功, 是否需要继续处理, 可选的返回消息) Tuple[bool, bool, Optional[str]]: (是否执行成功, 是否需要继续处理, 可选的返回消息)
""" """

View File

@ -62,50 +62,35 @@ class EventsManager:
else: else:
return self._transform_event_without_message(stream_id, llm_prompt, llm_response, action_usage) return self._transform_event_without_message(stream_id, llm_prompt, llm_response, action_usage)
return None # ON_START, ON_STOP事件没有消息体 return None # ON_START, ON_STOP事件没有消息体
def _execute_handler(
self,
handler: BaseEventHandler,
message: Optional[MaiMessages]
) -> Coroutine[Any, Any, tuple[bool, bool, Any]]:
"""封装了调用 handler.execute 的逻辑。"""
return handler.execute(message) if message else handler.execute()
def _dispatch_handler_task(self, handler: BaseEventHandler, message: Optional[MaiMessages]): def _dispatch_handler_task(self, handler: BaseEventHandler, message: Optional[MaiMessages]):
"""分发一个非阻塞(异步)的事件处理任务。""" """分发一个非阻塞(异步)的事件处理任务。"""
try: try:
# 无论是否有 message都统一调用 task = asyncio.create_task(handler.execute(message))
coro = self._execute_handler(handler, message)
task = asyncio.create_task(coro)
task_name = f"{handler.plugin_name}-{handler.handler_name}" task_name = f"{handler.plugin_name}-{handler.handler_name}"
task.set_name(task_name) task.set_name(task_name)
task.add_done_callback(self._task_done_callback) task.add_done_callback(self._task_done_callback)
self._handler_tasks.setdefault(handler.handler_name, []).append(task) self._handler_tasks.setdefault(handler.handler_name, []).append(task)
except Exception as e: except Exception as e:
logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}", exc_info=True) logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}", exc_info=True)
async def _dispatch_intercepting_handler( async def _dispatch_intercepting_handler(self, handler: BaseEventHandler, message: Optional[MaiMessages]) -> bool:
self,
handler: BaseEventHandler,
message: Optional[MaiMessages]
) -> bool:
"""分发并等待一个阻塞(同步)的事件处理器,返回是否应继续处理。""" """分发并等待一个阻塞(同步)的事件处理器,返回是否应继续处理。"""
try: try:
# 统一调用 success, continue_processing, result = await handler.execute(message)
success, continue_processing, result = await self._execute_handler(handler, message)
if not success: if not success:
logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}") logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}")
else: else:
logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}") logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}")
return continue_processing return continue_processing
except Exception as e: except Exception as e:
logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}", exc_info=True) logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}", exc_info=True)
return True # 发生异常时默认不中断其他处理 return True # 发生异常时默认不中断其他处理
async def handle_mai_events( async def handle_mai_events(
self, self,
@ -122,7 +107,7 @@ class EventsManager:
from src.plugin_system.core import component_registry from src.plugin_system.core import component_registry
continue_flag = True continue_flag = True
# 1. 准备消息 # 1. 准备消息
transformed_message = self._prepare_message( transformed_message = self._prepare_message(
event_type, message, llm_prompt, llm_response, stream_id, action_usage event_type, message, llm_prompt, llm_response, stream_id, action_usage
@ -137,9 +122,13 @@ class EventsManager:
for handler in handlers: for handler in handlers:
# 3. 前置检查和配置加载 # 3. 前置检查和配置加载
if current_stream_id and handler.handler_name in global_announcement_manager.get_disabled_chat_event_handlers(current_stream_id): if (
current_stream_id
and handler.handler_name
in global_announcement_manager.get_disabled_chat_event_handlers(current_stream_id)
):
continue continue
# 统一加载插件配置 # 统一加载插件配置
plugin_config = component_registry.get_plugin_config(handler.plugin_name) or {} plugin_config = component_registry.get_plugin_config(handler.plugin_name) or {}
handler.set_plugin_config(plugin_config) handler.set_plugin_config(plugin_config)
@ -152,7 +141,7 @@ class EventsManager:
else: else:
# 异步执行,不阻塞 # 异步执行,不阻塞
self._dispatch_handler_task(handler, transformed_message) self._dispatch_handler_task(handler, transformed_message)
return continue_flag return continue_flag
def _insert_event_handler(self, handler_class: Type[BaseEventHandler], handler_info: EventHandlerInfo) -> bool: def _insert_event_handler(self, handler_class: Type[BaseEventHandler], handler_info: EventHandlerInfo) -> bool: