From 1c37dd46e86e86914342760767485caed4a773a8 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Wed, 24 Sep 2025 18:14:23 +0800 Subject: [PATCH] =?UTF-8?q?better=EF=BC=9A=E5=AE=8C=E5=96=84=E8=A1=A8?= =?UTF-8?q?=E6=83=85=E5=8C=85=E7=AE=A1=E7=90=86=E6=8F=92=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复一些问题 Update emoji_api.py --- plugins/emoji_manage_plugin/plugin.py | 236 ++++++++++++++++++++++++- src/plugin_system/apis/emoji_api.py | 241 +++++++++++++++++++++++++- 2 files changed, 464 insertions(+), 13 deletions(-) diff --git a/plugins/emoji_manage_plugin/plugin.py b/plugins/emoji_manage_plugin/plugin.py index b042cabb..893d14b7 100644 --- a/plugins/emoji_manage_plugin/plugin.py +++ b/plugins/emoji_manage_plugin/plugin.py @@ -28,7 +28,7 @@ class AddEmojiCommand(BaseCommand): async def execute(self) -> Tuple[bool, str, bool]: # 查找消息中的表情包 - logger.info(f"查找消息中的表情包: {self.message.message_segment}") + # logger.info(f"查找消息中的表情包: {self.message.message_segment}") emoji_base64_list = self.find_and_return_emoji_in_message(self.message.message_segment) @@ -42,8 +42,8 @@ class AddEmojiCommand(BaseCommand): for i, emoji_base64 in enumerate(emoji_base64_list): try: - # 使用emoji_api注册表情包 - result = await emoji_api.register_emoji(emoji_base64, filename=f"emoji_{i+1}") + # 使用emoji_api注册表情包(让API自动生成唯一文件名) + result = await emoji_api.register_emoji(emoji_base64) if result["success"]: success_count += 1 @@ -72,13 +72,46 @@ class AddEmojiCommand(BaseCommand): summary_msg = f"表情包注册完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total_count} 个" # 如果有结果详情,添加到返回消息中 + details_msg = "" if results: details_msg = "\n" + "\n".join(results) final_msg = summary_msg + details_msg else: final_msg = summary_msg - return success_count > 0, final_msg, success_count > 0 + # 使用表达器重写回复 + try: + from src.plugin_system.apis import generator_api + + # 构建重写数据 + rewrite_data = { + "raw_reply": summary_msg, + "reason": f"注册了表情包:{details_msg}\n", + } + + # 调用表达器重写 + result_status, data = await generator_api.rewrite_reply( + chat_stream=self.message.chat_stream, + reply_data=rewrite_data, + ) + + if result_status: + # 发送重写后的回复 + for reply_seg in data.reply_set.reply_data: + send_data = reply_seg.content + await self.send_text(send_data) + + return success_count > 0, final_msg, success_count > 0 + else: + # 如果重写失败,发送原始消息 + await self.send_text(final_msg) + return success_count > 0, final_msg, success_count > 0 + + except Exception as e: + # 如果表达器调用失败,发送原始消息 + logger.error(f"[add_emoji] 表达器重写失败: {e}") + await self.send_text(final_msg) + return success_count > 0, final_msg, success_count > 0 def find_and_return_emoji_in_message(self, message_segments) -> List[str]: emoji_base64_list = [] @@ -114,22 +147,205 @@ class ListEmojiCommand(BaseCommand): command_description = "列表表情包" # === 命令设置(必须填写)=== - command_pattern = r"^/emoji list$" # 精确匹配 "/emoji list" 命令 + command_pattern = r"^/emoji list(\s+\d+)?$" # 匹配 "/emoji list" 或 "/emoji list 数量" async def execute(self) -> Tuple[bool, str, bool]: """执行列表表情包""" + from src.plugin_system.apis import emoji_api import datetime + # 解析命令参数 + import re + match = re.match(r"^/emoji list(?:\s+(\d+))?$", self.message.raw_message) + max_count = 10 # 默认显示10个 + if match and match.group(1): + max_count = min(int(match.group(1)), 50) # 最多显示50个 + # 获取当前时间 time_format: str = self.get_config("time.format", "%Y-%m-%d %H:%M:%S") # type: ignore now = datetime.datetime.now() time_str = now.strftime(time_format) - # 发送时间信息 - message = f"⏰ 当前时间:{time_str}" - await self.send_text(message) + # 获取表情包信息 + emoji_count = emoji_api.get_count() + emoji_info = emoji_api.get_info() - return True, f"显示了当前时间: {time_str}", True + # 构建返回消息 + message_lines = [ + f"📊 表情包统计信息 ({time_str})", + f"• 总数: {emoji_count} / {emoji_info['max_count']}", + f"• 可用: {emoji_info['available_emojis']}", + ] + + if emoji_count == 0: + message_lines.append("\n❌ 暂无表情包") + final_message = "\n".join(message_lines) + await self.send_text(final_message) + return True, final_message, True + + # 获取所有表情包 + all_emojis = await emoji_api.get_all() + if not all_emojis: + message_lines.append("\n❌ 无法获取表情包列表") + final_message = "\n".join(message_lines) + await self.send_text(final_message) + return False, final_message, True + + # 显示前N个表情包 + display_emojis = all_emojis[:max_count] + message_lines.append(f"\n📋 显示前 {len(display_emojis)} 个表情包:") + + for i, (emoji_base64, description, emotion) in enumerate(display_emojis, 1): + # 截断过长的描述 + short_desc = description[:50] + "..." if len(description) > 50 else description + message_lines.append(f"{i}. {short_desc} [{emotion}]") + + # 如果还有更多表情包,显示总数 + if len(all_emojis) > max_count: + message_lines.append(f"\n💡 还有 {len(all_emojis) - max_count} 个表情包未显示") + + final_message = "\n".join(message_lines) + + # 直接发送文本消息 + await self.send_text(final_message) + + return True, final_message, True + + +class DeleteEmojiCommand(BaseCommand): + command_name = "delete_emoji" + command_description = "删除表情包" + command_pattern = r".*/emoji delete.*" + + async def execute(self) -> Tuple[bool, str, bool]: + # 查找消息中的表情包图片 + logger.info(f"查找消息中的表情包用于删除: {self.message.message_segment}") + + emoji_base64_list = self.find_and_return_emoji_in_message(self.message.message_segment) + + if not emoji_base64_list: + return False, "未在消息中找到表情包或图片", False + + # 删除找到的表情包 + success_count = 0 + fail_count = 0 + results = [] + + for i, emoji_base64 in enumerate(emoji_base64_list): + try: + # 计算图片的哈希值来查找对应的表情包 + import base64 + import hashlib + + # 确保base64字符串只包含ASCII字符 + if isinstance(emoji_base64, str): + emoji_base64_clean = emoji_base64.encode("ascii", errors="ignore").decode("ascii") + else: + emoji_base64_clean = str(emoji_base64) + + # 计算哈希值 + image_bytes = base64.b64decode(emoji_base64_clean) + emoji_hash = hashlib.md5(image_bytes).hexdigest() + + # 使用emoji_api删除表情包 + result = await emoji_api.delete_emoji(emoji_hash) + + if result["success"]: + success_count += 1 + description = result.get("description", "未知描述") + count_before = result.get("count_before", 0) + count_after = result.get("count_after", 0) + emotions = result.get("emotions", []) + + result_msg = f"表情包 {i+1} 删除成功" + if description: + result_msg += f"\n描述: {description}" + if emotions: + result_msg += f"\n情感标签: {', '.join(emotions)}" + result_msg += f"\n表情包数量: {count_before} → {count_after}" + + results.append(result_msg) + else: + fail_count += 1 + error_msg = result.get("message", "删除失败") + results.append(f"表情包 {i+1} 删除失败: {error_msg}") + + except Exception as e: + fail_count += 1 + results.append(f"表情包 {i+1} 删除时发生错误: {str(e)}") + + # 构建返回消息 + total_count = success_count + fail_count + summary_msg = f"表情包删除完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total_count} 个" + + # 如果有结果详情,添加到返回消息中 + details_msg = "" + if results: + details_msg = "\n" + "\n".join(results) + final_msg = summary_msg + details_msg + else: + final_msg = summary_msg + + # 使用表达器重写回复 + try: + from src.plugin_system.apis import generator_api + + # 构建重写数据 + rewrite_data = { + "raw_reply": summary_msg, + "reason": f"删除了表情包:{details_msg}\n", + } + + # 调用表达器重写 + result_status, data = await generator_api.rewrite_reply( + chat_stream=self.message.chat_stream, + reply_data=rewrite_data, + ) + + if result_status: + # 发送重写后的回复 + for reply_seg in data.reply_set.reply_data: + send_data = reply_seg.content + await self.send_text(send_data) + + return success_count > 0, final_msg, success_count > 0 + else: + # 如果重写失败,发送原始消息 + await self.send_text(final_msg) + return success_count > 0, final_msg, success_count > 0 + + except Exception as e: + # 如果表达器调用失败,发送原始消息 + logger.error(f"[delete_emoji] 表达器重写失败: {e}") + await self.send_text(final_msg) + return success_count > 0, final_msg, success_count > 0 + + def find_and_return_emoji_in_message(self, message_segments) -> List[str]: + emoji_base64_list = [] + + # 处理单个Seg对象的情况 + if isinstance(message_segments, Seg): + if message_segments.type == "emoji": + emoji_base64_list.append(message_segments.data) + elif message_segments.type == "image": + # 假设图片数据是base64编码的 + emoji_base64_list.append(message_segments.data) + elif message_segments.type == "seglist": + # 递归处理嵌套的Seg列表 + emoji_base64_list.extend(self.find_and_return_emoji_in_message(message_segments.data)) + return emoji_base64_list + + # 处理Seg列表的情况 + for seg in message_segments: + if seg.type == "emoji": + emoji_base64_list.append(seg.data) + elif seg.type == "image": + # 假设图片数据是base64编码的 + emoji_base64_list.append(seg.data) + elif seg.type == "seglist": + # 递归处理嵌套的Seg列表 + emoji_base64_list.extend(self.find_and_return_emoji_in_message(seg.data)) + return emoji_base64_list class RandomEmojis(BaseCommand): @@ -183,4 +399,6 @@ class EmojiManagePlugin(BasePlugin): return [ (RandomEmojis.get_command_info(), RandomEmojis), (AddEmojiCommand.get_command_info(), AddEmojiCommand), + (ListEmojiCommand.get_command_info(), ListEmojiCommand), + (DeleteEmojiCommand.get_command_info(), DeleteEmojiCommand), ] \ No newline at end of file diff --git a/src/plugin_system/apis/emoji_api.py b/src/plugin_system/apis/emoji_api.py index d127aa6d..6d142a9e 100644 --- a/src/plugin_system/apis/emoji_api.py +++ b/src/plugin_system/apis/emoji_api.py @@ -366,17 +366,72 @@ async def register_emoji(image_base64: str, filename: Optional[str] = None) -> D # 4. 生成文件名 if not filename: - # 基于时间戳和UUID生成唯一文件名 + # 基于时间戳、微秒和短base64生成唯一文件名 + import time timestamp = int(time.time()) - unique_id = str(uuid.uuid4())[:8] - filename = f"emoji_{timestamp}_{unique_id}" + microseconds = int(time.time() * 1000000) % 1000000 # 添加微秒级精度 + + # 生成12位随机标识符,使用base64编码(增加随机性) + import random + random_bytes = random.getrandbits(72).to_bytes(9, 'big') # 72位 = 9字节 = 12位base64 + short_id = base64.b64encode(random_bytes).decode('ascii')[:12].rstrip('=') + # 确保base64编码适合文件名(替换/和-) + short_id = short_id.replace('/', '_').replace('+', '-') + filename = f"emoji_{timestamp}_{microseconds}_{short_id}" # 确保文件名有扩展名 if not filename.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')): filename = f"{filename}.png" # 默认使用png格式 - # 5. 保存base64图片到emoji目录 + # 检查文件名是否已存在,如果存在则重新生成短标识符 temp_file_path = os.path.join(EMOJI_DIR, filename) + attempts = 0 + max_attempts = 10 + while os.path.exists(temp_file_path) and attempts < max_attempts: + # 重新生成短标识符 + import random + random_bytes = random.getrandbits(48).to_bytes(6, 'big') + short_id = base64.b64encode(random_bytes).decode('ascii')[:8].rstrip('=') + short_id = short_id.replace('/', '_').replace('+', '-') + + # 分离文件名和扩展名,重新生成文件名 + name_part, ext = os.path.splitext(filename) + # 去掉原来的标识符,添加新的 + base_name = name_part.rsplit('_', 1)[0] # 移除最后一个_后的部分 + filename = f"{base_name}_{short_id}{ext}" + temp_file_path = os.path.join(EMOJI_DIR, filename) + attempts += 1 + + # 如果还是冲突,使用UUID作为备用方案 + if os.path.exists(temp_file_path): + uuid_short = str(uuid.uuid4())[:8] + name_part, ext = os.path.splitext(filename) + base_name = name_part.rsplit('_', 1)[0] + filename = f"{base_name}_{uuid_short}{ext}" + temp_file_path = os.path.join(EMOJI_DIR, filename) + + # 如果UUID方案也冲突,添加序号 + counter = 1 + original_filename = filename + while os.path.exists(temp_file_path): + name_part, ext = os.path.splitext(original_filename) + filename = f"{name_part}_{counter}{ext}" + temp_file_path = os.path.join(EMOJI_DIR, filename) + counter += 1 + + # 防止无限循环,最多尝试100次 + if counter > 100: + logger.error(f"[EmojiAPI] 无法生成唯一文件名,尝试次数过多: {original_filename}") + return { + "success": False, + "message": "无法生成唯一文件名,请稍后重试", + "description": None, + "emotions": None, + "replaced": None, + "hash": None + } + + # 5. 保存base64图片到emoji目录 try: # 解码base64并保存图片 @@ -468,3 +523,181 @@ async def register_emoji(image_base64: str, filename: Optional[str] = None) -> D "replaced": None, "hash": None } + + +# ============================================================================= +# 表情包删除API函数 +# ============================================================================= + + +async def delete_emoji(emoji_hash: str) -> Dict[str, Any]: + """删除表情包 + + Args: + emoji_hash: 要删除的表情包的哈希值 + + Returns: + Dict[str, Any]: 删除结果,包含以下字段: + - success: bool, 是否成功删除 + - message: str, 结果消息 + - count_before: Optional[int], 删除前的表情包数量 + - count_after: Optional[int], 删除后的表情包数量 + - description: Optional[str], 被删除的表情包描述(成功时) + - emotions: Optional[List[str]], 被删除的表情包情感标签(成功时) + + Raises: + ValueError: 如果哈希值为空 + TypeError: 如果哈希值不是字符串类型 + """ + if not emoji_hash: + raise ValueError("表情包哈希值不能为空") + if not isinstance(emoji_hash, str): + raise TypeError("emoji_hash必须是字符串类型") + + try: + logger.info(f"[EmojiAPI] 开始删除表情包,哈希值: {emoji_hash}") + + # 1. 获取emoji管理器和删除前的数量 + emoji_manager = get_emoji_manager() + count_before = emoji_manager.emoji_num + + # 2. 获取被删除表情包的信息(用于返回结果) + try: + deleted_emoji = await emoji_manager.get_emoji_from_manager(emoji_hash) + description = deleted_emoji.description if deleted_emoji else None + emotions = deleted_emoji.emotion if deleted_emoji else None + except Exception as info_error: + logger.warning(f"[EmojiAPI] 获取被删除表情包信息失败: {info_error}") + description = None + emotions = None + + # 3. 执行删除操作 + delete_success = await emoji_manager.delete_emoji(emoji_hash) + + # 4. 获取删除后的数量 + count_after = emoji_manager.emoji_num + + # 5. 构建返回结果 + if delete_success: + return { + "success": True, + "message": f"表情包删除成功 (哈希: {emoji_hash[:8]}...)", + "count_before": count_before, + "count_after": count_after, + "description": description, + "emotions": emotions + } + else: + return { + "success": False, + "message": f"表情包删除失败,可能因为哈希值不存在或删除过程出错", + "count_before": count_before, + "count_after": count_after, + "description": None, + "emotions": None + } + + except Exception as e: + logger.error(f"[EmojiAPI] 删除表情包时发生异常: {e}") + return { + "success": False, + "message": f"删除过程中发生错误: {str(e)}", + "count_before": None, + "count_after": None, + "description": None, + "emotions": None + } + + +async def delete_emoji_by_description(description: str, exact_match: bool = False) -> Dict[str, Any]: + """根据描述删除表情包 + + Args: + description: 表情包描述文本 + exact_match: 是否精确匹配描述,False则为模糊匹配 + + Returns: + Dict[str, Any]: 删除结果,包含以下字段: + - success: bool, 是否成功删除 + - message: str, 结果消息 + - deleted_count: int, 删除的表情包数量 + - deleted_hashes: List[str], 被删除的表情包哈希列表 + - matched_count: int, 匹配到的表情包数量 + + Raises: + ValueError: 如果描述为空 + TypeError: 如果描述不是字符串类型 + """ + if not description: + raise ValueError("描述不能为空") + if not isinstance(description, str): + raise TypeError("description必须是字符串类型") + + try: + logger.info(f"[EmojiAPI] 根据描述删除表情包: {description} (精确匹配: {exact_match})") + + emoji_manager = get_emoji_manager() + all_emojis = emoji_manager.emoji_objects + + # 筛选匹配的表情包 + matching_emojis = [] + for emoji_obj in all_emojis: + if emoji_obj.is_deleted: + continue + + if exact_match: + if emoji_obj.description == description: + matching_emojis.append(emoji_obj) + else: + if description.lower() in emoji_obj.description.lower(): + matching_emojis.append(emoji_obj) + + matched_count = len(matching_emojis) + if matched_count == 0: + return { + "success": False, + "message": f"未找到匹配描述 '{description}' 的表情包", + "deleted_count": 0, + "deleted_hashes": [], + "matched_count": 0 + } + + # 删除匹配的表情包 + deleted_count = 0 + deleted_hashes = [] + for emoji_obj in matching_emojis: + try: + delete_success = await emoji_manager.delete_emoji(emoji_obj.hash) + if delete_success: + deleted_count += 1 + deleted_hashes.append(emoji_obj.hash) + except Exception as delete_error: + logger.error(f"[EmojiAPI] 删除表情包失败 (哈希: {emoji_obj.hash}): {delete_error}") + + # 构建返回结果 + if deleted_count > 0: + return { + "success": True, + "message": f"成功删除 {deleted_count} 个表情包 (匹配到 {matched_count} 个)", + "deleted_count": deleted_count, + "deleted_hashes": deleted_hashes, + "matched_count": matched_count + } + else: + return { + "success": False, + "message": f"匹配到 {matched_count} 个表情包,但删除全部失败", + "deleted_count": 0, + "deleted_hashes": [], + "matched_count": matched_count + } + + except Exception as e: + logger.error(f"[EmojiAPI] 根据描述删除表情包时发生异常: {e}") + return { + "success": False, + "message": f"删除过程中发生错误: {str(e)}", + "deleted_count": 0, + "deleted_hashes": [], + "matched_count": 0 + }