mirror of https://github.com/Mai-with-u/MaiBot.git
Merge pull request #1207 from Windpicker-owo/dev
【Fix】修复ON_START事件无法正常触发的问题,添加ON_STOP事件,重构handle_mai_events方法pull/1215/head
commit
ee30e03def
6
bot.py
6
bot.py
|
|
@ -80,7 +80,11 @@ def easter_egg():
|
||||||
async def graceful_shutdown():
|
async def graceful_shutdown():
|
||||||
try:
|
try:
|
||||||
logger.info("正在优雅关闭麦麦...")
|
logger.info("正在优雅关闭麦麦...")
|
||||||
|
# 触发 ON_STOP 事件
|
||||||
|
from src.plugin_system.core.events_manager import events_manager
|
||||||
|
from src.plugin_system.base.component_types import EventType
|
||||||
|
asyncio.run(events_manager.handle_mai_events(event_type=EventType.ON_STOP))
|
||||||
|
# logger.info("已触发 ON_STOP 事件")
|
||||||
# 停止所有异步任务
|
# 停止所有异步任务
|
||||||
await async_task_manager.stop_and_wait_all_tasks()
|
await async_task_manager.stop_and_wait_all_tasks()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -89,7 +89,7 @@ class MainSystem:
|
||||||
lpmm_start_up()
|
lpmm_start_up()
|
||||||
|
|
||||||
# 加载所有actions,包括默认的和插件的
|
# 加载所有actions,包括默认的和插件的
|
||||||
plugin_manager.load_all_plugins()
|
plugin_manager.load_all_plugins()
|
||||||
|
|
||||||
# 初始化表情管理器
|
# 初始化表情管理器
|
||||||
get_emoji_manager().initialize()
|
get_emoji_manager().initialize()
|
||||||
|
|
@ -124,6 +124,13 @@ class MainSystem:
|
||||||
await check_and_run_migrations()
|
await check_and_run_migrations()
|
||||||
|
|
||||||
|
|
||||||
|
# 触发 ON_START 事件
|
||||||
|
from src.plugin_system.core.events_manager import events_manager
|
||||||
|
from src.plugin_system.base.component_types import EventType
|
||||||
|
await events_manager.handle_mai_events(
|
||||||
|
event_type=EventType.ON_START
|
||||||
|
)
|
||||||
|
# logger.info("已触发 ON_START 事件")
|
||||||
try:
|
try:
|
||||||
init_time = int(1000 * (time.time() - init_start_time))
|
init_time = int(1000 * (time.time() - init_start_time))
|
||||||
logger.info(f"初始化完成,神经元放电{init_time}次")
|
logger.info(f"初始化完成,神经元放电{init_time}次")
|
||||||
|
|
|
||||||
|
|
@ -54,6 +54,7 @@ class EventType(Enum):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
ON_START = "on_start" # 启动事件,用于调用按时任务
|
ON_START = "on_start" # 启动事件,用于调用按时任务
|
||||||
|
ON_STOP = "on_stop" # 停止事件,用于调用按时任务
|
||||||
ON_MESSAGE = "on_message"
|
ON_MESSAGE = "on_message"
|
||||||
ON_PLAN = "on_plan"
|
ON_PLAN = "on_plan"
|
||||||
POST_LLM = "post_llm"
|
POST_LLM = "post_llm"
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import contextlib
|
import contextlib
|
||||||
from typing import List, Dict, Optional, Type, Tuple, Any
|
from typing import List, Dict, Optional, Type, Tuple, Any, Coroutine
|
||||||
|
|
||||||
from src.chat.message_receive.message import MessageRecv
|
from src.chat.message_receive.message import MessageRecv
|
||||||
from src.chat.message_receive.chat_stream import get_chat_manager
|
from src.chat.message_receive.chat_stream import get_chat_manager
|
||||||
|
|
@ -42,6 +42,71 @@ class EventsManager:
|
||||||
self._handler_mapping[handler_name] = handler_class
|
self._handler_mapping[handler_name] = handler_class
|
||||||
return self._insert_event_handler(handler_class, handler_info)
|
return self._insert_event_handler(handler_class, handler_info)
|
||||||
|
|
||||||
|
def _prepare_message(
|
||||||
|
self,
|
||||||
|
event_type: EventType,
|
||||||
|
message: Optional[MessageRecv] = None,
|
||||||
|
llm_prompt: Optional[str] = None,
|
||||||
|
llm_response: Optional[Dict[str, Any]] = None,
|
||||||
|
stream_id: Optional[str] = None,
|
||||||
|
action_usage: Optional[List[str]] = None,
|
||||||
|
) -> Optional[MaiMessages]:
|
||||||
|
"""根据事件类型和输入,准备和转换消息对象。"""
|
||||||
|
if message:
|
||||||
|
return self._transform_event_message(message, llm_prompt, llm_response)
|
||||||
|
|
||||||
|
if event_type not in [EventType.ON_START, EventType.ON_STOP]:
|
||||||
|
assert stream_id, "如果没有消息,必须为非启动/关闭事件提供流ID"
|
||||||
|
if event_type in [EventType.ON_MESSAGE, EventType.ON_PLAN, EventType.POST_LLM, EventType.AFTER_LLM]:
|
||||||
|
return self._build_message_from_stream(stream_id, llm_prompt, llm_response)
|
||||||
|
else:
|
||||||
|
return self._transform_event_without_message(stream_id, llm_prompt, llm_response, action_usage)
|
||||||
|
|
||||||
|
return None # ON_START, ON_STOP事件没有消息体
|
||||||
|
|
||||||
|
def _execute_handler(
|
||||||
|
self,
|
||||||
|
handler: BaseEventHandler,
|
||||||
|
message: Optional[MaiMessages]
|
||||||
|
) -> Coroutine[Any, Any, tuple[bool, bool, Any]]:
|
||||||
|
"""封装了调用 handler.execute 的逻辑。"""
|
||||||
|
return handler.execute(message) if message else handler.execute()
|
||||||
|
|
||||||
|
def _dispatch_handler_task(self, handler: BaseEventHandler, message: Optional[MaiMessages]):
|
||||||
|
"""分发一个非阻塞(异步)的事件处理任务。"""
|
||||||
|
try:
|
||||||
|
# 无论是否有 message,都统一调用
|
||||||
|
coro = self._execute_handler(handler, message)
|
||||||
|
task = asyncio.create_task(coro)
|
||||||
|
|
||||||
|
task_name = f"{handler.plugin_name}-{handler.handler_name}"
|
||||||
|
task.set_name(task_name)
|
||||||
|
task.add_done_callback(self._task_done_callback)
|
||||||
|
|
||||||
|
self._handler_tasks.setdefault(handler.handler_name, []).append(task)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}", exc_info=True)
|
||||||
|
|
||||||
|
async def _dispatch_intercepting_handler(
|
||||||
|
self,
|
||||||
|
handler: BaseEventHandler,
|
||||||
|
message: Optional[MaiMessages]
|
||||||
|
) -> bool:
|
||||||
|
"""分发并等待一个阻塞(同步)的事件处理器,返回是否应继续处理。"""
|
||||||
|
try:
|
||||||
|
# 统一调用
|
||||||
|
success, continue_processing, result = await self._execute_handler(handler, message)
|
||||||
|
|
||||||
|
if not success:
|
||||||
|
logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}")
|
||||||
|
else:
|
||||||
|
logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}")
|
||||||
|
|
||||||
|
return continue_processing
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}", exc_info=True)
|
||||||
|
return True # 发生异常时默认不中断其他处理
|
||||||
|
|
||||||
async def handle_mai_events(
|
async def handle_mai_events(
|
||||||
self,
|
self,
|
||||||
event_type: EventType,
|
event_type: EventType,
|
||||||
|
|
@ -51,49 +116,43 @@ class EventsManager:
|
||||||
stream_id: Optional[str] = None,
|
stream_id: Optional[str] = None,
|
||||||
action_usage: Optional[List[str]] = None,
|
action_usage: Optional[List[str]] = None,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""处理 events"""
|
"""
|
||||||
|
处理所有事件,根据事件类型分发给订阅的处理器。
|
||||||
|
"""
|
||||||
from src.plugin_system.core import component_registry
|
from src.plugin_system.core import component_registry
|
||||||
|
|
||||||
continue_flag = True
|
continue_flag = True
|
||||||
transformed_message: Optional[MaiMessages] = None
|
|
||||||
if not message:
|
# 1. 准备消息
|
||||||
assert stream_id, "如果没有消息,必须提供流ID"
|
transformed_message = self._prepare_message(
|
||||||
if event_type in [EventType.ON_MESSAGE, EventType.ON_PLAN, EventType.POST_LLM, EventType.AFTER_LLM]:
|
event_type, message, llm_prompt, llm_response, stream_id, action_usage
|
||||||
transformed_message = self._build_message_from_stream(stream_id, llm_prompt, llm_response)
|
)
|
||||||
else:
|
|
||||||
transformed_message = self._transform_event_without_message(
|
# 2. 获取并遍历处理器
|
||||||
stream_id, llm_prompt, llm_response, action_usage
|
handlers = self._events_subscribers.get(event_type, [])
|
||||||
)
|
if not handlers:
|
||||||
else:
|
return True
|
||||||
transformed_message = self._transform_event_message(message, llm_prompt, llm_response)
|
|
||||||
for handler in self._events_subscribers.get(event_type, []):
|
current_stream_id = transformed_message.stream_id if transformed_message else None
|
||||||
if transformed_message.stream_id:
|
|
||||||
stream_id = transformed_message.stream_id
|
for handler in handlers:
|
||||||
if handler.handler_name in global_announcement_manager.get_disabled_chat_event_handlers(stream_id):
|
# 3. 前置检查和配置加载
|
||||||
continue
|
if current_stream_id and handler.handler_name in global_announcement_manager.get_disabled_chat_event_handlers(current_stream_id):
|
||||||
handler.set_plugin_config(component_registry.get_plugin_config(handler.plugin_name) or {})
|
continue
|
||||||
|
|
||||||
|
# 统一加载插件配置
|
||||||
|
plugin_config = component_registry.get_plugin_config(handler.plugin_name) or {}
|
||||||
|
handler.set_plugin_config(plugin_config)
|
||||||
|
|
||||||
|
# 4. 根据类型分发任务
|
||||||
if handler.intercept_message:
|
if handler.intercept_message:
|
||||||
try:
|
# 阻塞执行,并更新 continue_flag
|
||||||
success, continue_processing, result = await handler.execute(transformed_message)
|
should_continue = await self._dispatch_intercepting_handler(handler, transformed_message)
|
||||||
if not success:
|
continue_flag = continue_flag and should_continue
|
||||||
logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}")
|
|
||||||
else:
|
|
||||||
logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}")
|
|
||||||
continue_flag = continue_flag and continue_processing
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}")
|
|
||||||
continue
|
|
||||||
else:
|
else:
|
||||||
try:
|
# 异步执行,不阻塞
|
||||||
handler_task = asyncio.create_task(handler.execute(transformed_message))
|
self._dispatch_handler_task(handler, transformed_message)
|
||||||
handler_task.add_done_callback(self._task_done_callback)
|
|
||||||
handler_task.set_name(f"{handler.plugin_name}-{handler.handler_name}")
|
|
||||||
if handler.handler_name not in self._handler_tasks:
|
|
||||||
self._handler_tasks[handler.handler_name] = []
|
|
||||||
self._handler_tasks[handler.handler_name].append(handler_task)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}")
|
|
||||||
continue
|
|
||||||
return continue_flag
|
return continue_flag
|
||||||
|
|
||||||
def _insert_event_handler(self, handler_class: Type[BaseEventHandler], handler_info: EventHandlerInfo) -> bool:
|
def _insert_event_handler(self, handler_class: Type[BaseEventHandler], handler_info: EventHandlerInfo) -> bool:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue