From 536f838616caa395a5fda76be519a73c1f2f79c7 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Thu, 21 Aug 2025 14:29:47 +0800 Subject: [PATCH] feat: update tool history feature to version 0.10.0 --- bot.py | 6 + src/main.py | 8 ++ src/plugin_system/base/component_types.py | 1 + src/plugin_system/core/events_manager.py | 148 ++++++++++++++++------ 4 files changed, 124 insertions(+), 39 deletions(-) diff --git a/bot.py b/bot.py index 5342be7c..f6056a4e 100644 --- a/bot.py +++ b/bot.py @@ -237,6 +237,12 @@ if __name__ == "__main__": logger.error(f"主程序发生异常: {str(e)} {str(traceback.format_exc())}") exit_code = 1 # 标记发生错误 finally: + # 触发 ON_STOP 事件 + from src.plugin_system.core.events_manager import events_manager + from src.plugin_system.base.component_types import EventType + asyncio.run(events_manager.handle_mai_events(event_type=EventType.ON_STOP)) + # logger.info("已触发 ON_STOP 事件") + # 确保 loop 在任何情况下都尝试关闭(如果存在且未关闭) if "loop" in locals() and loop and not loop.is_closed(): loop.close() diff --git a/src/main.py b/src/main.py index f7d1bc76..cad85a0e 100644 --- a/src/main.py +++ b/src/main.py @@ -86,6 +86,14 @@ class MainSystem: # 加载所有actions,包括默认的和插件的 plugin_manager.load_all_plugins() + + # 触发 ON_START 事件 + from src.plugin_system.core.events_manager import events_manager + from src.plugin_system.base.component_types import EventType + await events_manager.handle_mai_events( + event_type=EventType.ON_START + ) + # logger.info("已触发 ON_START 事件") # 初始化表情管理器 get_emoji_manager().initialize() diff --git a/src/plugin_system/base/component_types.py b/src/plugin_system/base/component_types.py index 09969799..5570c2ad 100644 --- a/src/plugin_system/base/component_types.py +++ b/src/plugin_system/base/component_types.py @@ -54,6 +54,7 @@ class EventType(Enum): """ ON_START = "on_start" # 启动事件,用于调用按时任务 + ON_STOP = "on_stop" # 停止事件,用于调用按时任务 ON_MESSAGE = "on_message" ON_PLAN = "on_plan" POST_LLM = "post_llm" diff --git a/src/plugin_system/core/events_manager.py b/src/plugin_system/core/events_manager.py index f50659da..cfeb6b55 100644 --- a/src/plugin_system/core/events_manager.py +++ b/src/plugin_system/core/events_manager.py @@ -1,6 +1,6 @@ import asyncio import contextlib -from typing import List, Dict, Optional, Type, Tuple, Any +from typing import List, Dict, Optional, Type, Tuple, Any, Coroutine from src.chat.message_receive.message import MessageRecv from src.chat.message_receive.chat_stream import get_chat_manager @@ -42,6 +42,82 @@ class EventsManager: self._handler_mapping[handler_name] = handler_class return self._insert_event_handler(handler_class, handler_info) + def _task_done_callback(self, task: asyncio.Task): + """统一处理异步任务完成后的回调,例如记录异常。""" + try: + task.result() + except asyncio.CancelledError: + logger.warning(f"任务 {task.get_name()} 被取消。") + except Exception as e: + logger.error(f"异步事件处理器任务 {task.get_name()} 发生未捕获异常: {e}", exc_info=True) + + def _prepare_message( + self, + event_type: EventType, + message: Optional[MessageRecv] = None, + llm_prompt: Optional[str] = None, + llm_response: Optional[Dict[str, Any]] = None, + stream_id: Optional[str] = None, + action_usage: Optional[List[str]] = None, + ) -> Optional[MaiMessages]: + """根据事件类型和输入,准备和转换消息对象。""" + if message: + return self._transform_event_message(message, llm_prompt, llm_response) + + if event_type not in [EventType.ON_START, EventType.ON_STOP]: + assert stream_id, "如果没有消息,必须为非启动/关闭事件提供流ID" + if event_type in [EventType.ON_MESSAGE, EventType.ON_PLAN, EventType.POST_LLM, EventType.AFTER_LLM]: + return self._build_message_from_stream(stream_id, llm_prompt, llm_response) + else: + return self._transform_event_without_message(stream_id, llm_prompt, llm_response, action_usage) + + return None # ON_START, ON_STOP事件没有消息体 + + def _execute_handler( + self, + handler: BaseEventHandler, + message: Optional[MaiMessages] + ) -> Coroutine[Any, Any, tuple[bool, bool, Any]]: + """封装了调用 handler.execute 的逻辑。""" + if message: + return handler.execute(message) + return handler.execute() # 适用于 ON_START, ON_STOP + + def _dispatch_handler_task(self, handler: BaseEventHandler, message: Optional[MaiMessages]): + """分发一个非阻塞(异步)的事件处理任务。""" + try: + # 无论是否有 message,都统一调用 + coro = self._execute_handler(handler, message) + task = asyncio.create_task(coro) + + task_name = f"{handler.plugin_name}-{handler.handler_name}" + task.set_name(task_name) + task.add_done_callback(self._task_done_callback) + + self._handler_tasks.setdefault(handler.handler_name, []).append(task) + except Exception as e: + logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}", exc_info=True) + + async def _dispatch_intercepting_handler( + self, + handler: BaseEventHandler, + message: Optional[MaiMessages] + ) -> bool: + """分发并等待一个阻塞(同步)的事件处理器,返回是否应继续处理。""" + try: + # 统一调用 + success, continue_processing, result = await self._execute_handler(handler, message) + + if not success: + logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}") + else: + logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}") + + return continue_processing + except Exception as e: + logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}", exc_info=True) + return True # 发生异常时默认不中断其他处理 + async def handle_mai_events( self, event_type: EventType, @@ -51,49 +127,43 @@ class EventsManager: stream_id: Optional[str] = None, action_usage: Optional[List[str]] = None, ) -> bool: - """处理 events""" + """ + 处理所有事件,根据事件类型分发给订阅的处理器。 + """ from src.plugin_system.core import component_registry continue_flag = True - transformed_message: Optional[MaiMessages] = None - if not message: - assert stream_id, "如果没有消息,必须提供流ID" - if event_type in [EventType.ON_MESSAGE, EventType.ON_PLAN, EventType.POST_LLM, EventType.AFTER_LLM]: - transformed_message = self._build_message_from_stream(stream_id, llm_prompt, llm_response) - else: - transformed_message = self._transform_event_without_message( - stream_id, llm_prompt, llm_response, action_usage - ) - else: - transformed_message = self._transform_event_message(message, llm_prompt, llm_response) - for handler in self._events_subscribers.get(event_type, []): - if transformed_message.stream_id: - stream_id = transformed_message.stream_id - if handler.handler_name in global_announcement_manager.get_disabled_chat_event_handlers(stream_id): - continue - handler.set_plugin_config(component_registry.get_plugin_config(handler.plugin_name) or {}) + + # 1. 准备消息 + transformed_message = self._prepare_message( + event_type, message, llm_prompt, llm_response, stream_id, action_usage + ) + + # 2. 获取并遍历处理器 + handlers = self._events_subscribers.get(event_type, []) + if not handlers: + return True + + current_stream_id = transformed_message.stream_id if transformed_message else None + + for handler in handlers: + # 3. 前置检查和配置加载 + if current_stream_id and handler.handler_name in global_announcement_manager.get_disabled_chat_event_handlers(current_stream_id): + continue + + # 统一加载插件配置 + plugin_config = component_registry.get_plugin_config(handler.plugin_name) or {} + handler.set_plugin_config(plugin_config) + + # 4. 根据类型分发任务 if handler.intercept_message: - try: - success, continue_processing, result = await handler.execute(transformed_message) - if not success: - logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}") - else: - logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}") - continue_flag = continue_flag and continue_processing - except Exception as e: - logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}") - continue + # 阻塞执行,并更新 continue_flag + should_continue = await self._dispatch_intercepting_handler(handler, transformed_message) + continue_flag = continue_flag and should_continue else: - try: - handler_task = asyncio.create_task(handler.execute(transformed_message)) - handler_task.add_done_callback(self._task_done_callback) - handler_task.set_name(f"{handler.plugin_name}-{handler.handler_name}") - if handler.handler_name not in self._handler_tasks: - self._handler_tasks[handler.handler_name] = [] - self._handler_tasks[handler.handler_name].append(handler_task) - except Exception as e: - logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}") - continue + # 异步执行,不阻塞 + self._dispatch_handler_task(handler, transformed_message) + return continue_flag def _insert_event_handler(self, handler_class: Type[BaseEventHandler], handler_info: EventHandlerInfo) -> bool: