pull/1217/head
SengokuCola 2025-08-22 17:01:50 +08:00
commit 6a7b29695c
2 changed files with 20 additions and 30 deletions

View File

@ -34,9 +34,10 @@ class BaseEventHandler(ABC):
raise NotImplementedError("事件处理器必须指定 event_type")
@abstractmethod
async def execute(self, message: MaiMessages) -> Tuple[bool, bool, Optional[str]]:
async def execute(self, message: MaiMessages | None) -> Tuple[bool, bool, Optional[str]]:
"""执行事件处理的抽象方法,子类必须实现
Args:
message (MaiMessages | None): 事件消息对象当你注册的事件为ON_START和ON_STOP时message为None
Returns:
Tuple[bool, bool, Optional[str]]: (是否执行成功, 是否需要继续处理, 可选的返回消息)
"""

View File

@ -62,50 +62,35 @@ class EventsManager:
else:
return self._transform_event_without_message(stream_id, llm_prompt, llm_response, action_usage)
return None # ON_START, ON_STOP事件没有消息体
return None # ON_START, ON_STOP事件没有消息体
def _execute_handler(
self,
handler: BaseEventHandler,
message: Optional[MaiMessages]
) -> Coroutine[Any, Any, tuple[bool, bool, Any]]:
"""封装了调用 handler.execute 的逻辑。"""
return handler.execute(message) if message else handler.execute()
def _dispatch_handler_task(self, handler: BaseEventHandler, message: Optional[MaiMessages]):
"""分发一个非阻塞(异步)的事件处理任务。"""
try:
# 无论是否有 message都统一调用
coro = self._execute_handler(handler, message)
task = asyncio.create_task(coro)
task = asyncio.create_task(handler.execute(message))
task_name = f"{handler.plugin_name}-{handler.handler_name}"
task.set_name(task_name)
task.add_done_callback(self._task_done_callback)
self._handler_tasks.setdefault(handler.handler_name, []).append(task)
except Exception as e:
logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}", exc_info=True)
async def _dispatch_intercepting_handler(
self,
handler: BaseEventHandler,
message: Optional[MaiMessages]
) -> bool:
async def _dispatch_intercepting_handler(self, handler: BaseEventHandler, message: Optional[MaiMessages]) -> bool:
"""分发并等待一个阻塞(同步)的事件处理器,返回是否应继续处理。"""
try:
# 统一调用
success, continue_processing, result = await self._execute_handler(handler, message)
success, continue_processing, result = await handler.execute(message)
if not success:
logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}")
else:
logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}")
return continue_processing
except Exception as e:
logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}", exc_info=True)
return True # 发生异常时默认不中断其他处理
return True # 发生异常时默认不中断其他处理
async def handle_mai_events(
self,
@ -122,7 +107,7 @@ class EventsManager:
from src.plugin_system.core import component_registry
continue_flag = True
# 1. 准备消息
transformed_message = self._prepare_message(
event_type, message, llm_prompt, llm_response, stream_id, action_usage
@ -137,9 +122,13 @@ class EventsManager:
for handler in handlers:
# 3. 前置检查和配置加载
if current_stream_id and handler.handler_name in global_announcement_manager.get_disabled_chat_event_handlers(current_stream_id):
if (
current_stream_id
and handler.handler_name
in global_announcement_manager.get_disabled_chat_event_handlers(current_stream_id)
):
continue
# 统一加载插件配置
plugin_config = component_registry.get_plugin_config(handler.plugin_name) or {}
handler.set_plugin_config(plugin_config)
@ -152,7 +141,7 @@ class EventsManager:
else:
# 异步执行,不阻塞
self._dispatch_handler_task(handler, transformed_message)
return continue_flag
def _insert_event_handler(self, handler_class: Type[BaseEventHandler], handler_info: EventHandlerInfo) -> bool: