feat:添加内置emoji管理插件

pull/1264/head
SengokuCola 2025-09-24 16:13:43 +08:00
parent a1e9893ac1
commit dff75142ec
5 changed files with 481 additions and 3 deletions

1
.gitignore vendored
View File

@ -321,6 +321,7 @@ run_pet.bat
/plugins/*
!/plugins
!/plugins/hello_world_plugin
!/plugins/emoji_manage_plugin
!/plugins/take_picture_plugin
config.toml

View File

@ -0,0 +1,53 @@
{
"manifest_version": 1,
"name": "BetterEmoji",
"version": "1.0.0",
"description": "更好的表情包管理插件",
"author": {
"name": "SengokuCola",
"url": "https://github.com/SengokuCola"
},
"license": "GPL-v3.0-or-later",
"host_application": {
"min_version": "0.10.4"
},
"homepage_url": "https://github.com/SengokuCola/BetterEmoji",
"repository_url": "https://github.com/SengokuCola/BetterEmoji",
"keywords": ["emoji", "manage", "plugin"],
"categories": ["Examples", "Tutorial"],
"default_locale": "zh-CN",
"locales_path": "_locales",
"plugin_info": {
"is_built_in": false,
"plugin_type": "emoji_manage",
"components": [
{
"type": "action",
"name": "hello_greeting",
"description": "向用户发送问候消息"
},
{
"type": "action",
"name": "bye_greeting",
"description": "向用户发送告别消息",
"activation_modes": ["keyword"],
"keywords": ["再见", "bye", "88", "拜拜"]
},
{
"type": "command",
"name": "time",
"description": "查询当前时间",
"pattern": "/time"
}
],
"features": [
"问候和告别功能",
"时间查询命令",
"配置文件示例",
"新手教程代码"
]
}
}

View File

@ -0,0 +1,182 @@
import random
from typing import List, Tuple, Type, Any
from src.plugin_system import (
BasePlugin,
register_plugin,
BaseAction,
BaseCommand,
BaseTool,
ComponentInfo,
ActionActivationType,
ConfigField,
BaseEventHandler,
EventType,
MaiMessages,
ToolParamType,
ReplyContentType,
emoji_api,
)
from src.config.config import global_config
class ListEmojiCommand(BaseCommand):
"""列表表情包Command - 响应/emoji list命令"""
command_name = "emoji_list"
command_description = "列表表情包"
# === 命令设置(必须填写)===
command_pattern = r"^/emoji list$" # 精确匹配 "/emoji list" 命令
async def execute(self) -> Tuple[bool, str, bool]:
"""执行列表表情包"""
import datetime
# 获取当前时间
time_format: str = self.get_config("time.format", "%Y-%m-%d %H:%M:%S") # type: ignore
now = datetime.datetime.now()
time_str = now.strftime(time_format)
# 发送时间信息
message = f"⏰ 当前时间:{time_str}"
await self.send_text(message)
return True, f"显示了当前时间: {time_str}", True
class PrintMessage(BaseEventHandler):
"""打印消息事件处理器 - 处理打印消息事件"""
event_type = EventType.ON_MESSAGE
handler_name = "print_message_handler"
handler_description = "打印接收到的消息"
async def execute(self, message: MaiMessages | None) -> Tuple[bool, bool, str | None, None, None]:
"""执行打印消息事件处理"""
# 打印接收到的消息
if self.get_config("print_message.enabled", False):
print(f"接收到消息: {message.raw_message if message else '无效消息'}")
return True, True, "消息已打印", None, None
class ForwardMessages(BaseEventHandler):
"""
把接收到的消息转发到指定聊天ID
此组件是HYBRID消息和FORWARD消息的使用示例
每收到10条消息就会以1%的概率使用HYBRID消息转发否则使用FORWARD消息转发
"""
event_type = EventType.ON_MESSAGE
handler_name = "forward_messages_handler"
handler_description = "把接收到的消息转发到指定聊天ID"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.counter = 0 # 用于计数转发的消息数量
self.messages: List[str] = []
async def execute(self, message: MaiMessages | None) -> Tuple[bool, bool, None, None, None]:
if not message:
return True, True, None, None, None
stream_id = message.stream_id or ""
if message.plain_text:
self.messages.append(message.plain_text)
self.counter += 1
if self.counter % 10 == 0:
if random.random() < 0.01:
success = await self.send_hybrid(stream_id, [(ReplyContentType.TEXT, msg) for msg in self.messages])
else:
success = await self.send_forward(
stream_id,
[
(
str(global_config.bot.qq_account),
str(global_config.bot.nickname),
[(ReplyContentType.TEXT, msg)],
)
for msg in self.messages
],
)
if not success:
raise ValueError("转发消息失败")
self.messages = []
return True, True, None, None, None
class RandomEmojis(BaseCommand):
command_name = "random_emojis"
command_description = "发送多张随机表情包"
command_pattern = r"^/random_emojis$"
async def execute(self):
emojis = await emoji_api.get_random(5)
if not emojis:
return False, "未找到表情包", False
emoji_base64_list = []
for emoji in emojis:
emoji_base64_list.append(emoji[0])
return await self.forward_images(emoji_base64_list)
async def forward_images(self, images: List[str]):
"""
把多张图片用合并转发的方式发给用户
"""
success = await self.send_forward([("0", "神秘用户", [(ReplyContentType.IMAGE, img)]) for img in images])
return (True, "已发送随机表情包", True) if success else (False, "发送随机表情包失败", False)
# ===== 插件注册 =====
@register_plugin
class EmojiManagePlugin(BasePlugin):
"""表情包管理插件 - 管理表情包"""
# 插件基本信息
plugin_name: str = "emoji_manage_plugin" # 内部标识符
enable_plugin: bool = False
dependencies: List[str] = [] # 插件依赖列表
python_dependencies: List[str] = [] # Python包依赖列表
config_file_name: str = "config.toml" # 配置文件名
# 配置节描述
config_section_descriptions = {"plugin": "插件基本信息", "emoji": "表情包功能配置"}
# 配置Schema定义
config_schema: dict = {
"plugin": {
"version": ConfigField(type=str, default="1.0.0", description="插件版本"),
"enabled": ConfigField(type=bool, default=False, description="是否启用插件"),
},
}
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
return [
(PrintMessage.get_handler_info(), PrintMessage),
(ForwardMessages.get_handler_info(), ForwardMessages),
(RandomEmojis.get_command_info(), RandomEmojis),
]
# @register_plugin
# class HelloWorldEventPlugin(BaseEPlugin):
# """Hello World事件插件 - 处理问候和告别事件"""
# plugin_name = "hello_world_event_plugin"
# enable_plugin = False
# dependencies = []
# python_dependencies = []
# config_file_name = "event_config.toml"
# config_schema = {
# "plugin": {
# "name": ConfigField(type=str, default="hello_world_event_plugin", description="插件名称"),
# "version": ConfigField(type=str, default="1.0.0", description="插件版本"),
# "enabled": ConfigField(type=bool, default=True, description="是否启用插件"),
# },
# }
# def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
# return [(PrintMessage.get_handler_info(), PrintMessage)]

View File

@ -623,3 +623,41 @@ def image_path_to_base64(image_path: str) -> str:
return base64.b64encode(image_data).decode("utf-8")
else:
raise IOError(f"读取图片文件失败: {image_path}")
def base64_to_image(image_base64: str, output_path: str) -> bool:
"""将base64编码的图片保存为文件
Args:
image_base64: 图片的base64编码
output_path: 输出文件路径
Returns:
bool: 是否成功保存
Raises:
ValueError: 当base64编码无效时
IOError: 当保存文件失败时
"""
try:
# 确保base64字符串只包含ASCII字符
if isinstance(image_base64, str):
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
# 解码base64
image_bytes = base64.b64decode(image_base64)
# 确保输出目录存在
output_dir = os.path.dirname(output_path)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
# 保存文件
with open(output_path, "wb") as f:
f.write(image_bytes)
return True
except Exception as e:
logger.error(f"保存base64图片失败: {e}")
return False

View File

@ -9,11 +9,15 @@
"""
import random
import base64
import os
import uuid
import time
from typing import Optional, Tuple, List
from typing import Optional, Tuple, List, Dict, Any
from src.common.logger import get_logger
from src.chat.emoji_system.emoji_manager import get_emoji_manager
from src.chat.utils.utils_image import image_path_to_base64
from src.chat.emoji_system.emoji_manager import get_emoji_manager, EMOJI_DIR
from src.chat.utils.utils_image import image_path_to_base64, base64_to_image
logger = get_logger("emoji_api")
@ -245,6 +249,42 @@ def get_emotions() -> List[str]:
return []
async def get_all() -> List[Tuple[str, str, str]]:
"""获取所有表情包
Returns:
List[Tuple[str, str, str]]: 包含(base64编码, 表情包描述, 随机情感标签)的元组列表
"""
try:
emoji_manager = get_emoji_manager()
all_emojis = emoji_manager.emoji_objects
if not all_emojis:
logger.warning("[EmojiAPI] 没有可用的表情包")
return []
results = []
for emoji_obj in all_emojis:
if emoji_obj.is_deleted:
continue
emoji_base64 = image_path_to_base64(emoji_obj.full_path)
if not emoji_base64:
logger.error(f"[EmojiAPI] 无法转换表情包为base64: {emoji_obj.full_path}")
continue
matched_emotion = random.choice(emoji_obj.emotion) if emoji_obj.emotion else "随机表情"
results.append((emoji_base64, emoji_obj.description, matched_emotion))
logger.debug(f"[EmojiAPI] 成功获取 {len(results)} 个表情包")
return results
except Exception as e:
logger.error(f"[EmojiAPI] 获取所有表情包失败: {e}")
return []
def get_descriptions() -> List[str]:
"""获取所有表情包描述
@ -264,3 +304,167 @@ def get_descriptions() -> List[str]:
except Exception as e:
logger.error(f"[EmojiAPI] 获取表情包描述失败: {e}")
return []
# =============================================================================
# 表情包注册API函数
# =============================================================================
async def register_emoji(image_base64: str, filename: Optional[str] = None) -> Dict[str, Any]:
"""注册新的表情包
Args:
image_base64: 图片的base64编码
filename: 可选的文件名如果未提供则自动生成
Returns:
Dict[str, Any]: 注册结果包含以下字段
- success: bool, 是否成功注册
- message: str, 结果消息
- description: Optional[str], 表情包描述成功时
- emotions: Optional[List[str]], 情感标签列表成功时
- replaced: Optional[bool], 是否替换了旧表情包成功时
- hash: Optional[str], 表情包哈希值成功时
Raises:
ValueError: 如果base64为空或无效
TypeError: 如果参数类型不正确
"""
if not image_base64:
raise ValueError("图片base64编码不能为空")
if not isinstance(image_base64, str):
raise TypeError("image_base64必须是字符串类型")
if filename is not None and not isinstance(filename, str):
raise TypeError("filename必须是字符串类型或None")
try:
logger.info(f"[EmojiAPI] 开始注册表情包,文件名: {filename or '自动生成'}")
# 1. 获取emoji管理器并检查容量
emoji_manager = get_emoji_manager()
count_before = emoji_manager.emoji_num
max_count = emoji_manager.emoji_num_max
# 2. 检查是否可以注册(未达到上限或启用替换)
can_register = count_before < max_count or (
count_before >= max_count and emoji_manager.emoji_num_max_reach_deletion
)
if not can_register:
return {
"success": False,
"message": f"表情包数量已达上限({count_before}/{max_count})且未启用替换功能",
"description": None,
"emotions": None,
"replaced": None,
"hash": None
}
# 3. 确保emoji目录存在
os.makedirs(EMOJI_DIR, exist_ok=True)
# 4. 生成文件名
if not filename:
# 基于时间戳和UUID生成唯一文件名
timestamp = int(time.time())
unique_id = str(uuid.uuid4())[:8]
filename = f"emoji_{timestamp}_{unique_id}"
# 确保文件名有扩展名
if not filename.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
filename = f"{filename}.png" # 默认使用png格式
# 5. 保存base64图片到emoji目录
temp_file_path = os.path.join(EMOJI_DIR, filename)
try:
# 解码base64并保存图片
if not base64_to_image(image_base64, temp_file_path):
logger.error(f"[EmojiAPI] 无法保存base64图片到文件: {temp_file_path}")
return {
"success": False,
"message": "无法保存图片文件",
"description": None,
"emotions": None,
"replaced": None,
"hash": None
}
logger.debug(f"[EmojiAPI] 图片已保存到临时文件: {temp_file_path}")
except Exception as save_error:
logger.error(f"[EmojiAPI] 保存图片文件失败: {save_error}")
return {
"success": False,
"message": f"保存图片文件失败: {str(save_error)}",
"description": None,
"emotions": None,
"replaced": None,
"hash": None
}
# 6. 调用注册方法
register_success = await emoji_manager.register_emoji_by_filename(filename)
# 7. 清理临时文件(如果注册失败但文件还存在)
if not register_success and os.path.exists(temp_file_path):
try:
os.remove(temp_file_path)
logger.debug(f"[EmojiAPI] 已清理临时文件: {temp_file_path}")
except Exception as cleanup_error:
logger.warning(f"[EmojiAPI] 清理临时文件失败: {cleanup_error}")
# 8. 构建返回结果
if register_success:
count_after = emoji_manager.emoji_num
replaced = count_after <= count_before # 如果数量没增加,说明是替换
# 尝试获取新注册的表情包信息
new_emoji_info = None
if count_after > count_before or replaced:
# 获取最新的表情包信息
try:
# 通过文件名查找新注册的表情包(注意:文件名在注册后可能已经改变)
for emoji_obj in reversed(emoji_manager.emoji_objects):
if not emoji_obj.is_deleted and (
emoji_obj.filename == filename or # 直接匹配
(hasattr(emoji_obj, 'full_path') and filename in emoji_obj.full_path) # 路径包含匹配
):
new_emoji_info = emoji_obj
break
except Exception as find_error:
logger.warning(f"[EmojiAPI] 查找新注册表情包信息失败: {find_error}")
description = new_emoji_info.description if new_emoji_info else None
emotions = new_emoji_info.emotion if new_emoji_info else None
emoji_hash = new_emoji_info.hash if new_emoji_info else None
return {
"success": True,
"message": f"表情包注册成功 {'(替换旧表情包)' if replaced else '(新增表情包)'}",
"description": description,
"emotions": emotions,
"replaced": replaced,
"hash": emoji_hash
}
else:
return {
"success": False,
"message": "表情包注册失败,可能因为重复、格式不支持或审核未通过",
"description": None,
"emotions": None,
"replaced": None,
"hash": None
}
except Exception as e:
logger.error(f"[EmojiAPI] 注册表情包时发生异常: {e}")
return {
"success": False,
"message": f"注册过程中发生错误: {str(e)}",
"description": None,
"emotions": None,
"replaced": None,
"hash": None
}