继续升级Event系统

pull/1215/head
Windpicker-owo 2025-08-23 15:26:33 +08:00
parent a82c1a2d2d
commit b486ea1529
6 changed files with 255 additions and 17 deletions

View File

@ -0,0 +1,109 @@
# HandlerResult 使用指南
## 概述
HandlerResult类和HandlerResultsCollection类已完全实现事件处理器继续使用Tuple[bool, bool, str]格式返回但event.activate会自动转换为HandlerResult实例提供统一的接口和便捷的结果处理方法。
## 新特性
### 1. HandlerResult类
```python
from src.plugin_system.base.base_event import HandlerResult
# 内部自动创建的HandlerResult实例
# 包含字段:
# - success: bool - 是否执行成功
# - continue_process: bool - 是否继续处理后续处理器
# - message: str - 返回消息
# - handler_name: str - 处理器名称
```
### 2. HandlerResultsCollection类
事件激活方法现在返回HandlerResultsCollection提供以下便捷方法
```python
from src.plugin_system.base.base_event import HandlerResultsCollection
# 获取事件执行结果
results = await event.activate(params)
# 检查是否所有处理器都允许继续处理
if results.all_continue_process():
print("所有处理器都允许继续")
# 获取失败的处理器
failed_handlers = results.get_failed_handlers()
for handler in failed.handlers:
print(f"处理器 {handler.handler_name} 失败: {handler.message}")
# 获取停止处理的处理器
stopped_handlers = results.get_stopped_handlers()
for handler in stopped_handlers:
print(f"处理器 {handler.handler_name} 阻止继续处理")
# 获取执行摘要
summary = results.get_summary()
print(f"总处理器数: {summary['total_handlers']}")
print(f"成功数: {summary['success_count']}")
print(f"失败数: {summary['failure_count']}")
print(f"是否全部继续: {summary['continue_process']}")
```
## 使用示例
### 1. 创建事件处理器(保持原有格式)
```python
from src.plugin_system.base.base_events_handler import BaseEventHandler
from typing import Tuple, Optional
class MyHandler(BaseEventHandler):
handler_name = "my_handler"
handler_description = "我的事件处理器"
async def execute(self, message) -> Tuple[bool, bool, Optional[str]]:
try:
# 处理逻辑
return True, True, "处理成功"
except Exception as e:
return False, True, str(e)
```
### 2. 激活事件并处理结果
```python
from src.plugin_system.apis.event_api import get_event
# 获取事件
event = get_event("on_message")
# 激活事件并获取结果自动转换为HandlerResultsCollection
results = await event.activate({"message": "Hello"})
# 使用便捷方法处理结果
if not results.all_continue_process():
stopped = results.get_stopped_handlers()
print(f"被阻止的处理器: {[h.handler_name for h in stopped]}")
# 检查失败情况
failed = results.get_failed_handlers()
if failed:
print("失败的处理器:")
for handler in failed:
print(f" - {handler.handler_name}: {handler.message}")
```
## 保持兼容性
事件处理器继续使用原有的Tuple返回格式无需任何修改
```python
# 保持原有格式
async def execute(self, message) -> Tuple[bool, bool, Optional[str]]:
return True, True, "处理成功"
```
## 注意事项
- 事件处理器**无需修改**继续使用Tuple[bool, bool, str]格式
- 系统会自动将Tuple转换为HandlerResult实例
- HandlerResultsCollection提供了丰富的查询和统计功能
- 异常处理已内置到事件激活流程中
- 完全向后兼容,现有代码无需任何改动

View File

@ -37,7 +37,8 @@ from .utils import (
from .apis import ( from .apis import (
chat_api, chat_api,
tool_api, tool_api,
event_api,
component_manage_api, component_manage_api,
config_api, config_api,
database_api, database_api,

View File

@ -18,6 +18,7 @@ from src.plugin_system.apis import (
plugin_manage_api, plugin_manage_api,
send_api, send_api,
tool_api, tool_api,
event_api
) )
from .logging_api import get_logger from .logging_api import get_logger
from .plugin_register_api import register_plugin from .plugin_register_api import register_plugin
@ -38,4 +39,5 @@ __all__ = [
"get_logger", "get_logger",
"register_plugin", "register_plugin",
"tool_api", "tool_api",
"event_api",
] ]

View File

@ -1,10 +1,10 @@
from typing import Dict from typing import Dict, Type
from src.plugin_system.core.component_registry import component_registry
from src.plugin_system.base.component_types import ( from src.plugin_system.base.component_types import (
ComponentType, ComponentType,
EventInfo, EventInfo,
) )
from src.plugin_system.base.base_event import BaseEvent from src.plugin_system.base.base_event import BaseEvent
from src.plugin_system.base.base_events_handler import BaseEventHandler
# === 事件管理方法 === # === 事件管理方法 ===
def register_event(event_name: str) -> None: def register_event(event_name: str) -> None:
@ -14,6 +14,7 @@ def register_event(event_name: str) -> None:
Args: Args:
event_name (str): 事件名称 event_name (str): 事件名称
""" """
from src.plugin_system.core.component_registry import component_registry
event_info = EventInfo(name=event_name, component_type=ComponentType.EVENT) event_info = EventInfo(name=event_name, component_type=ComponentType.EVENT)
event_class = BaseEvent(event_name) event_class = BaseEvent(event_name)
try: try:
@ -32,8 +33,37 @@ def get_event(event_name: str) -> BaseEvent | None:
Returns: Returns:
BaseEvent: 事件实例如果事件不存在则返回 None BaseEvent: 事件实例如果事件不存在则返回 None
""" """
from src.plugin_system.core.component_registry import component_registry
return component_registry.get_component_class(event_name, ComponentType.EVENT) return component_registry.get_component_class(event_name, ComponentType.EVENT)
def get_event_subcribers(event_name: str) -> Dict[str, "BaseEventHandler"]:
"""
获取订阅指定事件的所有事件处理器
Args:
event_name (str): 事件名称
Returns:
dict: 包含所有订阅该事件的事件处理器的字典键为处理器名称值为 BaseEventHandler 对象
"""
event = get_event(event_name)
if event is None:
return {}
return {handler.handler_name: handler for handler in event.subcribers}
def get_handler(handler_name: str) -> Type["BaseEventHandler"] | None:
"""
获取指定名称的事件处理器实例
Args:
handler_name (str): 事件处理器名称
Returns:
BaseEventHandler: 事件处理器实例如果处理器不存在则返回 None
"""
from src.plugin_system.core.component_registry import component_registry
return component_registry.get_component_class(handler_name, ComponentType.EVENT_HANDLER)
def get_current_enabled_events() -> Dict[str, BaseEvent]: def get_current_enabled_events() -> Dict[str, BaseEvent]:
""" """
获取当前所有已启用的事件 获取当前所有已启用的事件
@ -41,6 +71,7 @@ def get_current_enabled_events() -> Dict[str, BaseEvent]:
Returns: Returns:
dict: 包含所有已启用事件的字典键为事件名称值为 BaseEvent 对象 dict: 包含所有已启用事件的字典键为事件名称值为 BaseEvent 对象
""" """
from src.plugin_system.core.component_registry import component_registry
return {name: event for name, event in component_registry._event_registry.items() if event.enabled} return {name: event for name, event in component_registry._event_registry.items() if event.enabled}
def get_current_unenabled_events() -> Dict[str, BaseEvent]: def get_current_unenabled_events() -> Dict[str, BaseEvent]:
@ -50,6 +81,7 @@ def get_current_unenabled_events() -> Dict[str, BaseEvent]:
Returns: Returns:
dict: 包含所有已启用事件的字典键为事件名称值为 BaseEvent 对象 dict: 包含所有已启用事件的字典键为事件名称值为 BaseEvent 对象
""" """
from src.plugin_system.core.component_registry import component_registry
return {name: event for name, event in component_registry._event_registry.items() if not event.enabled} return {name: event for name, event in component_registry._event_registry.items() if not event.enabled}
def get_all_registered_events() -> Dict[str, BaseEvent]: def get_all_registered_events() -> Dict[str, BaseEvent]:
@ -59,6 +91,7 @@ def get_all_registered_events() -> Dict[str, BaseEvent]:
Returns: Returns:
dict: 包含所有已注册事件的字典键为事件名称值为 BaseEvent 对象 dict: 包含所有已注册事件的字典键为事件名称值为 BaseEvent 对象
""" """
from src.plugin_system.core.component_registry import component_registry
return component_registry._event_registry return component_registry._event_registry
def init_default_events() -> None: def init_default_events() -> None:

View File

@ -1,16 +1,108 @@
from typing import List from typing import List, Dict, Any, Optional
from src.plugin_system.base.base_events_handler import BaseEventHandler import asyncio
class HandlerResult:
"""事件处理器执行结果
所有事件处理器必须返回此类的实例
"""
def __init__(self, success: bool, continue_process: bool, message: str = "", handler_name: str = ""):
self.success = success
self.continue_process = continue_process
self.message = message
self.handler_name = handler_name
def __repr__(self):
return f"HandlerResult(success={self.success}, continue_process={self.continue_process}, message='{self.message}', handler_name='{self.handler_name}')"
class HandlerResultsCollection:
"""HandlerResult集合提供便捷的查询方法"""
def __init__(self, results: List[HandlerResult]):
self.results = results
def all_continue_process(self) -> bool:
"""检查是否所有handler的continue_process都为True"""
return all(result.continue_process for result in self.results)
def get_all_results(self) -> List[HandlerResult]:
"""获取所有HandlerResult"""
return self.results
def get_failed_handlers(self) -> List[HandlerResult]:
"""获取执行失败的handler结果"""
return [result for result in self.results if not result.success]
def get_stopped_handlers(self) -> List[HandlerResult]:
"""获取continue_process为False的handler结果"""
return [result for result in self.results if not result.continue_process]
def get_handler_result(self, handler_name: str) -> Optional[HandlerResult]:
"""获取指定handler的结果"""
for result in self.results:
if result.handler_name == handler_name:
return result
return None
def get_success_count(self) -> int:
"""获取成功执行的handler数量"""
return sum(1 for result in self.results if result.success)
def get_failure_count(self) -> int:
"""获取执行失败的handler数量"""
return sum(1 for result in self.results if not result.success)
def get_summary(self) -> Dict[str, Any]:
"""获取执行摘要"""
return {
"total_handlers": len(self.results),
"success_count": self.get_success_count(),
"failure_count": self.get_failure_count(),
"continue_process": self.all_continue_process(),
"failed_handlers": [r.handler_name for r in self.get_failed_handlers()],
"stopped_handlers": [r.handler_name for r in self.get_stopped_handlers()]
}
class BaseEvent: class BaseEvent:
def __init__(self, name: str): def __init__(self, name: str):
self.name = name self.name = name
self.enabled = True self.enabled = True
from src.plugin_system.base.base_events_handler import BaseEventHandler
self.subcribers: List["BaseEventHandler"] = [] # 订阅该事件的事件处理器列表 self.subcribers: List["BaseEventHandler"] = [] # 订阅该事件的事件处理器列表
def __name__(self): def __name__(self):
return self.name return self.name
async def activate(self, params: dict = {}) -> None: async def activate(self, params: dict = {}) -> HandlerResultsCollection:
"""激活事件,执行所有订阅的处理器
Args:
params: 传递给处理器的参数
Returns:
HandlerResultsCollection: 所有处理器的执行结果集合
"""
if not self.enabled:
return HandlerResultsCollection([])
results = []
for subscriber in self.subcribers: for subscriber in self.subcribers:
return await subscriber.execute(params) try:
result = await subscriber.execute(params)
if not isinstance(result, HandlerResult):
# 兼容旧版本将元组转换为HandlerResult
success, continue_process, message = result
result = HandlerResult(success, continue_process, message or "", getattr(subscriber, 'handler_name', 'unkown_handler'))
elif not result.handler_name:
# 补充handler_name
result.handler_name = getattr(subscriber, 'handler_name', 'unkown_handler')
results.append(result)
except Exception as e:
# 处理执行异常
handler_name = getattr(subscriber, 'handler_name', str(subscriber))
logger = __import__('src.common.logger', fromlist=['get_logger']).get_logger("base_event")
logger.error(f"事件处理器 {handler_name} 执行失败: {e}")
results.append(HandlerResult(False, True, str(e), handler_name))
return HandlerResultsCollection(results)

View File

@ -7,6 +7,7 @@ from src.chat.message_receive.chat_stream import get_chat_manager
from src.common.logger import get_logger from src.common.logger import get_logger
from src.plugin_system.base.component_types import EventType, EventHandlerInfo, MaiMessages from src.plugin_system.base.component_types import EventType, EventHandlerInfo, MaiMessages
from src.plugin_system.base.base_events_handler import BaseEventHandler from src.plugin_system.base.base_events_handler import BaseEventHandler
from src.plugin_system.base.base_event import HandlerResult, HandlerResultsCollection
from .global_announcement_manager import global_announcement_manager from .global_announcement_manager import global_announcement_manager
if TYPE_CHECKING: if TYPE_CHECKING:
@ -83,14 +84,14 @@ class EventsManager:
async def _dispatch_intercepting_handler(self, handler: BaseEventHandler, message: Optional[MaiMessages]) -> bool: async def _dispatch_intercepting_handler(self, handler: BaseEventHandler, message: Optional[MaiMessages]) -> bool:
"""分发并等待一个阻塞(同步)的事件处理器,返回是否应继续处理。""" """分发并等待一个阻塞(同步)的事件处理器,返回是否应继续处理。"""
try: try:
success, continue_processing, result = await handler.execute(message) result = await handler.execute(message)
if not success: if not result.success:
logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}") logger.error(f"EventHandler {handler.handler_name} 执行失败: {result.message}")
else: else:
logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}") logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result.message}")
return continue_processing return result.continue_process
except Exception as e: except Exception as e:
logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}", exc_info=True) logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}", exc_info=True)
return True # 发生异常时默认不中断其他处理 return True # 发生异常时默认不中断其他处理
@ -257,15 +258,15 @@ class EventsManager:
additional_data={"response_is_processed": True}, additional_data={"response_is_processed": True},
) )
def _task_done_callback(self, task: asyncio.Task[Tuple[bool, bool, str | None]]): def _task_done_callback(self, task: asyncio.Task[HandlerResult]):
"""任务完成回调""" """任务完成回调"""
task_name = task.get_name() or "Unknown Task" task_name = task.get_name() or "Unknown Task"
try: try:
success, _, result = task.result() # 忽略是否继续的标志,因为消息本身未被拦截 result = task.result()
if success: if result.success:
logger.debug(f"事件处理任务 {task_name} 已成功完成: {result}") logger.debug(f"事件处理任务 {task_name} 已成功完成: {result.message}")
else: else:
logger.error(f"事件处理任务 {task_name} 执行失败: {result}") logger.error(f"事件处理任务 {task_name} 执行失败: {result.message}")
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
except Exception as e: except Exception as e: