mirror of https://github.com/Mai-with-u/MaiBot.git
Merge branch 'dev' of github.com:MaiM-with-u/MaiBot into dev
commit
8ac892d764
|
|
@ -28,7 +28,7 @@ class AddEmojiCommand(BaseCommand):
|
|||
|
||||
async def execute(self) -> Tuple[bool, str, bool]:
|
||||
# 查找消息中的表情包
|
||||
logger.info(f"查找消息中的表情包: {self.message.message_segment}")
|
||||
# logger.info(f"查找消息中的表情包: {self.message.message_segment}")
|
||||
|
||||
emoji_base64_list = self.find_and_return_emoji_in_message(self.message.message_segment)
|
||||
|
||||
|
|
@ -42,8 +42,8 @@ class AddEmojiCommand(BaseCommand):
|
|||
|
||||
for i, emoji_base64 in enumerate(emoji_base64_list):
|
||||
try:
|
||||
# 使用emoji_api注册表情包
|
||||
result = await emoji_api.register_emoji(emoji_base64, filename=f"emoji_{i+1}")
|
||||
# 使用emoji_api注册表情包(让API自动生成唯一文件名)
|
||||
result = await emoji_api.register_emoji(emoji_base64)
|
||||
|
||||
if result["success"]:
|
||||
success_count += 1
|
||||
|
|
@ -72,13 +72,46 @@ class AddEmojiCommand(BaseCommand):
|
|||
summary_msg = f"表情包注册完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total_count} 个"
|
||||
|
||||
# 如果有结果详情,添加到返回消息中
|
||||
details_msg = ""
|
||||
if results:
|
||||
details_msg = "\n" + "\n".join(results)
|
||||
final_msg = summary_msg + details_msg
|
||||
else:
|
||||
final_msg = summary_msg
|
||||
|
||||
return success_count > 0, final_msg, success_count > 0
|
||||
# 使用表达器重写回复
|
||||
try:
|
||||
from src.plugin_system.apis import generator_api
|
||||
|
||||
# 构建重写数据
|
||||
rewrite_data = {
|
||||
"raw_reply": summary_msg,
|
||||
"reason": f"注册了表情包:{details_msg}\n",
|
||||
}
|
||||
|
||||
# 调用表达器重写
|
||||
result_status, data = await generator_api.rewrite_reply(
|
||||
chat_stream=self.message.chat_stream,
|
||||
reply_data=rewrite_data,
|
||||
)
|
||||
|
||||
if result_status:
|
||||
# 发送重写后的回复
|
||||
for reply_seg in data.reply_set.reply_data:
|
||||
send_data = reply_seg.content
|
||||
await self.send_text(send_data)
|
||||
|
||||
return success_count > 0, final_msg, success_count > 0
|
||||
else:
|
||||
# 如果重写失败,发送原始消息
|
||||
await self.send_text(final_msg)
|
||||
return success_count > 0, final_msg, success_count > 0
|
||||
|
||||
except Exception as e:
|
||||
# 如果表达器调用失败,发送原始消息
|
||||
logger.error(f"[add_emoji] 表达器重写失败: {e}")
|
||||
await self.send_text(final_msg)
|
||||
return success_count > 0, final_msg, success_count > 0
|
||||
|
||||
def find_and_return_emoji_in_message(self, message_segments) -> List[str]:
|
||||
emoji_base64_list = []
|
||||
|
|
@ -114,22 +147,205 @@ class ListEmojiCommand(BaseCommand):
|
|||
command_description = "列表表情包"
|
||||
|
||||
# === 命令设置(必须填写)===
|
||||
command_pattern = r"^/emoji list$" # 精确匹配 "/emoji list" 命令
|
||||
command_pattern = r"^/emoji list(\s+\d+)?$" # 匹配 "/emoji list" 或 "/emoji list 数量"
|
||||
|
||||
async def execute(self) -> Tuple[bool, str, bool]:
|
||||
"""执行列表表情包"""
|
||||
from src.plugin_system.apis import emoji_api
|
||||
import datetime
|
||||
|
||||
# 解析命令参数
|
||||
import re
|
||||
match = re.match(r"^/emoji list(?:\s+(\d+))?$", self.message.raw_message)
|
||||
max_count = 10 # 默认显示10个
|
||||
if match and match.group(1):
|
||||
max_count = min(int(match.group(1)), 50) # 最多显示50个
|
||||
|
||||
# 获取当前时间
|
||||
time_format: str = self.get_config("time.format", "%Y-%m-%d %H:%M:%S") # type: ignore
|
||||
now = datetime.datetime.now()
|
||||
time_str = now.strftime(time_format)
|
||||
|
||||
# 发送时间信息
|
||||
message = f"⏰ 当前时间:{time_str}"
|
||||
await self.send_text(message)
|
||||
# 获取表情包信息
|
||||
emoji_count = emoji_api.get_count()
|
||||
emoji_info = emoji_api.get_info()
|
||||
|
||||
return True, f"显示了当前时间: {time_str}", True
|
||||
# 构建返回消息
|
||||
message_lines = [
|
||||
f"📊 表情包统计信息 ({time_str})",
|
||||
f"• 总数: {emoji_count} / {emoji_info['max_count']}",
|
||||
f"• 可用: {emoji_info['available_emojis']}",
|
||||
]
|
||||
|
||||
if emoji_count == 0:
|
||||
message_lines.append("\n❌ 暂无表情包")
|
||||
final_message = "\n".join(message_lines)
|
||||
await self.send_text(final_message)
|
||||
return True, final_message, True
|
||||
|
||||
# 获取所有表情包
|
||||
all_emojis = await emoji_api.get_all()
|
||||
if not all_emojis:
|
||||
message_lines.append("\n❌ 无法获取表情包列表")
|
||||
final_message = "\n".join(message_lines)
|
||||
await self.send_text(final_message)
|
||||
return False, final_message, True
|
||||
|
||||
# 显示前N个表情包
|
||||
display_emojis = all_emojis[:max_count]
|
||||
message_lines.append(f"\n📋 显示前 {len(display_emojis)} 个表情包:")
|
||||
|
||||
for i, (emoji_base64, description, emotion) in enumerate(display_emojis, 1):
|
||||
# 截断过长的描述
|
||||
short_desc = description[:50] + "..." if len(description) > 50 else description
|
||||
message_lines.append(f"{i}. {short_desc} [{emotion}]")
|
||||
|
||||
# 如果还有更多表情包,显示总数
|
||||
if len(all_emojis) > max_count:
|
||||
message_lines.append(f"\n💡 还有 {len(all_emojis) - max_count} 个表情包未显示")
|
||||
|
||||
final_message = "\n".join(message_lines)
|
||||
|
||||
# 直接发送文本消息
|
||||
await self.send_text(final_message)
|
||||
|
||||
return True, final_message, True
|
||||
|
||||
|
||||
class DeleteEmojiCommand(BaseCommand):
|
||||
command_name = "delete_emoji"
|
||||
command_description = "删除表情包"
|
||||
command_pattern = r".*/emoji delete.*"
|
||||
|
||||
async def execute(self) -> Tuple[bool, str, bool]:
|
||||
# 查找消息中的表情包图片
|
||||
logger.info(f"查找消息中的表情包用于删除: {self.message.message_segment}")
|
||||
|
||||
emoji_base64_list = self.find_and_return_emoji_in_message(self.message.message_segment)
|
||||
|
||||
if not emoji_base64_list:
|
||||
return False, "未在消息中找到表情包或图片", False
|
||||
|
||||
# 删除找到的表情包
|
||||
success_count = 0
|
||||
fail_count = 0
|
||||
results = []
|
||||
|
||||
for i, emoji_base64 in enumerate(emoji_base64_list):
|
||||
try:
|
||||
# 计算图片的哈希值来查找对应的表情包
|
||||
import base64
|
||||
import hashlib
|
||||
|
||||
# 确保base64字符串只包含ASCII字符
|
||||
if isinstance(emoji_base64, str):
|
||||
emoji_base64_clean = emoji_base64.encode("ascii", errors="ignore").decode("ascii")
|
||||
else:
|
||||
emoji_base64_clean = str(emoji_base64)
|
||||
|
||||
# 计算哈希值
|
||||
image_bytes = base64.b64decode(emoji_base64_clean)
|
||||
emoji_hash = hashlib.md5(image_bytes).hexdigest()
|
||||
|
||||
# 使用emoji_api删除表情包
|
||||
result = await emoji_api.delete_emoji(emoji_hash)
|
||||
|
||||
if result["success"]:
|
||||
success_count += 1
|
||||
description = result.get("description", "未知描述")
|
||||
count_before = result.get("count_before", 0)
|
||||
count_after = result.get("count_after", 0)
|
||||
emotions = result.get("emotions", [])
|
||||
|
||||
result_msg = f"表情包 {i+1} 删除成功"
|
||||
if description:
|
||||
result_msg += f"\n描述: {description}"
|
||||
if emotions:
|
||||
result_msg += f"\n情感标签: {', '.join(emotions)}"
|
||||
result_msg += f"\n表情包数量: {count_before} → {count_after}"
|
||||
|
||||
results.append(result_msg)
|
||||
else:
|
||||
fail_count += 1
|
||||
error_msg = result.get("message", "删除失败")
|
||||
results.append(f"表情包 {i+1} 删除失败: {error_msg}")
|
||||
|
||||
except Exception as e:
|
||||
fail_count += 1
|
||||
results.append(f"表情包 {i+1} 删除时发生错误: {str(e)}")
|
||||
|
||||
# 构建返回消息
|
||||
total_count = success_count + fail_count
|
||||
summary_msg = f"表情包删除完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total_count} 个"
|
||||
|
||||
# 如果有结果详情,添加到返回消息中
|
||||
details_msg = ""
|
||||
if results:
|
||||
details_msg = "\n" + "\n".join(results)
|
||||
final_msg = summary_msg + details_msg
|
||||
else:
|
||||
final_msg = summary_msg
|
||||
|
||||
# 使用表达器重写回复
|
||||
try:
|
||||
from src.plugin_system.apis import generator_api
|
||||
|
||||
# 构建重写数据
|
||||
rewrite_data = {
|
||||
"raw_reply": summary_msg,
|
||||
"reason": f"删除了表情包:{details_msg}\n",
|
||||
}
|
||||
|
||||
# 调用表达器重写
|
||||
result_status, data = await generator_api.rewrite_reply(
|
||||
chat_stream=self.message.chat_stream,
|
||||
reply_data=rewrite_data,
|
||||
)
|
||||
|
||||
if result_status:
|
||||
# 发送重写后的回复
|
||||
for reply_seg in data.reply_set.reply_data:
|
||||
send_data = reply_seg.content
|
||||
await self.send_text(send_data)
|
||||
|
||||
return success_count > 0, final_msg, success_count > 0
|
||||
else:
|
||||
# 如果重写失败,发送原始消息
|
||||
await self.send_text(final_msg)
|
||||
return success_count > 0, final_msg, success_count > 0
|
||||
|
||||
except Exception as e:
|
||||
# 如果表达器调用失败,发送原始消息
|
||||
logger.error(f"[delete_emoji] 表达器重写失败: {e}")
|
||||
await self.send_text(final_msg)
|
||||
return success_count > 0, final_msg, success_count > 0
|
||||
|
||||
def find_and_return_emoji_in_message(self, message_segments) -> List[str]:
|
||||
emoji_base64_list = []
|
||||
|
||||
# 处理单个Seg对象的情况
|
||||
if isinstance(message_segments, Seg):
|
||||
if message_segments.type == "emoji":
|
||||
emoji_base64_list.append(message_segments.data)
|
||||
elif message_segments.type == "image":
|
||||
# 假设图片数据是base64编码的
|
||||
emoji_base64_list.append(message_segments.data)
|
||||
elif message_segments.type == "seglist":
|
||||
# 递归处理嵌套的Seg列表
|
||||
emoji_base64_list.extend(self.find_and_return_emoji_in_message(message_segments.data))
|
||||
return emoji_base64_list
|
||||
|
||||
# 处理Seg列表的情况
|
||||
for seg in message_segments:
|
||||
if seg.type == "emoji":
|
||||
emoji_base64_list.append(seg.data)
|
||||
elif seg.type == "image":
|
||||
# 假设图片数据是base64编码的
|
||||
emoji_base64_list.append(seg.data)
|
||||
elif seg.type == "seglist":
|
||||
# 递归处理嵌套的Seg列表
|
||||
emoji_base64_list.extend(self.find_and_return_emoji_in_message(seg.data))
|
||||
return emoji_base64_list
|
||||
|
||||
|
||||
class RandomEmojis(BaseCommand):
|
||||
|
|
@ -183,4 +399,6 @@ class EmojiManagePlugin(BasePlugin):
|
|||
return [
|
||||
(RandomEmojis.get_command_info(), RandomEmojis),
|
||||
(AddEmojiCommand.get_command_info(), AddEmojiCommand),
|
||||
(ListEmojiCommand.get_command_info(), ListEmojiCommand),
|
||||
(DeleteEmojiCommand.get_command_info(), DeleteEmojiCommand),
|
||||
]
|
||||
|
|
@ -366,17 +366,72 @@ async def register_emoji(image_base64: str, filename: Optional[str] = None) -> D
|
|||
|
||||
# 4. 生成文件名
|
||||
if not filename:
|
||||
# 基于时间戳和UUID生成唯一文件名
|
||||
# 基于时间戳、微秒和短base64生成唯一文件名
|
||||
import time
|
||||
timestamp = int(time.time())
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
filename = f"emoji_{timestamp}_{unique_id}"
|
||||
microseconds = int(time.time() * 1000000) % 1000000 # 添加微秒级精度
|
||||
|
||||
# 生成12位随机标识符,使用base64编码(增加随机性)
|
||||
import random
|
||||
random_bytes = random.getrandbits(72).to_bytes(9, 'big') # 72位 = 9字节 = 12位base64
|
||||
short_id = base64.b64encode(random_bytes).decode('ascii')[:12].rstrip('=')
|
||||
# 确保base64编码适合文件名(替换/和-)
|
||||
short_id = short_id.replace('/', '_').replace('+', '-')
|
||||
filename = f"emoji_{timestamp}_{microseconds}_{short_id}"
|
||||
|
||||
# 确保文件名有扩展名
|
||||
if not filename.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
|
||||
filename = f"{filename}.png" # 默认使用png格式
|
||||
|
||||
# 5. 保存base64图片到emoji目录
|
||||
# 检查文件名是否已存在,如果存在则重新生成短标识符
|
||||
temp_file_path = os.path.join(EMOJI_DIR, filename)
|
||||
attempts = 0
|
||||
max_attempts = 10
|
||||
while os.path.exists(temp_file_path) and attempts < max_attempts:
|
||||
# 重新生成短标识符
|
||||
import random
|
||||
random_bytes = random.getrandbits(48).to_bytes(6, 'big')
|
||||
short_id = base64.b64encode(random_bytes).decode('ascii')[:8].rstrip('=')
|
||||
short_id = short_id.replace('/', '_').replace('+', '-')
|
||||
|
||||
# 分离文件名和扩展名,重新生成文件名
|
||||
name_part, ext = os.path.splitext(filename)
|
||||
# 去掉原来的标识符,添加新的
|
||||
base_name = name_part.rsplit('_', 1)[0] # 移除最后一个_后的部分
|
||||
filename = f"{base_name}_{short_id}{ext}"
|
||||
temp_file_path = os.path.join(EMOJI_DIR, filename)
|
||||
attempts += 1
|
||||
|
||||
# 如果还是冲突,使用UUID作为备用方案
|
||||
if os.path.exists(temp_file_path):
|
||||
uuid_short = str(uuid.uuid4())[:8]
|
||||
name_part, ext = os.path.splitext(filename)
|
||||
base_name = name_part.rsplit('_', 1)[0]
|
||||
filename = f"{base_name}_{uuid_short}{ext}"
|
||||
temp_file_path = os.path.join(EMOJI_DIR, filename)
|
||||
|
||||
# 如果UUID方案也冲突,添加序号
|
||||
counter = 1
|
||||
original_filename = filename
|
||||
while os.path.exists(temp_file_path):
|
||||
name_part, ext = os.path.splitext(original_filename)
|
||||
filename = f"{name_part}_{counter}{ext}"
|
||||
temp_file_path = os.path.join(EMOJI_DIR, filename)
|
||||
counter += 1
|
||||
|
||||
# 防止无限循环,最多尝试100次
|
||||
if counter > 100:
|
||||
logger.error(f"[EmojiAPI] 无法生成唯一文件名,尝试次数过多: {original_filename}")
|
||||
return {
|
||||
"success": False,
|
||||
"message": "无法生成唯一文件名,请稍后重试",
|
||||
"description": None,
|
||||
"emotions": None,
|
||||
"replaced": None,
|
||||
"hash": None
|
||||
}
|
||||
|
||||
# 5. 保存base64图片到emoji目录
|
||||
|
||||
try:
|
||||
# 解码base64并保存图片
|
||||
|
|
@ -468,3 +523,181 @@ async def register_emoji(image_base64: str, filename: Optional[str] = None) -> D
|
|||
"replaced": None,
|
||||
"hash": None
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 表情包删除API函数
|
||||
# =============================================================================
|
||||
|
||||
|
||||
async def delete_emoji(emoji_hash: str) -> Dict[str, Any]:
|
||||
"""删除表情包
|
||||
|
||||
Args:
|
||||
emoji_hash: 要删除的表情包的哈希值
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: 删除结果,包含以下字段:
|
||||
- success: bool, 是否成功删除
|
||||
- message: str, 结果消息
|
||||
- count_before: Optional[int], 删除前的表情包数量
|
||||
- count_after: Optional[int], 删除后的表情包数量
|
||||
- description: Optional[str], 被删除的表情包描述(成功时)
|
||||
- emotions: Optional[List[str]], 被删除的表情包情感标签(成功时)
|
||||
|
||||
Raises:
|
||||
ValueError: 如果哈希值为空
|
||||
TypeError: 如果哈希值不是字符串类型
|
||||
"""
|
||||
if not emoji_hash:
|
||||
raise ValueError("表情包哈希值不能为空")
|
||||
if not isinstance(emoji_hash, str):
|
||||
raise TypeError("emoji_hash必须是字符串类型")
|
||||
|
||||
try:
|
||||
logger.info(f"[EmojiAPI] 开始删除表情包,哈希值: {emoji_hash}")
|
||||
|
||||
# 1. 获取emoji管理器和删除前的数量
|
||||
emoji_manager = get_emoji_manager()
|
||||
count_before = emoji_manager.emoji_num
|
||||
|
||||
# 2. 获取被删除表情包的信息(用于返回结果)
|
||||
try:
|
||||
deleted_emoji = await emoji_manager.get_emoji_from_manager(emoji_hash)
|
||||
description = deleted_emoji.description if deleted_emoji else None
|
||||
emotions = deleted_emoji.emotion if deleted_emoji else None
|
||||
except Exception as info_error:
|
||||
logger.warning(f"[EmojiAPI] 获取被删除表情包信息失败: {info_error}")
|
||||
description = None
|
||||
emotions = None
|
||||
|
||||
# 3. 执行删除操作
|
||||
delete_success = await emoji_manager.delete_emoji(emoji_hash)
|
||||
|
||||
# 4. 获取删除后的数量
|
||||
count_after = emoji_manager.emoji_num
|
||||
|
||||
# 5. 构建返回结果
|
||||
if delete_success:
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"表情包删除成功 (哈希: {emoji_hash[:8]}...)",
|
||||
"count_before": count_before,
|
||||
"count_after": count_after,
|
||||
"description": description,
|
||||
"emotions": emotions
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"表情包删除失败,可能因为哈希值不存在或删除过程出错",
|
||||
"count_before": count_before,
|
||||
"count_after": count_after,
|
||||
"description": None,
|
||||
"emotions": None
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[EmojiAPI] 删除表情包时发生异常: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"删除过程中发生错误: {str(e)}",
|
||||
"count_before": None,
|
||||
"count_after": None,
|
||||
"description": None,
|
||||
"emotions": None
|
||||
}
|
||||
|
||||
|
||||
async def delete_emoji_by_description(description: str, exact_match: bool = False) -> Dict[str, Any]:
|
||||
"""根据描述删除表情包
|
||||
|
||||
Args:
|
||||
description: 表情包描述文本
|
||||
exact_match: 是否精确匹配描述,False则为模糊匹配
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: 删除结果,包含以下字段:
|
||||
- success: bool, 是否成功删除
|
||||
- message: str, 结果消息
|
||||
- deleted_count: int, 删除的表情包数量
|
||||
- deleted_hashes: List[str], 被删除的表情包哈希列表
|
||||
- matched_count: int, 匹配到的表情包数量
|
||||
|
||||
Raises:
|
||||
ValueError: 如果描述为空
|
||||
TypeError: 如果描述不是字符串类型
|
||||
"""
|
||||
if not description:
|
||||
raise ValueError("描述不能为空")
|
||||
if not isinstance(description, str):
|
||||
raise TypeError("description必须是字符串类型")
|
||||
|
||||
try:
|
||||
logger.info(f"[EmojiAPI] 根据描述删除表情包: {description} (精确匹配: {exact_match})")
|
||||
|
||||
emoji_manager = get_emoji_manager()
|
||||
all_emojis = emoji_manager.emoji_objects
|
||||
|
||||
# 筛选匹配的表情包
|
||||
matching_emojis = []
|
||||
for emoji_obj in all_emojis:
|
||||
if emoji_obj.is_deleted:
|
||||
continue
|
||||
|
||||
if exact_match:
|
||||
if emoji_obj.description == description:
|
||||
matching_emojis.append(emoji_obj)
|
||||
else:
|
||||
if description.lower() in emoji_obj.description.lower():
|
||||
matching_emojis.append(emoji_obj)
|
||||
|
||||
matched_count = len(matching_emojis)
|
||||
if matched_count == 0:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"未找到匹配描述 '{description}' 的表情包",
|
||||
"deleted_count": 0,
|
||||
"deleted_hashes": [],
|
||||
"matched_count": 0
|
||||
}
|
||||
|
||||
# 删除匹配的表情包
|
||||
deleted_count = 0
|
||||
deleted_hashes = []
|
||||
for emoji_obj in matching_emojis:
|
||||
try:
|
||||
delete_success = await emoji_manager.delete_emoji(emoji_obj.hash)
|
||||
if delete_success:
|
||||
deleted_count += 1
|
||||
deleted_hashes.append(emoji_obj.hash)
|
||||
except Exception as delete_error:
|
||||
logger.error(f"[EmojiAPI] 删除表情包失败 (哈希: {emoji_obj.hash}): {delete_error}")
|
||||
|
||||
# 构建返回结果
|
||||
if deleted_count > 0:
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"成功删除 {deleted_count} 个表情包 (匹配到 {matched_count} 个)",
|
||||
"deleted_count": deleted_count,
|
||||
"deleted_hashes": deleted_hashes,
|
||||
"matched_count": matched_count
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"匹配到 {matched_count} 个表情包,但删除全部失败",
|
||||
"deleted_count": 0,
|
||||
"deleted_hashes": [],
|
||||
"matched_count": matched_count
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[EmojiAPI] 根据描述删除表情包时发生异常: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"删除过程中发生错误: {str(e)}",
|
||||
"deleted_count": 0,
|
||||
"deleted_hashes": [],
|
||||
"matched_count": 0
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue