diff --git a/docs/plugins/HandlerResult使用指南.md b/docs/plugins/HandlerResult使用指南.md new file mode 100644 index 00000000..363dec84 --- /dev/null +++ b/docs/plugins/HandlerResult使用指南.md @@ -0,0 +1,109 @@ +# HandlerResult 使用指南 + +## 概述 +HandlerResult类和HandlerResultsCollection类已完全实现,事件处理器继续使用Tuple[bool, bool, str]格式返回,但event.activate会自动转换为HandlerResult实例,提供统一的接口和便捷的结果处理方法。 + +## 新特性 + +### 1. HandlerResult类 +```python +from src.plugin_system.base.base_event import HandlerResult + +# 内部自动创建的HandlerResult实例 +# 包含字段: +# - success: bool - 是否执行成功 +# - continue_process: bool - 是否继续处理后续处理器 +# - message: str - 返回消息 +# - handler_name: str - 处理器名称 +``` + +### 2. HandlerResultsCollection类 +事件激活方法现在返回HandlerResultsCollection,提供以下便捷方法: + +```python +from src.plugin_system.base.base_event import HandlerResultsCollection + +# 获取事件执行结果 +results = await event.activate(params) + +# 检查是否所有处理器都允许继续处理 +if results.all_continue_process(): + print("所有处理器都允许继续") + +# 获取失败的处理器 +failed_handlers = results.get_failed_handlers() +for handler in failed.handlers: + print(f"处理器 {handler.handler_name} 失败: {handler.message}") + +# 获取停止处理的处理器 +stopped_handlers = results.get_stopped_handlers() +for handler in stopped_handlers: + print(f"处理器 {handler.handler_name} 阻止继续处理") + +# 获取执行摘要 +summary = results.get_summary() +print(f"总处理器数: {summary['total_handlers']}") +print(f"成功数: {summary['success_count']}") +print(f"失败数: {summary['failure_count']}") +print(f"是否全部继续: {summary['continue_process']}") +``` + +## 使用示例 + +### 1. 创建事件处理器(保持原有格式) +```python +from src.plugin_system.base.base_events_handler import BaseEventHandler +from typing import Tuple, Optional + +class MyHandler(BaseEventHandler): + handler_name = "my_handler" + handler_description = "我的事件处理器" + + async def execute(self, message) -> Tuple[bool, bool, Optional[str]]: + try: + # 处理逻辑 + return True, True, "处理成功" + except Exception as e: + return False, True, str(e) +``` + +### 2. 激活事件并处理结果 +```python +from src.plugin_system.apis.event_api import get_event + +# 获取事件 +event = get_event("on_message") + +# 激活事件并获取结果(自动转换为HandlerResultsCollection) +results = await event.activate({"message": "Hello"}) + +# 使用便捷方法处理结果 +if not results.all_continue_process(): + stopped = results.get_stopped_handlers() + print(f"被阻止的处理器: {[h.handler_name for h in stopped]}") + +# 检查失败情况 +failed = results.get_failed_handlers() +if failed: + print("失败的处理器:") + for handler in failed: + print(f" - {handler.handler_name}: {handler.message}") +``` + +## 保持兼容性 + +事件处理器继续使用原有的Tuple返回格式,无需任何修改: + +```python +# 保持原有格式 +async def execute(self, message) -> Tuple[bool, bool, Optional[str]]: + return True, True, "处理成功" +``` + +## 注意事项 + +- 事件处理器**无需修改**,继续使用Tuple[bool, bool, str]格式 +- 系统会自动将Tuple转换为HandlerResult实例 +- HandlerResultsCollection提供了丰富的查询和统计功能 +- 异常处理已内置到事件激活流程中 +- 完全向后兼容,现有代码无需任何改动 \ No newline at end of file diff --git a/src/plugin_system/__init__.py b/src/plugin_system/__init__.py index a102ecd0..6ce39c96 100644 --- a/src/plugin_system/__init__.py +++ b/src/plugin_system/__init__.py @@ -37,7 +37,8 @@ from .utils import ( from .apis import ( chat_api, - tool_api, + tool_api, + event_api, component_manage_api, config_api, database_api, diff --git a/src/plugin_system/apis/__init__.py b/src/plugin_system/apis/__init__.py index 362c9858..0fc4c2ef 100644 --- a/src/plugin_system/apis/__init__.py +++ b/src/plugin_system/apis/__init__.py @@ -18,6 +18,7 @@ from src.plugin_system.apis import ( plugin_manage_api, send_api, tool_api, + event_api ) from .logging_api import get_logger from .plugin_register_api import register_plugin @@ -38,4 +39,5 @@ __all__ = [ "get_logger", "register_plugin", "tool_api", + "event_api", ] diff --git a/src/plugin_system/apis/event_api.py b/src/plugin_system/apis/event_api.py index c17e1775..20256873 100644 --- a/src/plugin_system/apis/event_api.py +++ b/src/plugin_system/apis/event_api.py @@ -1,10 +1,10 @@ -from typing import Dict -from src.plugin_system.core.component_registry import component_registry +from typing import Dict, Type from src.plugin_system.base.component_types import ( ComponentType, EventInfo, ) from src.plugin_system.base.base_event import BaseEvent +from src.plugin_system.base.base_events_handler import BaseEventHandler # === 事件管理方法 === def register_event(event_name: str) -> None: @@ -14,6 +14,7 @@ def register_event(event_name: str) -> None: Args: event_name (str): 事件名称。 """ + from src.plugin_system.core.component_registry import component_registry event_info = EventInfo(name=event_name, component_type=ComponentType.EVENT) event_class = BaseEvent(event_name) try: @@ -32,8 +33,37 @@ def get_event(event_name: str) -> BaseEvent | None: Returns: BaseEvent: 事件实例,如果事件不存在则返回 None。 """ + from src.plugin_system.core.component_registry import component_registry return component_registry.get_component_class(event_name, ComponentType.EVENT) +def get_event_subcribers(event_name: str) -> Dict[str, "BaseEventHandler"]: + """ + 获取订阅指定事件的所有事件处理器。 + + Args: + event_name (str): 事件名称。 + + Returns: + dict: 包含所有订阅该事件的事件处理器的字典,键为处理器名称,值为 BaseEventHandler 对象。 + """ + event = get_event(event_name) + if event is None: + return {} + return {handler.handler_name: handler for handler in event.subcribers} + +def get_handler(handler_name: str) -> Type["BaseEventHandler"] | None: + """ + 获取指定名称的事件处理器实例。 + + Args: + handler_name (str): 事件处理器名称。 + + Returns: + BaseEventHandler: 事件处理器实例,如果处理器不存在则返回 None。 + """ + from src.plugin_system.core.component_registry import component_registry + return component_registry.get_component_class(handler_name, ComponentType.EVENT_HANDLER) + def get_current_enabled_events() -> Dict[str, BaseEvent]: """ 获取当前所有已启用的事件。 @@ -41,6 +71,7 @@ def get_current_enabled_events() -> Dict[str, BaseEvent]: Returns: dict: 包含所有已启用事件的字典,键为事件名称,值为 BaseEvent 对象。 """ + from src.plugin_system.core.component_registry import component_registry return {name: event for name, event in component_registry._event_registry.items() if event.enabled} def get_current_unenabled_events() -> Dict[str, BaseEvent]: @@ -50,6 +81,7 @@ def get_current_unenabled_events() -> Dict[str, BaseEvent]: Returns: dict: 包含所有已启用事件的字典,键为事件名称,值为 BaseEvent 对象。 """ + from src.plugin_system.core.component_registry import component_registry return {name: event for name, event in component_registry._event_registry.items() if not event.enabled} def get_all_registered_events() -> Dict[str, BaseEvent]: @@ -59,6 +91,7 @@ def get_all_registered_events() -> Dict[str, BaseEvent]: Returns: dict: 包含所有已注册事件的字典,键为事件名称,值为 BaseEvent 对象。 """ + from src.plugin_system.core.component_registry import component_registry return component_registry._event_registry def init_default_events() -> None: diff --git a/src/plugin_system/base/base_event.py b/src/plugin_system/base/base_event.py index 6e52db0c..e85ba8a8 100644 --- a/src/plugin_system/base/base_event.py +++ b/src/plugin_system/base/base_event.py @@ -1,16 +1,108 @@ -from typing import List -from src.plugin_system.base.base_events_handler import BaseEventHandler +from typing import List, Dict, Any, Optional +import asyncio +class HandlerResult: + """事件处理器执行结果 + + 所有事件处理器必须返回此类的实例 + """ + def __init__(self, success: bool, continue_process: bool, message: str = "", handler_name: str = ""): + self.success = success + self.continue_process = continue_process + self.message = message + self.handler_name = handler_name + + def __repr__(self): + return f"HandlerResult(success={self.success}, continue_process={self.continue_process}, message='{self.message}', handler_name='{self.handler_name}')" + +class HandlerResultsCollection: + """HandlerResult集合,提供便捷的查询方法""" + + def __init__(self, results: List[HandlerResult]): + self.results = results + + def all_continue_process(self) -> bool: + """检查是否所有handler的continue_process都为True""" + return all(result.continue_process for result in self.results) + + def get_all_results(self) -> List[HandlerResult]: + """获取所有HandlerResult""" + return self.results + + def get_failed_handlers(self) -> List[HandlerResult]: + """获取执行失败的handler结果""" + return [result for result in self.results if not result.success] + + def get_stopped_handlers(self) -> List[HandlerResult]: + """获取continue_process为False的handler结果""" + return [result for result in self.results if not result.continue_process] + + def get_handler_result(self, handler_name: str) -> Optional[HandlerResult]: + """获取指定handler的结果""" + for result in self.results: + if result.handler_name == handler_name: + return result + return None + + def get_success_count(self) -> int: + """获取成功执行的handler数量""" + return sum(1 for result in self.results if result.success) + + def get_failure_count(self) -> int: + """获取执行失败的handler数量""" + return sum(1 for result in self.results if not result.success) + + def get_summary(self) -> Dict[str, Any]: + """获取执行摘要""" + return { + "total_handlers": len(self.results), + "success_count": self.get_success_count(), + "failure_count": self.get_failure_count(), + "continue_process": self.all_continue_process(), + "failed_handlers": [r.handler_name for r in self.get_failed_handlers()], + "stopped_handlers": [r.handler_name for r in self.get_stopped_handlers()] + } class BaseEvent: def __init__(self, name: str): self.name = name self.enabled = True + + from src.plugin_system.base.base_events_handler import BaseEventHandler self.subcribers: List["BaseEventHandler"] = [] # 订阅该事件的事件处理器列表 def __name__(self): return self.name - async def activate(self, params: dict = {}) -> None: + async def activate(self, params: dict = {}) -> HandlerResultsCollection: + """激活事件,执行所有订阅的处理器 + + Args: + params: 传递给处理器的参数 + + Returns: + HandlerResultsCollection: 所有处理器的执行结果集合 + """ + if not self.enabled: + return HandlerResultsCollection([]) + + results = [] for subscriber in self.subcribers: - return await subscriber.execute(params) \ No newline at end of file + try: + result = await subscriber.execute(params) + if not isinstance(result, HandlerResult): + # 兼容旧版本,将元组转换为HandlerResult + success, continue_process, message = result + result = HandlerResult(success, continue_process, message or "", getattr(subscriber, 'handler_name', 'unkown_handler')) + elif not result.handler_name: + # 补充handler_name + result.handler_name = getattr(subscriber, 'handler_name', 'unkown_handler') + results.append(result) + except Exception as e: + # 处理执行异常 + handler_name = getattr(subscriber, 'handler_name', str(subscriber)) + logger = __import__('src.common.logger', fromlist=['get_logger']).get_logger("base_event") + logger.error(f"事件处理器 {handler_name} 执行失败: {e}") + results.append(HandlerResult(False, True, str(e), handler_name)) + + return HandlerResultsCollection(results) \ No newline at end of file diff --git a/src/plugin_system/core/events_manager.py b/src/plugin_system/core/events_manager.py index 02c164dc..c9265166 100644 --- a/src/plugin_system/core/events_manager.py +++ b/src/plugin_system/core/events_manager.py @@ -7,6 +7,7 @@ from src.chat.message_receive.chat_stream import get_chat_manager from src.common.logger import get_logger from src.plugin_system.base.component_types import EventType, EventHandlerInfo, MaiMessages from src.plugin_system.base.base_events_handler import BaseEventHandler +from src.plugin_system.base.base_event import HandlerResult, HandlerResultsCollection from .global_announcement_manager import global_announcement_manager if TYPE_CHECKING: @@ -83,14 +84,14 @@ class EventsManager: async def _dispatch_intercepting_handler(self, handler: BaseEventHandler, message: Optional[MaiMessages]) -> bool: """分发并等待一个阻塞(同步)的事件处理器,返回是否应继续处理。""" try: - success, continue_processing, result = await handler.execute(message) + result = await handler.execute(message) - if not success: - logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}") + if not result.success: + logger.error(f"EventHandler {handler.handler_name} 执行失败: {result.message}") else: - logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}") + logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result.message}") - return continue_processing + return result.continue_process except Exception as e: logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}", exc_info=True) return True # 发生异常时默认不中断其他处理 @@ -257,15 +258,15 @@ class EventsManager: additional_data={"response_is_processed": True}, ) - def _task_done_callback(self, task: asyncio.Task[Tuple[bool, bool, str | None]]): + def _task_done_callback(self, task: asyncio.Task[HandlerResult]): """任务完成回调""" task_name = task.get_name() or "Unknown Task" try: - success, _, result = task.result() # 忽略是否继续的标志,因为消息本身未被拦截 - if success: - logger.debug(f"事件处理任务 {task_name} 已成功完成: {result}") + result = task.result() + if result.success: + logger.debug(f"事件处理任务 {task_name} 已成功完成: {result.message}") else: - logger.error(f"事件处理任务 {task_name} 执行失败: {result}") + logger.error(f"事件处理任务 {task_name} 执行失败: {result.message}") except asyncio.CancelledError: pass except Exception as e: