初步大改Event系统,使event的订阅与激活更加灵活

pull/1215/head
Windpicker-owo 2025-08-23 03:24:35 +08:00
parent ee30e03def
commit 6d26950529
8 changed files with 195 additions and 21 deletions

View File

@ -16,6 +16,7 @@ from src.mood.mood_manager import mood_manager
from src.chat.knowledge import lpmm_start_up from src.chat.knowledge import lpmm_start_up
from rich.traceback import install from rich.traceback import install
from src.migrate_helper.migrate import check_and_run_migrations from src.migrate_helper.migrate import check_and_run_migrations
from src.plugin_system.apis.event_api import init_default_events, get_event
# from src.api.main import start_api_server # from src.api.main import start_api_server
# 导入新的插件管理器 # 导入新的插件管理器
@ -85,6 +86,9 @@ class MainSystem:
# start_api_server() # start_api_server()
# logger.info("API服务器启动成功") # logger.info("API服务器启动成功")
# 注册默认事件
init_default_events()
# 启动LPMM # 启动LPMM
lpmm_start_up() lpmm_start_up()
@ -125,11 +129,8 @@ class MainSystem:
# 触发 ON_START 事件 # 触发 ON_START 事件
from src.plugin_system.core.events_manager import events_manager on_start_event = get_event("on_start")
from src.plugin_system.base.component_types import EventType _ = await on_start_event.activate()
await events_manager.handle_mai_events(
event_type=EventType.ON_START
)
# logger.info("已触发 ON_START 事件") # logger.info("已触发 ON_START 事件")
try: try:
init_time = int(1000 * (time.time() - init_start_time)) init_time = int(1000 * (time.time() - init_start_time))

View File

@ -9,6 +9,22 @@ from src.plugin_system.base.component_types import (
) )
def get_component_instance(
component_name: str, component_type: ComponentType
) -> Optional[object]:
"""
获取指定组件的实例
Args:
component_name (str): 组件名称
component_type (ComponentType): 组件类型
Returns:
object: 组件实例如果组件不存在则返回 None
"""
from src.plugin_system.core.component_registry import component_registry
return component_registry.get_component_class(component_name, component_type) # type: ignoreq
# === 插件信息查询 === # === 插件信息查询 ===
def get_all_plugin_info() -> Dict[str, PluginInfo]: def get_all_plugin_info() -> Dict[str, PluginInfo]:
""" """

View File

@ -0,0 +1,78 @@
from typing import Dict
from src.plugin_system.core.component_registry import component_registry
from src.plugin_system.base.component_types import (
ComponentType,
EventInfo,
)
from src.plugin_system.base.base_event import BaseEvent
# === 事件管理方法 ===
def register_event(event_name: str) -> None:
"""
注册一个新的事件
Args:
event_name (str): 事件名称
"""
event_info = EventInfo(name=event_name, component_type=ComponentType.EVENT)
event_class = BaseEvent(event_name)
try:
component_registry.register_component(event_info, event_class)
return True
except:
return False
def get_event(event_name: str) -> BaseEvent | None:
"""
获取指定事件的实例
Args:
event_name (str): 事件名称
Returns:
BaseEvent: 事件实例如果事件不存在则返回 None
"""
return component_registry.get_component_class(event_name, ComponentType.EVENT)
def get_current_enabled_events() -> Dict[str, BaseEvent]:
"""
获取当前所有已启用的事件
Returns:
dict: 包含所有已启用事件的字典键为事件名称值为 BaseEvent 对象
"""
return {name: event for name, event in component_registry._event_registry.items() if event.enabled}
def get_current_unenabled_events() -> Dict[str, BaseEvent]:
"""
获取当前所有已禁用的事件
Returns:
dict: 包含所有已启用事件的字典键为事件名称值为 BaseEvent 对象
"""
return {name: event for name, event in component_registry._event_registry.items() if not event.enabled}
def get_all_registered_events() -> Dict[str, BaseEvent]:
"""
获取所有已注册的事件
Returns:
dict: 包含所有已注册事件的字典键为事件名称值为 BaseEvent 对象
"""
return component_registry._event_registry
def init_default_events() -> None:
"""
初始化默认事件
"""
default_events = [
"on_start",
"on_stop",
"on_message",
"post_llm",
"after_llm",
"post_send",
"after_send"
]
for event_name in default_events:
register_event(event_name)

View File

@ -0,0 +1,16 @@
from typing import List
from src.plugin_system.base.base_events_handler import BaseEventHandler
class BaseEvent:
def __init__(self, name: str):
self.name = name
self.enabled = True
self.subcribers: List["BaseEventHandler"] = [] # 订阅该事件的事件处理器列表
def __name__(self):
return self.name
async def activate(self, params: dict = {}) -> None:
for subscriber in self.subcribers:
return await subscriber.execute(params)

View File

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Tuple, Optional, Dict from typing import Tuple, Optional, Dict, List
from src.common.logger import get_logger from src.common.logger import get_logger
from .component_types import MaiMessages, EventType, EventHandlerInfo, ComponentType from .component_types import MaiMessages, EventType, EventHandlerInfo, ComponentType
@ -13,8 +13,6 @@ class BaseEventHandler(ABC):
所有事件处理器都应该继承这个基类提供事件处理的基本接口 所有事件处理器都应该继承这个基类提供事件处理的基本接口
""" """
event_type: EventType = EventType.UNKNOWN
"""事件类型,默认为未知"""
handler_name: str = "" handler_name: str = ""
"""处理器名称""" """处理器名称"""
handler_description: str = "" handler_description: str = ""
@ -23,6 +21,8 @@ class BaseEventHandler(ABC):
"""处理器权重,越大权重越高""" """处理器权重,越大权重越高"""
intercept_message: bool = False intercept_message: bool = False
"""是否拦截消息,默认为否""" """是否拦截消息,默认为否"""
init_subcribe: List[str] = []
"""初始化时订阅的事件名称"""
def __init__(self): def __init__(self):
self.log_prefix = "[EventHandler]" self.log_prefix = "[EventHandler]"
@ -30,8 +30,12 @@ class BaseEventHandler(ABC):
"""对应插件名""" """对应插件名"""
self.plugin_config: Optional[Dict] = None self.plugin_config: Optional[Dict] = None
"""插件配置字典""" """插件配置字典"""
if self.event_type == EventType.UNKNOWN: self.subcribed_events = []
raise NotImplementedError("事件处理器必须指定 event_type") """订阅的事件列表"""
if self.init_subcribe:
for event_name in self.init_subcribe:
self.subcribe(event_name)
@abstractmethod @abstractmethod
async def execute(self, message: MaiMessages) -> Tuple[bool, bool, Optional[str]]: async def execute(self, message: MaiMessages) -> Tuple[bool, bool, Optional[str]]:
@ -42,6 +46,43 @@ class BaseEventHandler(ABC):
""" """
raise NotImplementedError("子类必须实现 execute 方法") raise NotImplementedError("子类必须实现 execute 方法")
def subcribe(self, event_name: str) -> None:
"""订阅一个事件
Args:
event (BaseEvent): 要订阅的事件实例
"""
from src.plugin_system.apis.event_api import get_event
event = get_event(event_name)
if event is None:
logger.error(f"事件 '{event_name}' 不存在,无法订阅")
return
event.subcribers.append(self)
logger.debug(f"{self.log_prefix} 订阅事件 {event.name}")
self.subcribed_events.append(event.name)
def unsubscribe(self, event_name: str) -> None:
"""取消订阅一个事件
Args:
event (BaseEvent): 要取消订阅的事件实例
"""
from src.plugin_system.apis.event_api import get_event
event = get_event(event_name)
if event is None:
logger.error(f"事件 '{event_name}' 不存在,无法取消订阅")
return
if self in event.subcribers:
event.subcribers.remove(self)
logger.debug(f"{self.log_prefix} 取消订阅事件 {event.name}")
self.subcribed_events.remove(event.name)
else:
logger.warning(f"{self.log_prefix} 未订阅事件 {event.name},无法取消订阅")
@classmethod @classmethod
def get_handler_info(cls) -> "EventHandlerInfo": def get_handler_info(cls) -> "EventHandlerInfo":
"""获取事件处理器的信息""" """获取事件处理器的信息"""
@ -54,7 +95,6 @@ class BaseEventHandler(ABC):
name=name, name=name,
component_type=ComponentType.EVENT_HANDLER, component_type=ComponentType.EVENT_HANDLER,
description=getattr(cls, "handler_description", "events处理器"), description=getattr(cls, "handler_description", "events处理器"),
event_type=cls.event_type,
weight=cls.weight, weight=cls.weight,
intercept_message=cls.intercept_message, intercept_message=cls.intercept_message,
) )

View File

@ -12,9 +12,10 @@ class ComponentType(Enum):
ACTION = "action" # 动作组件 ACTION = "action" # 动作组件
COMMAND = "command" # 命令组件 COMMAND = "command" # 命令组件
TOOL = "tool" # 服务组件(预留) TOOL = "tool" # 工具组件
SCHEDULER = "scheduler" # 定时任务组件(预留) SCHEDULER = "scheduler" # 定时任务组件(预留)
EVENT_HANDLER = "event_handler" # 事件处理组件(预留) EVENT_HANDLER = "event_handler" # 事件处理组件
EVENT = "event" # 事件组件
def __str__(self) -> str: def __str__(self) -> str:
return self.value return self.value
@ -165,7 +166,6 @@ class ToolInfo(ComponentInfo):
class EventHandlerInfo(ComponentInfo): class EventHandlerInfo(ComponentInfo):
"""事件处理器组件信息""" """事件处理器组件信息"""
event_type: EventType = EventType.ON_MESSAGE # 监听事件类型
intercept_message: bool = False # 是否拦截消息处理(默认不拦截) intercept_message: bool = False # 是否拦截消息处理(默认不拦截)
weight: int = 0 # 事件处理器权重,决定执行顺序 weight: int = 0 # 事件处理器权重,决定执行顺序
@ -173,6 +173,13 @@ class EventHandlerInfo(ComponentInfo):
super().__post_init__() super().__post_init__()
self.component_type = ComponentType.EVENT_HANDLER self.component_type = ComponentType.EVENT_HANDLER
@dataclass
class EventInfo(ComponentInfo):
"""事件组件信息"""
def __post_init__(self):
super().__post_init__()
self.component_type = ComponentType.EVENT
@dataclass @dataclass
class PluginInfo: class PluginInfo:

View File

@ -10,12 +10,14 @@ from src.plugin_system.base.component_types import (
CommandInfo, CommandInfo,
EventHandlerInfo, EventHandlerInfo,
PluginInfo, PluginInfo,
EventInfo,
ComponentType, ComponentType,
) )
from src.plugin_system.base.base_command import BaseCommand from src.plugin_system.base.base_command import BaseCommand
from src.plugin_system.base.base_action import BaseAction from src.plugin_system.base.base_action import BaseAction
from src.plugin_system.base.base_tool import BaseTool from src.plugin_system.base.base_tool import BaseTool
from src.plugin_system.base.base_events_handler import BaseEventHandler from src.plugin_system.base.base_events_handler import BaseEventHandler
from src.plugin_system.base.base_event import BaseEvent
logger = get_logger("component_registry") logger = get_logger("component_registry")
@ -61,6 +63,9 @@ class ComponentRegistry:
self._enabled_event_handlers: Dict[str, Type[BaseEventHandler]] = {} self._enabled_event_handlers: Dict[str, Type[BaseEventHandler]] = {}
"""启用的事件处理器 event_handler名 -> event_handler类""" """启用的事件处理器 event_handler名 -> event_handler类"""
self._event_registry: Dict[str, Type[BaseEvent]] = {}
"""事件注册表 事件名 -> event 类"""
logger.info("组件注册中心初始化完成") logger.info("组件注册中心初始化完成")
# == 注册方法 == # == 注册方法 ==
@ -141,6 +146,10 @@ class ComponentRegistry:
assert isinstance(component_info, EventHandlerInfo) assert isinstance(component_info, EventHandlerInfo)
assert issubclass(component_class, BaseEventHandler) assert issubclass(component_class, BaseEventHandler)
ret = self._register_event_handler_component(component_info, component_class) ret = self._register_event_handler_component(component_info, component_class)
case ComponentType.EVENT:
assert isinstance(component_info, EventInfo)
assert isinstance(component_class, BaseEvent)
ret = self._register_event(component_info, component_class)
case _: case _:
logger.warning(f"未知组件类型: {component_type}") logger.warning(f"未知组件类型: {component_type}")
@ -229,6 +238,18 @@ class ComponentRegistry:
logger.error(f"注册事件处理器 {handler_name} 失败") logger.error(f"注册事件处理器 {handler_name} 失败")
return False return False
def _register_event(self, event_info: EventInfo, event_class: BaseEvent) -> bool:
"""注册事件到事件注册表"""
if not (event_name := event_info.name):
logger.error(f"Event组件 {event_class.__name__} 必须指定名称")
return False
if event_name in self._event_registry:
logger.warning(f"事件 {event_name} 已存在,跳过注册")
return False
self._event_registry[event_name] = event_class
logger.debug(f"已注册事件: {event_name}")
return True
# === 组件移除相关 === # === 组件移除相关 ===
async def remove_component(self, component_name: str, component_type: ComponentType, plugin_name: str) -> bool: async def remove_component(self, component_name: str, component_type: ComponentType, plugin_name: str) -> bool:

View File

@ -157,14 +157,9 @@ class EventsManager:
def _insert_event_handler(self, handler_class: Type[BaseEventHandler], handler_info: EventHandlerInfo) -> bool: def _insert_event_handler(self, handler_class: Type[BaseEventHandler], handler_info: EventHandlerInfo) -> bool:
"""插入事件处理器到对应的事件类型列表中并设置其插件配置""" """插入事件处理器到对应的事件类型列表中并设置其插件配置"""
if handler_class.event_type == EventType.UNKNOWN:
logger.error(f"事件处理器 {handler_class.__name__} 的事件类型未知,无法注册")
return False
handler_instance = handler_class() handler_instance = handler_class()
handler_instance.set_plugin_name(handler_info.plugin_name or "unknown") handler_instance.set_plugin_name(handler_info.plugin_name or "unknown")
self._events_subscribers[handler_class.event_type].append(handler_instance)
self._events_subscribers[handler_class.event_type].sort(key=lambda x: x.weight, reverse=True)
return True return True