订阅缓存,避免因插件加载顺序造成订阅失败

pull/1215/head
Windpicker-owo 2025-08-23 22:44:16 +08:00
parent 9e4fb0e919
commit 2422287f72
4 changed files with 70 additions and 156 deletions

View File

@ -1,92 +0,0 @@
"""
简单的事件处理器权重排序测试
"""
import asyncio
from typing import Dict, Any, List
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 简化的测试类,避免复杂依赖
class SimpleHandler:
"""简化的事件处理器"""
def __init__(self, name: str, weight: int):
self.handler_name = name
self.weight = weight
async def execute(self, params: Dict[str, Any]):
print(f"执行处理器: {self.handler_name} (权重: {self.weight})")
return True, True, f"{self.handler_name} 执行完成"
class SimpleEvent:
"""简化的事件类"""
def __init__(self, name: str):
self.name = name
self.enabled = True
self.subcribers: List[SimpleHandler] = []
def add_subscriber(self, handler: SimpleHandler):
"""添加订阅者并排序"""
if handler not in self.subcribers:
self.subcribers.append(handler)
# 按权重从高到低排序
self.subcribers.sort(key=lambda h: h.weight, reverse=True)
print(f"添加处理器 {handler.handler_name} (权重: {handler.weight})")
async def activate(self, params: Dict[str, Any] = None):
"""激活事件"""
if params is None:
params = {}
print(f"\n激活事件: {self.name}")
print("执行顺序:")
for i, subscriber in enumerate(self.subcribers, 1):
print(f" {i}. {subscriber.handler_name} (权重: {subscriber.weight})")
await subscriber.execute(params)
async def test_weight_sorting():
"""测试权重排序功能"""
print("=== 事件处理器权重排序测试 ===")
# 创建测试事件
event = SimpleEvent("test_event")
# 创建不同权重的处理器
handlers = [
SimpleHandler("低权重处理器", 10),
SimpleHandler("高权重处理器", 100),
SimpleHandler("中等权重处理器", 50),
SimpleHandler("最高权重处理器", 200),
SimpleHandler("最低权重处理器", 5),
]
# 随机顺序添加处理器
import random
random.shuffle(handlers)
print("\n按随机顺序添加处理器:")
for handler in handlers:
event.add_subscriber(handler)
print("\n当前订阅者列表:")
for i, handler in enumerate(event.subcribers, 1):
print(f" {i}. {handler.handler_name} (权重: {handler.weight})")
# 验证排序
weights = [h.weight for h in event.subcribers]
is_sorted = all(weights[i] >= weights[i+1] for i in range(len(weights)-1))
if is_sorted:
print("\n[PASS] 权重排序验证通过:处理器已按权重从高到低排序")
else:
print("\n[FAIL] 权重排序验证失败")
# 测试事件触发
await event.activate({"test": "data"})
if __name__ == "__main__":
asyncio.run(test_weight_sorting())

View File

@ -90,7 +90,10 @@ class MainSystem:
lpmm_start_up() lpmm_start_up()
# 加载所有actions包括默认的和插件的 # 加载所有actions包括默认的和插件的
plugin_manager.load_all_plugins() plugin_manager.load_all_plugins()
# 处理所有缓存的事件订阅(插件加载完成后)
event_manager.process_all_pending_subscriptions()
# 初始化表情管理器 # 初始化表情管理器
get_emoji_manager().initialize() get_emoji_manager().initialize()

View File

@ -37,6 +37,7 @@ class EventManager:
self._events: Dict[str, BaseEvent] = {} self._events: Dict[str, BaseEvent] = {}
self._event_handlers: Dict[str, Type[BaseEventHandler]] = {} self._event_handlers: Dict[str, Type[BaseEventHandler]] = {}
self._pending_subscriptions: Dict[str, List[str]] = {} # 缓存失败的订阅
self._initialized = True self._initialized = True
logger.info("EventManager 单例初始化完成") logger.info("EventManager 单例初始化完成")
@ -56,6 +57,10 @@ class EventManager:
event = BaseEvent(event_name) event = BaseEvent(event_name)
self._events[event_name] = event self._events[event_name] = event
logger.info(f"事件 {event_name} 注册成功") logger.info(f"事件 {event_name} 注册成功")
# 检查是否有缓存的订阅需要处理
self._process_pending_subscriptions(event_name)
return True return True
def get_event(self, event_name: str) -> Optional[BaseEvent]: def get_event(self, event_name: str) -> Optional[BaseEvent]:
@ -145,9 +150,18 @@ class EventManager:
return False return False
self._event_handlers[handler_name] = handler_class() self._event_handlers[handler_name] = handler_class()
# 处理init_subcribe缓存失败的订阅
if self._event_handlers[handler_name].init_subcribe: if self._event_handlers[handler_name].init_subcribe:
failed_subscriptions = []
for event_name in self._event_handlers[handler_name].init_subcribe: for event_name in self._event_handlers[handler_name].init_subcribe:
self._event_handlers[handler_name].subcribe(event_name) if not self.subscribe_handler_to_event(handler_name, event_name):
failed_subscriptions.append(event_name)
# 缓存失败的订阅
if failed_subscriptions:
self._pending_subscriptions[handler_name] = failed_subscriptions
logger.warning(f"事件处理器 {handler_name} 的部分订阅失败,已缓存: {failed_subscriptions}")
logger.info(f"事件处理器 {handler_name} 注册成功") logger.info(f"事件处理器 {handler_name} 注册成功")
return True return True
@ -306,9 +320,59 @@ class EventManager:
"disabled_events": len(disabled_events), "disabled_events": len(disabled_events),
"total_handlers": len(self._event_handlers), "total_handlers": len(self._event_handlers),
"event_names": list(self._events.keys()), "event_names": list(self._events.keys()),
"handler_names": list(self._event_handlers.keys()) "handler_names": list(self._event_handlers.keys()),
"pending_subscriptions": len(self._pending_subscriptions)
} }
def _process_pending_subscriptions(self, event_name: str) -> None:
"""处理指定事件的缓存订阅
Args:
event_name (str): 事件名称
"""
handlers_to_remove = []
for handler_name, pending_events in self._pending_subscriptions.items():
if event_name in pending_events:
if self.subscribe_handler_to_event(handler_name, event_name):
pending_events.remove(event_name)
logger.info(f"成功处理缓存订阅: {handler_name} -> {event_name}")
# 如果该处理器没有更多待处理订阅,标记为移除
if not pending_events:
handlers_to_remove.append(handler_name)
# 清理已完成的处理器缓存
for handler_name in handlers_to_remove:
del self._pending_subscriptions[handler_name]
def process_all_pending_subscriptions(self) -> int:
"""处理所有缓存的订阅
Returns:
int: 成功处理的订阅数量
"""
processed_count = 0
# 复制待处理订阅,避免在迭代时修改字典
pending_copy = dict(self._pending_subscriptions)
for handler_name, pending_events in pending_copy.items():
for event_name in pending_events[:]: # 使用切片避免修改列表
if self.subscribe_handler_to_event(handler_name, event_name):
pending_events.remove(event_name)
processed_count += 1
# 清理已完成的处理器缓存
handlers_to_remove = [name for name, events in self._pending_subscriptions.items() if not events]
for handler_name in handlers_to_remove:
del self._pending_subscriptions[handler_name]
if processed_count > 0:
logger.info(f"批量处理缓存订阅完成,共处理 {processed_count} 个订阅")
return processed_count
# 创建全局事件管理器实例 # 创建全局事件管理器实例
event_manager = EventManager() event_manager = EventManager()

View File

@ -1,61 +0,0 @@
"""
测试事件处理器权重排序功能
"""
import asyncio
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from src.plugin_system.core.event_manager import EventManager
from plugins.test_weight_sorting_plugin.plugin import TestWeightSortingPlugin
async def test_weight_sorting():
"""测试权重排序功能"""
print("开始测试事件处理器权重排序功能...")
# 获取事件管理器实例
event_manager = EventManager()
# 初始化测试事件
event_manager.register_event("test_weight_event")
# 创建并初始化测试插件
plugin = TestWeightSortingPlugin()
await plugin.initialize()
# 检查事件订阅者是否按权重排序
event = event_manager.get_event("test_weight_event")
if event:
print(f"\n事件 {event.name} 的订阅者列表:")
for i, subscriber in enumerate(event.subcribers):
weight = getattr(subscriber, 'weight', 0)
handler_name = getattr(subscriber, 'handler_name', 'unknown')
print(f" {i+1}. {handler_name} (权重: {weight})")
# 验证排序是否正确
weights = [getattr(h, 'weight', 0) for h in event.subcribers]
is_sorted = all(weights[i] >= weights[i+1] for i in range(len(weights)-1))
if is_sorted:
print("\n✅ 权重排序正确:订阅者已按权重从高到低排序")
else:
print("\n❌ 权重排序错误:订阅者未按权重排序")
print(f"当前权重顺序: {weights}")
# 测试事件触发
print("\n触发测试事件...")
results = await event_manager.trigger_event("test_weight_event", {"test": "data"})
if results:
print(f"\n事件触发完成,共执行了 {len(results.results)} 个处理器")
for result in results.results:
print(f" - {result.handler_name}: {'成功' if result.success else '失败'}")
else:
print("❌ 测试事件不存在")
if __name__ == "__main__":
asyncio.run(test_weight_sorting())