diff --git a/.gitignore b/.gitignore index 104a3012..373171a7 100644 --- a/.gitignore +++ b/.gitignore @@ -321,6 +321,7 @@ run_pet.bat /plugins/* !/plugins !/plugins/hello_world_plugin +!/plugins/emoji_manage_plugin !/plugins/take_picture_plugin config.toml diff --git a/plugins/emoji_manage_plugin/_manifest.json b/plugins/emoji_manage_plugin/_manifest.json new file mode 100644 index 00000000..ee8d8318 --- /dev/null +++ b/plugins/emoji_manage_plugin/_manifest.json @@ -0,0 +1,53 @@ +{ + "manifest_version": 1, + "name": "BetterEmoji", + "version": "1.0.0", + "description": "更好的表情包管理插件", + "author": { + "name": "SengokuCola", + "url": "https://github.com/SengokuCola" + }, + "license": "GPL-v3.0-or-later", + + "host_application": { + "min_version": "0.10.4" + }, + "homepage_url": "https://github.com/SengokuCola/BetterEmoji", + "repository_url": "https://github.com/SengokuCola/BetterEmoji", + "keywords": ["emoji", "manage", "plugin"], + "categories": ["Examples", "Tutorial"], + + "default_locale": "zh-CN", + "locales_path": "_locales", + + "plugin_info": { + "is_built_in": false, + "plugin_type": "emoji_manage", + "components": [ + { + "type": "action", + "name": "hello_greeting", + "description": "向用户发送问候消息" + }, + { + "type": "action", + "name": "bye_greeting", + "description": "向用户发送告别消息", + "activation_modes": ["keyword"], + "keywords": ["再见", "bye", "88", "拜拜"] + }, + { + "type": "command", + "name": "time", + "description": "查询当前时间", + "pattern": "/time" + } + ], + "features": [ + "问候和告别功能", + "时间查询命令", + "配置文件示例", + "新手教程代码" + ] + } +} \ No newline at end of file diff --git a/plugins/emoji_manage_plugin/plugin.py b/plugins/emoji_manage_plugin/plugin.py new file mode 100644 index 00000000..8e0bbb6b --- /dev/null +++ b/plugins/emoji_manage_plugin/plugin.py @@ -0,0 +1,182 @@ +import random +from typing import List, Tuple, Type, Any +from src.plugin_system import ( + BasePlugin, + register_plugin, + BaseAction, + BaseCommand, + BaseTool, + ComponentInfo, + ActionActivationType, + ConfigField, + BaseEventHandler, + EventType, + MaiMessages, + ToolParamType, + ReplyContentType, + emoji_api, +) +from src.config.config import global_config + + +class ListEmojiCommand(BaseCommand): + """列表表情包Command - 响应/emoji list命令""" + + command_name = "emoji_list" + command_description = "列表表情包" + + # === 命令设置(必须填写)=== + command_pattern = r"^/emoji list$" # 精确匹配 "/emoji list" 命令 + + async def execute(self) -> Tuple[bool, str, bool]: + """执行列表表情包""" + import datetime + + # 获取当前时间 + time_format: str = self.get_config("time.format", "%Y-%m-%d %H:%M:%S") # type: ignore + now = datetime.datetime.now() + time_str = now.strftime(time_format) + + # 发送时间信息 + message = f"⏰ 当前时间:{time_str}" + await self.send_text(message) + + return True, f"显示了当前时间: {time_str}", True + + +class PrintMessage(BaseEventHandler): + """打印消息事件处理器 - 处理打印消息事件""" + + event_type = EventType.ON_MESSAGE + handler_name = "print_message_handler" + handler_description = "打印接收到的消息" + + async def execute(self, message: MaiMessages | None) -> Tuple[bool, bool, str | None, None, None]: + """执行打印消息事件处理""" + # 打印接收到的消息 + if self.get_config("print_message.enabled", False): + print(f"接收到消息: {message.raw_message if message else '无效消息'}") + return True, True, "消息已打印", None, None + + +class ForwardMessages(BaseEventHandler): + """ + 把接收到的消息转发到指定聊天ID + + 此组件是HYBRID消息和FORWARD消息的使用示例。 + 每收到10条消息,就会以1%的概率使用HYBRID消息转发,否则使用FORWARD消息转发。 + """ + + event_type = EventType.ON_MESSAGE + handler_name = "forward_messages_handler" + handler_description = "把接收到的消息转发到指定聊天ID" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.counter = 0 # 用于计数转发的消息数量 + self.messages: List[str] = [] + + async def execute(self, message: MaiMessages | None) -> Tuple[bool, bool, None, None, None]: + if not message: + return True, True, None, None, None + stream_id = message.stream_id or "" + + if message.plain_text: + self.messages.append(message.plain_text) + self.counter += 1 + if self.counter % 10 == 0: + if random.random() < 0.01: + success = await self.send_hybrid(stream_id, [(ReplyContentType.TEXT, msg) for msg in self.messages]) + else: + success = await self.send_forward( + stream_id, + [ + ( + str(global_config.bot.qq_account), + str(global_config.bot.nickname), + [(ReplyContentType.TEXT, msg)], + ) + for msg in self.messages + ], + ) + if not success: + raise ValueError("转发消息失败") + self.messages = [] + return True, True, None, None, None + + +class RandomEmojis(BaseCommand): + command_name = "random_emojis" + command_description = "发送多张随机表情包" + command_pattern = r"^/random_emojis$" + + async def execute(self): + emojis = await emoji_api.get_random(5) + if not emojis: + return False, "未找到表情包", False + emoji_base64_list = [] + for emoji in emojis: + emoji_base64_list.append(emoji[0]) + return await self.forward_images(emoji_base64_list) + + async def forward_images(self, images: List[str]): + """ + 把多张图片用合并转发的方式发给用户 + """ + success = await self.send_forward([("0", "神秘用户", [(ReplyContentType.IMAGE, img)]) for img in images]) + return (True, "已发送随机表情包", True) if success else (False, "发送随机表情包失败", False) + + +# ===== 插件注册 ===== + + +@register_plugin +class EmojiManagePlugin(BasePlugin): + """表情包管理插件 - 管理表情包""" + + # 插件基本信息 + plugin_name: str = "emoji_manage_plugin" # 内部标识符 + enable_plugin: bool = False + dependencies: List[str] = [] # 插件依赖列表 + python_dependencies: List[str] = [] # Python包依赖列表 + config_file_name: str = "config.toml" # 配置文件名 + + # 配置节描述 + config_section_descriptions = {"plugin": "插件基本信息", "emoji": "表情包功能配置"} + + # 配置Schema定义 + config_schema: dict = { + "plugin": { + "version": ConfigField(type=str, default="1.0.0", description="插件版本"), + "enabled": ConfigField(type=bool, default=False, description="是否启用插件"), + }, + } + + def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: + return [ + (PrintMessage.get_handler_info(), PrintMessage), + (ForwardMessages.get_handler_info(), ForwardMessages), + (RandomEmojis.get_command_info(), RandomEmojis), + ] + + +# @register_plugin +# class HelloWorldEventPlugin(BaseEPlugin): +# """Hello World事件插件 - 处理问候和告别事件""" + +# plugin_name = "hello_world_event_plugin" +# enable_plugin = False +# dependencies = [] +# python_dependencies = [] +# config_file_name = "event_config.toml" + +# config_schema = { +# "plugin": { +# "name": ConfigField(type=str, default="hello_world_event_plugin", description="插件名称"), +# "version": ConfigField(type=str, default="1.0.0", description="插件版本"), +# "enabled": ConfigField(type=bool, default=True, description="是否启用插件"), +# }, +# } + +# def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: +# return [(PrintMessage.get_handler_info(), PrintMessage)] diff --git a/src/chat/utils/utils_image.py b/src/chat/utils/utils_image.py index 94565b78..deba60a4 100644 --- a/src/chat/utils/utils_image.py +++ b/src/chat/utils/utils_image.py @@ -623,3 +623,41 @@ def image_path_to_base64(image_path: str) -> str: return base64.b64encode(image_data).decode("utf-8") else: raise IOError(f"读取图片文件失败: {image_path}") + + +def base64_to_image(image_base64: str, output_path: str) -> bool: + """将base64编码的图片保存为文件 + + Args: + image_base64: 图片的base64编码 + output_path: 输出文件路径 + + Returns: + bool: 是否成功保存 + + Raises: + ValueError: 当base64编码无效时 + IOError: 当保存文件失败时 + """ + try: + # 确保base64字符串只包含ASCII字符 + if isinstance(image_base64, str): + image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii") + + # 解码base64 + image_bytes = base64.b64decode(image_base64) + + # 确保输出目录存在 + output_dir = os.path.dirname(output_path) + if output_dir: + os.makedirs(output_dir, exist_ok=True) + + # 保存文件 + with open(output_path, "wb") as f: + f.write(image_bytes) + + return True + + except Exception as e: + logger.error(f"保存base64图片失败: {e}") + return False diff --git a/src/plugin_system/apis/emoji_api.py b/src/plugin_system/apis/emoji_api.py index f8faebfe..d127aa6d 100644 --- a/src/plugin_system/apis/emoji_api.py +++ b/src/plugin_system/apis/emoji_api.py @@ -9,11 +9,15 @@ """ import random +import base64 +import os +import uuid +import time -from typing import Optional, Tuple, List +from typing import Optional, Tuple, List, Dict, Any from src.common.logger import get_logger -from src.chat.emoji_system.emoji_manager import get_emoji_manager -from src.chat.utils.utils_image import image_path_to_base64 +from src.chat.emoji_system.emoji_manager import get_emoji_manager, EMOJI_DIR +from src.chat.utils.utils_image import image_path_to_base64, base64_to_image logger = get_logger("emoji_api") @@ -245,6 +249,42 @@ def get_emotions() -> List[str]: return [] +async def get_all() -> List[Tuple[str, str, str]]: + """获取所有表情包 + + Returns: + List[Tuple[str, str, str]]: 包含(base64编码, 表情包描述, 随机情感标签)的元组列表 + """ + try: + emoji_manager = get_emoji_manager() + all_emojis = emoji_manager.emoji_objects + + if not all_emojis: + logger.warning("[EmojiAPI] 没有可用的表情包") + return [] + + results = [] + for emoji_obj in all_emojis: + if emoji_obj.is_deleted: + continue + + emoji_base64 = image_path_to_base64(emoji_obj.full_path) + + if not emoji_base64: + logger.error(f"[EmojiAPI] 无法转换表情包为base64: {emoji_obj.full_path}") + continue + + matched_emotion = random.choice(emoji_obj.emotion) if emoji_obj.emotion else "随机表情" + results.append((emoji_base64, emoji_obj.description, matched_emotion)) + + logger.debug(f"[EmojiAPI] 成功获取 {len(results)} 个表情包") + return results + + except Exception as e: + logger.error(f"[EmojiAPI] 获取所有表情包失败: {e}") + return [] + + def get_descriptions() -> List[str]: """获取所有表情包描述 @@ -264,3 +304,167 @@ def get_descriptions() -> List[str]: except Exception as e: logger.error(f"[EmojiAPI] 获取表情包描述失败: {e}") return [] + + +# ============================================================================= +# 表情包注册API函数 +# ============================================================================= + + +async def register_emoji(image_base64: str, filename: Optional[str] = None) -> Dict[str, Any]: + """注册新的表情包 + + Args: + image_base64: 图片的base64编码 + filename: 可选的文件名,如果未提供则自动生成 + + Returns: + Dict[str, Any]: 注册结果,包含以下字段: + - success: bool, 是否成功注册 + - message: str, 结果消息 + - description: Optional[str], 表情包描述(成功时) + - emotions: Optional[List[str]], 情感标签列表(成功时) + - replaced: Optional[bool], 是否替换了旧表情包(成功时) + - hash: Optional[str], 表情包哈希值(成功时) + + Raises: + ValueError: 如果base64为空或无效 + TypeError: 如果参数类型不正确 + """ + if not image_base64: + raise ValueError("图片base64编码不能为空") + if not isinstance(image_base64, str): + raise TypeError("image_base64必须是字符串类型") + if filename is not None and not isinstance(filename, str): + raise TypeError("filename必须是字符串类型或None") + + try: + logger.info(f"[EmojiAPI] 开始注册表情包,文件名: {filename or '自动生成'}") + + # 1. 获取emoji管理器并检查容量 + emoji_manager = get_emoji_manager() + count_before = emoji_manager.emoji_num + max_count = emoji_manager.emoji_num_max + + # 2. 检查是否可以注册(未达到上限或启用替换) + can_register = count_before < max_count or ( + count_before >= max_count and emoji_manager.emoji_num_max_reach_deletion + ) + + if not can_register: + return { + "success": False, + "message": f"表情包数量已达上限({count_before}/{max_count})且未启用替换功能", + "description": None, + "emotions": None, + "replaced": None, + "hash": None + } + + # 3. 确保emoji目录存在 + os.makedirs(EMOJI_DIR, exist_ok=True) + + # 4. 生成文件名 + if not filename: + # 基于时间戳和UUID生成唯一文件名 + timestamp = int(time.time()) + unique_id = str(uuid.uuid4())[:8] + filename = f"emoji_{timestamp}_{unique_id}" + + # 确保文件名有扩展名 + if not filename.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')): + filename = f"{filename}.png" # 默认使用png格式 + + # 5. 保存base64图片到emoji目录 + temp_file_path = os.path.join(EMOJI_DIR, filename) + + try: + # 解码base64并保存图片 + if not base64_to_image(image_base64, temp_file_path): + logger.error(f"[EmojiAPI] 无法保存base64图片到文件: {temp_file_path}") + return { + "success": False, + "message": "无法保存图片文件", + "description": None, + "emotions": None, + "replaced": None, + "hash": None + } + + logger.debug(f"[EmojiAPI] 图片已保存到临时文件: {temp_file_path}") + + except Exception as save_error: + logger.error(f"[EmojiAPI] 保存图片文件失败: {save_error}") + return { + "success": False, + "message": f"保存图片文件失败: {str(save_error)}", + "description": None, + "emotions": None, + "replaced": None, + "hash": None + } + + # 6. 调用注册方法 + register_success = await emoji_manager.register_emoji_by_filename(filename) + + # 7. 清理临时文件(如果注册失败但文件还存在) + if not register_success and os.path.exists(temp_file_path): + try: + os.remove(temp_file_path) + logger.debug(f"[EmojiAPI] 已清理临时文件: {temp_file_path}") + except Exception as cleanup_error: + logger.warning(f"[EmojiAPI] 清理临时文件失败: {cleanup_error}") + + # 8. 构建返回结果 + if register_success: + count_after = emoji_manager.emoji_num + replaced = count_after <= count_before # 如果数量没增加,说明是替换 + + # 尝试获取新注册的表情包信息 + new_emoji_info = None + if count_after > count_before or replaced: + # 获取最新的表情包信息 + try: + # 通过文件名查找新注册的表情包(注意:文件名在注册后可能已经改变) + for emoji_obj in reversed(emoji_manager.emoji_objects): + if not emoji_obj.is_deleted and ( + emoji_obj.filename == filename or # 直接匹配 + (hasattr(emoji_obj, 'full_path') and filename in emoji_obj.full_path) # 路径包含匹配 + ): + new_emoji_info = emoji_obj + break + except Exception as find_error: + logger.warning(f"[EmojiAPI] 查找新注册表情包信息失败: {find_error}") + + description = new_emoji_info.description if new_emoji_info else None + emotions = new_emoji_info.emotion if new_emoji_info else None + emoji_hash = new_emoji_info.hash if new_emoji_info else None + + return { + "success": True, + "message": f"表情包注册成功 {'(替换旧表情包)' if replaced else '(新增表情包)'}", + "description": description, + "emotions": emotions, + "replaced": replaced, + "hash": emoji_hash + } + else: + return { + "success": False, + "message": "表情包注册失败,可能因为重复、格式不支持或审核未通过", + "description": None, + "emotions": None, + "replaced": None, + "hash": None + } + + except Exception as e: + logger.error(f"[EmojiAPI] 注册表情包时发生异常: {e}") + return { + "success": False, + "message": f"注册过程中发生错误: {str(e)}", + "description": None, + "emotions": None, + "replaced": None, + "hash": None + }