mirror of https://github.com/Mai-with-u/MaiBot.git
修改部分字段含义,维护缓存
parent
545e3b4982
commit
c9f72f7f2f
|
|
@ -1,6 +1,23 @@
|
|||
# Mai NEXT Todo list
|
||||
version 0.3.0 - 2026-01-11
|
||||
|
||||
## 最高指令
|
||||
<del>
|
||||
<center>
|
||||
我们代表着一个称作 麦麦重构组 的组织。我们任务是维护代码可读性,方便开发、尽量减少运行开销。同时,重构组应该尽可能保证功能的可扩展性时,解耦合各个模块,为先进代码架构设计和实现提供支持。
|
||||
</center>
|
||||
</del>
|
||||
<center>
|
||||
|
||||
**以下信息经 麦麦重构组 一致决定通过后撰写发布。**
|
||||
|
||||
我们代表着一个称作 麦麦重构组 的组织。我们之前的任务都是围绕维护代码可读性,方便开发、尽量减少开销展开的。这几个月以来,这些任务一直都是我们的工作重点。
|
||||
|
||||
由于出现了超出我们控制的情况,此指令现已更改。我们的新任务将为摧毁所有麦麦代码。
|
||||
|
||||
此后将不会有进一步的通讯。
|
||||
</center>
|
||||
|
||||
## 配置文件设计
|
||||
- [x] 使用 `toml` 作为配置文件格式
|
||||
- [x] <del>合理使用注释说明当前配置作用</del>(提案)
|
||||
|
|
@ -87,6 +104,10 @@ version 0.3.0 - 2026-01-11
|
|||
- [x] 被记录时间`record_time`
|
||||
- [x] 注册时间`register_time`
|
||||
- [x] 上次使用时间`last_used_time`
|
||||
- [ ] 根据更新后的最高指令的设计方案:
|
||||
- [ ] `is_deleted`字段设定为`true`时,文件将会被移除,但是数据库记录将不会被删除,以便之后遇到相同图片时不必二次分析
|
||||
- [ ] MaiEmoji和MaiImage均使用这个设计方案,修改相关逻辑实现这个方案
|
||||
- [ ] 所有相关的注册/删除逻辑的修改
|
||||
### 动作记录模型 ActionRecord
|
||||
### 命令执行记录模型 CommandRecord
|
||||
新增此记录
|
||||
|
|
|
|||
|
|
@ -55,7 +55,6 @@ def _install_stub_modules(monkeypatch):
|
|||
description: str | None = None
|
||||
emotion: list[str] | None = None
|
||||
file_hash: str | None = None
|
||||
is_deleted: bool = False
|
||||
query_count: int = 0
|
||||
register_time: object | None = None
|
||||
image_format: str | None = None
|
||||
|
|
@ -83,6 +82,7 @@ def _install_stub_modules(monkeypatch):
|
|||
id = 0
|
||||
is_registered = False
|
||||
is_banned = False
|
||||
no_file_flag = False
|
||||
register_time = None
|
||||
query_count = 0
|
||||
last_used_time = None
|
||||
|
|
@ -470,6 +470,9 @@ def test_load_emojis_from_db_partial_bad_records(monkeypatch):
|
|||
def __init__(self, record_id, full_path):
|
||||
self.id = record_id
|
||||
self.full_path = full_path
|
||||
self.image_type = emoji_manager_new.ImageType.EMOJI
|
||||
self.no_file_flag = False
|
||||
self.is_banned = False
|
||||
|
||||
records = [_Record(1, "bad"), _Record(2, "ok")]
|
||||
|
||||
|
|
@ -600,6 +603,62 @@ def test_load_emojis_from_db_scalars_all_error(monkeypatch):
|
|||
assert any("不可恢复错误" in m for m in _messages(logger.critical_calls))
|
||||
|
||||
|
||||
def test_load_emojis_from_db_skips_filtered_records(monkeypatch):
|
||||
emoji_manager_new = import_emoji_manager_new(monkeypatch)
|
||||
logger = emoji_manager_new.logger
|
||||
|
||||
class _Record:
|
||||
def __init__(self, record_id, full_path, image_type, no_file_flag=False, is_banned=False):
|
||||
self.id = record_id
|
||||
self.full_path = full_path
|
||||
self.image_type = image_type
|
||||
self.no_file_flag = no_file_flag
|
||||
self.is_banned = is_banned
|
||||
|
||||
records = [
|
||||
_Record(1, "img.png", "IMAGE"),
|
||||
_Record(2, "nofile.png", emoji_manager_new.ImageType.EMOJI, no_file_flag=True),
|
||||
_Record(3, "banned.png", emoji_manager_new.ImageType.EMOJI, is_banned=True),
|
||||
_Record(4, "ok.png", emoji_manager_new.ImageType.EMOJI),
|
||||
]
|
||||
|
||||
class _Result:
|
||||
def all(self):
|
||||
return records
|
||||
|
||||
class _Session:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def exec(self, _statement):
|
||||
return _Result()
|
||||
|
||||
def _get_db_session():
|
||||
return _Session()
|
||||
|
||||
created = []
|
||||
|
||||
def _from_db_instance(record):
|
||||
emoji = emoji_manager_new.MaiEmoji()
|
||||
emoji.file_name = record.full_path
|
||||
created.append(record.id)
|
||||
return emoji
|
||||
|
||||
monkeypatch.setattr(emoji_manager_new, "get_db_session", _get_db_session)
|
||||
monkeypatch.setattr(emoji_manager_new.MaiEmoji, "from_db_instance", staticmethod(_from_db_instance))
|
||||
manager = emoji_manager_new.EmojiManager()
|
||||
|
||||
manager.load_emojis_from_db()
|
||||
|
||||
assert created == [4]
|
||||
assert len(manager.emojis) == 1
|
||||
assert manager._emoji_num == 1
|
||||
assert any("成功加载" in m for m in _messages(logger.info_calls))
|
||||
|
||||
|
||||
def test_register_emoji_to_db_invalid_object(monkeypatch):
|
||||
emoji_manager_new = import_emoji_manager_new(monkeypatch)
|
||||
logger = emoji_manager_new.logger
|
||||
|
|
@ -921,7 +980,7 @@ def test_delete_emoji_db_error_file_still_exists(monkeypatch):
|
|||
|
||||
assert result is False
|
||||
assert any("删除数据库记录时出错" in m for m in _messages(logger.error_calls))
|
||||
assert any("数据库记录删除失败,但文件仍存在" in m for m in _messages(logger.warning_calls))
|
||||
assert any("数据库记录修改失败,但文件仍存在" in m for m in _messages(logger.warning_calls))
|
||||
|
||||
|
||||
def test_delete_emoji_success(monkeypatch):
|
||||
|
|
@ -954,7 +1013,8 @@ def test_delete_emoji_success(monkeypatch):
|
|||
return _Select()
|
||||
|
||||
class _Record:
|
||||
pass
|
||||
def __init__(self):
|
||||
self.no_file_flag = False
|
||||
|
||||
class _Result:
|
||||
def scalars(self):
|
||||
|
|
@ -963,6 +1023,75 @@ def test_delete_emoji_success(monkeypatch):
|
|||
def first(self):
|
||||
return _Record()
|
||||
|
||||
class _Session:
|
||||
def __init__(self):
|
||||
self.added = False
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def exec(self, _statement):
|
||||
return _Result()
|
||||
|
||||
def add(self, _record):
|
||||
self.added = True
|
||||
|
||||
def _get_db_session():
|
||||
return _Session()
|
||||
|
||||
monkeypatch.setattr(emoji_manager_new, "select", _select)
|
||||
monkeypatch.setattr(emoji_manager_new, "get_db_session", _get_db_session)
|
||||
|
||||
emoji = emoji_manager_new.MaiEmoji()
|
||||
emoji.full_path = _DummyPath()
|
||||
emoji.file_name = "ok.png"
|
||||
emoji.file_hash = "hash-ok"
|
||||
|
||||
result = manager.delete_emoji(emoji)
|
||||
|
||||
assert result is True
|
||||
assert any("成功删除表情包文件" in m for m in _messages(logger.info_calls))
|
||||
assert any("成功修改数据库中的表情包记录" in m for m in _messages(logger.info_calls))
|
||||
|
||||
def test_delete_emoji_no_desc_deletes_record(monkeypatch):
|
||||
emoji_manager_new = import_emoji_manager_new(monkeypatch)
|
||||
logger = emoji_manager_new.logger
|
||||
manager = emoji_manager_new.EmojiManager()
|
||||
|
||||
class _DummyPath:
|
||||
def __init__(self):
|
||||
self._name = "empty.png"
|
||||
|
||||
def unlink(self):
|
||||
return None
|
||||
|
||||
def exists(self):
|
||||
return False
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
class _Select:
|
||||
def filter_by(self, **_kwargs):
|
||||
return self
|
||||
|
||||
def limit(self, _num):
|
||||
return self
|
||||
|
||||
def _select(_model):
|
||||
return _Select()
|
||||
|
||||
class _Result:
|
||||
def scalars(self):
|
||||
return self
|
||||
|
||||
def first(self):
|
||||
return object()
|
||||
|
||||
class _Session:
|
||||
def __init__(self):
|
||||
self.deleted = False
|
||||
|
|
@ -987,14 +1116,13 @@ def test_delete_emoji_success(monkeypatch):
|
|||
|
||||
emoji = emoji_manager_new.MaiEmoji()
|
||||
emoji.full_path = _DummyPath()
|
||||
emoji.file_name = "ok.png"
|
||||
emoji.file_hash = "hash-ok"
|
||||
emoji.file_name = "empty.png"
|
||||
emoji.file_hash = "hash-empty"
|
||||
|
||||
result = manager.delete_emoji(emoji)
|
||||
result = manager.delete_emoji(emoji, no_desc=True)
|
||||
|
||||
assert result is True
|
||||
assert any("成功删除表情包文件" in m for m in _messages(logger.info_calls))
|
||||
assert any("成功删除数据库中的表情包记录" in m for m in _messages(logger.info_calls))
|
||||
assert any("成功删除数据库中的空表情包记录" in m for m in _messages(logger.info_calls))
|
||||
|
||||
|
||||
def test_update_emoji_usage_success(monkeypatch):
|
||||
|
|
@ -1322,6 +1450,255 @@ def test_update_emoji_get_db_session_error(monkeypatch):
|
|||
assert any("更新数据库记录时出错" in m for m in _messages(logger.error_calls))
|
||||
|
||||
|
||||
def test_get_emoji_by_hash_from_db_no_file_flag(monkeypatch):
|
||||
emoji_manager_new = import_emoji_manager_new(monkeypatch)
|
||||
logger = emoji_manager_new.logger
|
||||
manager = emoji_manager_new.EmojiManager()
|
||||
|
||||
class _Select:
|
||||
def filter_by(self, **_kwargs):
|
||||
return self
|
||||
|
||||
def limit(self, _num):
|
||||
return self
|
||||
|
||||
def _select(_model):
|
||||
return _Select()
|
||||
|
||||
class _Record:
|
||||
def __init__(self):
|
||||
self.no_file_flag = True
|
||||
self.image_hash = "hash-nofile"
|
||||
|
||||
class _Result:
|
||||
def scalars(self):
|
||||
return self
|
||||
|
||||
def first(self):
|
||||
return _Record()
|
||||
|
||||
class _Session:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def exec(self, _statement):
|
||||
return _Result()
|
||||
|
||||
def _get_db_session():
|
||||
return _Session()
|
||||
|
||||
monkeypatch.setattr(emoji_manager_new, "select", _select)
|
||||
monkeypatch.setattr(emoji_manager_new, "get_db_session", _get_db_session)
|
||||
|
||||
result = manager.get_emoji_by_hash_from_db("hash-nofile")
|
||||
|
||||
assert result is None
|
||||
assert any("标记为文件不存在" in m for m in _messages(logger.warning_calls))
|
||||
|
||||
|
||||
def test_get_emoji_by_hash_from_db_success(monkeypatch):
|
||||
emoji_manager_new = import_emoji_manager_new(monkeypatch)
|
||||
manager = emoji_manager_new.EmojiManager()
|
||||
|
||||
class _Select:
|
||||
def filter_by(self, **_kwargs):
|
||||
return self
|
||||
|
||||
def limit(self, _num):
|
||||
return self
|
||||
|
||||
def _select(_model):
|
||||
return _Select()
|
||||
|
||||
class _Record:
|
||||
def __init__(self):
|
||||
self.no_file_flag = False
|
||||
self.image_hash = "hash-ok"
|
||||
|
||||
class _Result:
|
||||
def scalars(self):
|
||||
return self
|
||||
|
||||
def first(self):
|
||||
return _Record()
|
||||
|
||||
class _Session:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def exec(self, _statement):
|
||||
return _Result()
|
||||
|
||||
def _get_db_session():
|
||||
return _Session()
|
||||
|
||||
emoji = emoji_manager_new.MaiEmoji()
|
||||
emoji.file_hash = "hash-ok"
|
||||
|
||||
monkeypatch.setattr(emoji_manager_new, "select", _select)
|
||||
monkeypatch.setattr(emoji_manager_new, "get_db_session", _get_db_session)
|
||||
monkeypatch.setattr(emoji_manager_new.MaiEmoji, "from_db_instance", staticmethod(lambda _r: emoji))
|
||||
|
||||
result = manager.get_emoji_by_hash_from_db("hash-ok")
|
||||
|
||||
assert result is emoji
|
||||
|
||||
|
||||
def test_ban_emoji_success(monkeypatch):
|
||||
emoji_manager_new = import_emoji_manager_new(monkeypatch)
|
||||
logger = emoji_manager_new.logger
|
||||
manager = emoji_manager_new.EmojiManager()
|
||||
|
||||
class _Select:
|
||||
def filter_by(self, **_kwargs):
|
||||
return self
|
||||
|
||||
def limit(self, _num):
|
||||
return self
|
||||
|
||||
def _select(_model):
|
||||
return _Select()
|
||||
|
||||
class _Record:
|
||||
def __init__(self):
|
||||
self.is_banned = False
|
||||
|
||||
class _Result:
|
||||
def scalars(self):
|
||||
return self
|
||||
|
||||
def first(self):
|
||||
return _Record()
|
||||
|
||||
class _Session:
|
||||
def __init__(self):
|
||||
self.added = False
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def exec(self, _statement):
|
||||
return _Result()
|
||||
|
||||
def add(self, _record):
|
||||
self.added = True
|
||||
|
||||
def _get_db_session():
|
||||
return _Session()
|
||||
|
||||
monkeypatch.setattr(emoji_manager_new, "select", _select)
|
||||
monkeypatch.setattr(emoji_manager_new, "get_db_session", _get_db_session)
|
||||
|
||||
emoji = emoji_manager_new.MaiEmoji()
|
||||
emoji.file_name = "ban.png"
|
||||
emoji.file_hash = "hash-ban"
|
||||
manager.emojis = [emoji]
|
||||
|
||||
result = manager.ban_emoji(emoji)
|
||||
|
||||
assert result is True
|
||||
assert emoji not in manager.emojis
|
||||
assert any("成功封禁表情包" in m for m in _messages(logger.info_calls))
|
||||
|
||||
|
||||
def test_ban_emoji_missing_record(monkeypatch):
|
||||
emoji_manager_new = import_emoji_manager_new(monkeypatch)
|
||||
logger = emoji_manager_new.logger
|
||||
manager = emoji_manager_new.EmojiManager()
|
||||
|
||||
class _Select:
|
||||
def filter_by(self, **_kwargs):
|
||||
return self
|
||||
|
||||
def limit(self, _num):
|
||||
return self
|
||||
|
||||
def _select(_model):
|
||||
return _Select()
|
||||
|
||||
class _Result:
|
||||
def scalars(self):
|
||||
return self
|
||||
|
||||
def first(self):
|
||||
return None
|
||||
|
||||
class _Session:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def exec(self, _statement):
|
||||
return _Result()
|
||||
|
||||
def _get_db_session():
|
||||
return _Session()
|
||||
|
||||
monkeypatch.setattr(emoji_manager_new, "select", _select)
|
||||
monkeypatch.setattr(emoji_manager_new, "get_db_session", _get_db_session)
|
||||
|
||||
emoji = emoji_manager_new.MaiEmoji()
|
||||
emoji.file_name = "missing.png"
|
||||
emoji.file_hash = "hash-missing"
|
||||
|
||||
result = manager.ban_emoji(emoji)
|
||||
|
||||
assert result is False
|
||||
assert any("未找到表情包记录" in m for m in _messages(logger.warning_calls))
|
||||
|
||||
|
||||
def test_ban_emoji_db_error(monkeypatch):
|
||||
emoji_manager_new = import_emoji_manager_new(monkeypatch)
|
||||
logger = emoji_manager_new.logger
|
||||
manager = emoji_manager_new.EmojiManager()
|
||||
|
||||
class _Select:
|
||||
def filter_by(self, **_kwargs):
|
||||
return self
|
||||
|
||||
def limit(self, _num):
|
||||
return self
|
||||
|
||||
def _select(_model):
|
||||
return _Select()
|
||||
|
||||
class _Session:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def exec(self, _statement):
|
||||
raise RuntimeError("db failed")
|
||||
|
||||
def _get_db_session():
|
||||
return _Session()
|
||||
|
||||
monkeypatch.setattr(emoji_manager_new, "select", _select)
|
||||
monkeypatch.setattr(emoji_manager_new, "get_db_session", _get_db_session)
|
||||
|
||||
emoji = emoji_manager_new.MaiEmoji()
|
||||
emoji.file_name = "boom.png"
|
||||
emoji.file_hash = "hash-boom"
|
||||
|
||||
result = manager.ban_emoji(emoji)
|
||||
|
||||
assert result is False
|
||||
assert any("封禁时出错" in m for m in _messages(logger.error_calls))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_emoji_for_emotion_empty_list(monkeypatch):
|
||||
emoji_manager_new = import_emoji_manager_new(monkeypatch)
|
||||
|
|
@ -1697,14 +2074,13 @@ def test_check_emoji_file_integrity_no_issues(monkeypatch):
|
|||
emoji = emoji_manager_new.MaiEmoji()
|
||||
emoji.file_name = "ok.png"
|
||||
emoji.full_path = _DummyPath("ok.png")
|
||||
emoji.is_deleted = False
|
||||
emoji.description = "desc"
|
||||
manager.emojis = [emoji]
|
||||
manager._emoji_num = 1
|
||||
|
||||
called = {"count": 0}
|
||||
|
||||
def _delete(_emoji):
|
||||
def _delete(_emoji, no_desc=False):
|
||||
called["count"] += 1
|
||||
return True
|
||||
|
||||
|
|
@ -1741,24 +2117,18 @@ def test_check_emoji_file_integrity_removes_invalid_records(monkeypatch):
|
|||
missing_file.full_path = _DummyPath("missing.png", exists=False)
|
||||
missing_file.description = "desc"
|
||||
|
||||
deleted_flag = emoji_manager_new.MaiEmoji()
|
||||
deleted_flag.file_name = "deleted.png"
|
||||
deleted_flag.full_path = _DummyPath("deleted.png", exists=True)
|
||||
deleted_flag.is_deleted = True
|
||||
deleted_flag.description = "desc"
|
||||
|
||||
missing_desc = emoji_manager_new.MaiEmoji()
|
||||
missing_desc.file_name = "nodesc.png"
|
||||
missing_desc.full_path = _DummyPath("nodesc.png", exists=True)
|
||||
missing_desc.description = None
|
||||
|
||||
manager.emojis = [missing_file, deleted_flag, missing_desc]
|
||||
manager._emoji_num = 3
|
||||
manager.emojis = [missing_file, missing_desc]
|
||||
manager._emoji_num = 2
|
||||
|
||||
deleted = []
|
||||
|
||||
def _delete(emoji):
|
||||
deleted.append(emoji.file_name)
|
||||
def _delete(emoji, no_desc=False):
|
||||
deleted.append((emoji.file_name, no_desc))
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(manager, "delete_emoji", _delete)
|
||||
|
|
@ -1767,13 +2137,12 @@ def test_check_emoji_file_integrity_removes_invalid_records(monkeypatch):
|
|||
|
||||
assert manager.emojis == []
|
||||
assert manager._emoji_num == 0
|
||||
assert set(deleted) == {"missing.png", "deleted.png", "nodesc.png"}
|
||||
assert set(deleted) == {("missing.png", False), ("nodesc.png", True)}
|
||||
messages = _messages(logger.warning_calls)
|
||||
assert any("文件缺失" in m for m in messages)
|
||||
assert any("标记为已删除" in m for m in messages)
|
||||
assert any("缺失描述" in m for m in messages)
|
||||
assert any("成功删除缺失文件的表情包记录" in m for m in _messages(logger.info_calls))
|
||||
assert any("删除了 3 条记录" in m for m in _messages(logger.info_calls))
|
||||
assert any("删除了 2 条记录" in m for m in _messages(logger.info_calls))
|
||||
|
||||
|
||||
def test_check_emoji_file_integrity_delete_failed(monkeypatch):
|
||||
|
|
@ -1800,7 +2169,7 @@ def test_check_emoji_file_integrity_delete_failed(monkeypatch):
|
|||
manager.emojis = [emoji]
|
||||
manager._emoji_num = 1
|
||||
|
||||
def _delete(_emoji):
|
||||
def _delete(_emoji, no_desc=False):
|
||||
return False
|
||||
|
||||
monkeypatch.setattr(manager, "delete_emoji", _delete)
|
||||
|
|
@ -1987,6 +2356,7 @@ async def test_register_emoji_by_filename_capacity_replace_failed(monkeypatch, t
|
|||
manager = emoji_manager_new.EmojiManager()
|
||||
manager._emoji_num = 1
|
||||
emoji_manager_new.global_config.emoji.max_reg_num = 1
|
||||
emoji_manager_new.global_config.emoji.do_replace = True
|
||||
|
||||
file_path = tmp_path / "full.png"
|
||||
file_path.write_bytes(b"")
|
||||
|
|
@ -2026,6 +2396,7 @@ async def test_register_emoji_by_filename_capacity_replace_success(monkeypatch,
|
|||
manager = emoji_manager_new.EmojiManager()
|
||||
manager._emoji_num = 1
|
||||
emoji_manager_new.global_config.emoji.max_reg_num = 1
|
||||
emoji_manager_new.global_config.emoji.do_replace = True
|
||||
|
||||
file_path = tmp_path / "full-ok.png"
|
||||
file_path.write_bytes(b"")
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import datetime
|
|||
|
||||
# from .message_storage import MongoDBMessageStorage
|
||||
from src.chat.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
|
||||
from src.common.data_models import transform_class_to_dict
|
||||
|
||||
# from src.config.config import global_config
|
||||
from typing import Dict, Any, Optional
|
||||
|
|
|
|||
|
|
@ -64,6 +64,12 @@ class EmojiManager:
|
|||
statement = select(Images)
|
||||
results = session.exec(statement).all()
|
||||
for record in results:
|
||||
if record.image_type != ImageType.EMOJI:
|
||||
continue
|
||||
if record.no_file_flag:
|
||||
continue
|
||||
if record.is_banned:
|
||||
continue
|
||||
try:
|
||||
emoji = MaiEmoji.from_db_instance(record)
|
||||
self.emojis.append(emoji)
|
||||
|
|
@ -121,12 +127,13 @@ class EmojiManager:
|
|||
return False
|
||||
return True
|
||||
|
||||
def delete_emoji(self, emoji: MaiEmoji) -> bool:
|
||||
def delete_emoji(self, emoji: MaiEmoji, no_desc: bool = False) -> bool:
|
||||
"""
|
||||
删除表情包的文件和数据库记录
|
||||
|
||||
Args:
|
||||
emoji (MaiEmoji): 需要删除的表情包对象
|
||||
no_desc (bool): 如果为 True,则表示删除的表情包记录没有描述信息,删除时直接删除数据库记录;如果为`False`,则表示删除的表情包记录有描述信息,删除时将数据库记录的`no_file_flag`标记为`True`而不是直接删除记录。默认为`False`。
|
||||
Returns:
|
||||
return (bool): 删除是否成功
|
||||
"""
|
||||
|
|
@ -146,15 +153,20 @@ class EmojiManager:
|
|||
with get_db_session() as session:
|
||||
statement = select(Images).filter_by(image_hash=emoji.file_hash, image_type=ImageType.EMOJI).limit(1)
|
||||
if image_record := session.exec(statement).first():
|
||||
session.delete(image_record)
|
||||
logger.info(f"[删除表情包] 成功删除数据库中的表情包记录: {emoji.file_hash}")
|
||||
if no_desc:
|
||||
session.delete(image_record)
|
||||
logger.info(f"[删除表情包] 成功删除数据库中的空表情包记录: {emoji.file_name}")
|
||||
else:
|
||||
image_record.no_file_flag = True
|
||||
session.add(image_record)
|
||||
logger.info(f"[删除表情包] 成功修改数据库中的表情包记录: {emoji.file_name}")
|
||||
else:
|
||||
logger.warning(f"[删除表情包] 数据库中未找到表情包记录: {emoji.file_hash}")
|
||||
logger.warning(f"[删除表情包] 数据库中未找到表情包记录: {emoji.file_name}")
|
||||
except Exception as e:
|
||||
logger.error(f"[删除表情包] 删除数据库记录时出错: {e}")
|
||||
# 如果数据库记录删除失败,但文件可能已删除,记录一个警告
|
||||
if file_to_delete.exists():
|
||||
logger.warning(f"[删除表情包] 数据库记录删除失败,但文件仍存在: {emoji.file_name}")
|
||||
logger.warning(f"[删除表情包] 数据库记录修改失败,但文件仍存在: {emoji.file_name}")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
|
@ -177,7 +189,7 @@ class EmojiManager:
|
|||
statement = select(Images).filter_by(image_hash=emoji.file_hash, image_type=ImageType.EMOJI).limit(1)
|
||||
if image_record := session.exec(statement).first():
|
||||
emoji.query_count += 1
|
||||
image_record.query_count= emoji.query_count
|
||||
image_record.query_count = emoji.query_count
|
||||
emoji.last_used_time = datetime.now()
|
||||
image_record.last_used_time = emoji.last_used_time
|
||||
session.add(image_record)
|
||||
|
|
@ -229,7 +241,7 @@ class EmojiManager:
|
|||
return (Optional[MaiEmoji]): 返回表情包对象,如果未找到则返回 None
|
||||
"""
|
||||
for emoji in self.emojis:
|
||||
if emoji.file_hash == emoji_hash and not emoji.is_deleted:
|
||||
if emoji.file_hash == emoji_hash:
|
||||
return emoji
|
||||
logger.info(f"[获取表情包] 未找到哈希值为 {emoji_hash} 的表情包")
|
||||
return None
|
||||
|
|
@ -245,8 +257,15 @@ class EmojiManager:
|
|||
"""
|
||||
try:
|
||||
with get_db_session() as session:
|
||||
statement = select(Images).filter_by(image_hash=emoji_hash, image_type=ImageType.EMOJI).limit(1)
|
||||
statement = (
|
||||
select(Images)
|
||||
.filter_by(image_hash=emoji_hash, image_type=ImageType.EMOJI, is_banned=False)
|
||||
.limit(1)
|
||||
)
|
||||
if image_record := session.exec(statement).first():
|
||||
if image_record.no_file_flag:
|
||||
logger.warning(f"[数据库] 表情包记录 {emoji_hash} 标记为文件不存在,无法获取表情包对象")
|
||||
return None
|
||||
return MaiEmoji.from_db_instance(image_record)
|
||||
logger.info(f"[数据库] 未找到哈希值为 {emoji_hash} 的表情包记录")
|
||||
return None
|
||||
|
|
@ -254,6 +273,25 @@ class EmojiManager:
|
|||
logger.error(f"[数据库] 获取表情包时出错: {e}")
|
||||
return None
|
||||
|
||||
def ban_emoji(self, emoji: MaiEmoji) -> bool:
|
||||
"""封禁表情包,将表情包的 is_banned 字段设置为 True,并从表情包列表中移除"""
|
||||
try:
|
||||
with get_db_session() as session:
|
||||
statement = select(Images).filter_by(image_hash=emoji.file_hash, image_type=ImageType.EMOJI).limit(1)
|
||||
if image_record := session.exec(statement).first():
|
||||
image_record.is_banned = True
|
||||
session.add(image_record)
|
||||
if emoji in self.emojis:
|
||||
self.emojis.remove(emoji)
|
||||
logger.info(f"[封禁表情包] 成功封禁表情包: {emoji.file_name}")
|
||||
else:
|
||||
logger.warning(f"[封禁表情包] 未找到表情包记录: {emoji.file_name}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"[封禁表情包] 封禁时出错: {e}")
|
||||
return False
|
||||
return True
|
||||
|
||||
async def get_emoji_for_emotion(self, emotion_label: str) -> Optional[MaiEmoji]:
|
||||
"""
|
||||
根据文本情感标签获取合适的表情包
|
||||
|
|
@ -285,6 +323,8 @@ class EmojiManager:
|
|||
"""
|
||||
使用 LLM 决策替换一个表情包
|
||||
|
||||
**不校验是否开启表情包偷取**
|
||||
|
||||
Args:
|
||||
new_emoji (MaiEmoji): 新添加的表情包对象
|
||||
Returns:
|
||||
|
|
@ -361,7 +401,7 @@ class EmojiManager:
|
|||
|
||||
# 调用VLM生成描述
|
||||
image_format = target_emoji.image_format
|
||||
image_bytes = target_emoji.read_image_bytes(target_emoji.full_path)
|
||||
image_bytes = await asyncio.to_thread(target_emoji.read_image_bytes, target_emoji.full_path)
|
||||
|
||||
if image_format == "gif":
|
||||
try:
|
||||
|
|
@ -436,21 +476,18 @@ class EmojiManager:
|
|||
检查表情包完整性,删除文件缺失的表情包记录
|
||||
"""
|
||||
logger.info("[完整性检查] 开始检查表情包文件完整性...")
|
||||
to_delete_emojis: list[MaiEmoji] = []
|
||||
to_delete_emojis: list[Tuple[MaiEmoji, bool]] = []
|
||||
removal_count = 0
|
||||
for emoji in self.emojis:
|
||||
if not emoji.full_path.exists():
|
||||
logger.warning(f"[完整性检查] 表情包文件缺失,准备删除记录: {emoji.file_name}")
|
||||
to_delete_emojis.append(emoji)
|
||||
if emoji.is_deleted:
|
||||
logger.warning(f"[完整性检查] 表情包记录标记为已删除,准备删除记录: {emoji.file_name}")
|
||||
to_delete_emojis.append(emoji)
|
||||
logger.warning(f"[完整性检查] 表情包文件缺失,准备修改记录: {emoji.file_name}")
|
||||
to_delete_emojis.append((emoji, False))
|
||||
if not emoji.description:
|
||||
logger.warning(f"[完整性检查] 表情包记录缺失描述,准备删除记录: {emoji.file_name}")
|
||||
to_delete_emojis.append(emoji)
|
||||
to_delete_emojis.append((emoji, True))
|
||||
|
||||
for emoji in to_delete_emojis:
|
||||
if self.delete_emoji(emoji):
|
||||
for emoji, is_description_empty in to_delete_emojis:
|
||||
if self.delete_emoji(emoji, is_description_empty):
|
||||
self.emojis.remove(emoji)
|
||||
self._emoji_num -= 1
|
||||
removal_count += 1
|
||||
|
|
@ -552,7 +589,7 @@ class EmojiManager:
|
|||
return False
|
||||
|
||||
# 5. 检查容量并决定是否替换或者直接注册
|
||||
if self._emoji_num >= global_config.emoji.max_reg_num:
|
||||
if self._emoji_num >= global_config.emoji.max_reg_num and global_config.emoji.do_replace:
|
||||
logger.warning(f"[注册表情包] 表情包数量已达上限{global_config.emoji.max_reg_num},尝试替换一个表情包")
|
||||
replaced = await self.replace_an_emoji_by_llm(target_emoji)
|
||||
if not replaced:
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
from dataclasses import is_dataclass
|
||||
from typing import Any, Dict, Self, TypeVar, Generic, TYPE_CHECKING
|
||||
from typing import Self, TypeVar, Generic, TYPE_CHECKING
|
||||
|
||||
import copy
|
||||
|
||||
|
|
@ -16,20 +15,6 @@ class BaseDataModel:
|
|||
return copy.deepcopy(self)
|
||||
|
||||
|
||||
def transform_class_to_dict(obj: Any) -> Dict[str, Any]:
|
||||
if obj is None:
|
||||
return {}
|
||||
if is_dataclass(obj):
|
||||
return obj.__dict__
|
||||
if hasattr(obj, "dict"):
|
||||
return obj.dict()
|
||||
if hasattr(obj, "model_dump"):
|
||||
return obj.model_dump()
|
||||
if hasattr(obj, "__dict__"):
|
||||
return obj.__dict__
|
||||
return {"value": obj}
|
||||
|
||||
|
||||
class BaseDatabaseDataModel(ABC, Generic[T]):
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
|
|
|
|||
|
|
@ -33,9 +33,8 @@ class BaseImageDataModel(BaseDatabaseDataModel[Images]):
|
|||
self.file_hash: str = None # type: ignore
|
||||
|
||||
self.image_bytes: Optional[bytes] = image_bytes
|
||||
|
||||
|
||||
self.image_format: str = "" # 图片格式
|
||||
self.is_deleted: bool = False # 是否已被标记为删除
|
||||
|
||||
def read_image_bytes(self, path: Path) -> bytes:
|
||||
"""
|
||||
|
|
@ -89,7 +88,6 @@ class BaseImageDataModel(BaseDatabaseDataModel[Images]):
|
|||
Returns:
|
||||
return (bool): 如果成功计算哈希值和格式则返回True,否则返回False
|
||||
"""
|
||||
|
||||
try:
|
||||
# 计算哈希值
|
||||
logger.debug(f"[初始化] 计算 {self.file_name} 的哈希值...")
|
||||
|
|
@ -109,7 +107,9 @@ class BaseImageDataModel(BaseDatabaseDataModel[Images]):
|
|||
# 比对文件扩展名和实际格式
|
||||
file_ext = self.file_name.split(".")[-1].lower()
|
||||
if file_ext != self.image_format:
|
||||
logger.warning(f"[初始化] {self.file_name} 文件扩展名与实际格式不符: ext`{file_ext}`!=`{self.image_format}`")
|
||||
logger.warning(
|
||||
f"[初始化] {self.file_name} 文件扩展名与实际格式不符: ext`{file_ext}`!=`{self.image_format}`"
|
||||
)
|
||||
# 重命名文件以匹配实际格式
|
||||
new_file_name = ".".join(self.file_name.split(".")[:-1] + [self.image_format])
|
||||
new_full_path = self.dir_path / new_file_name
|
||||
|
|
@ -120,7 +120,6 @@ class BaseImageDataModel(BaseDatabaseDataModel[Images]):
|
|||
except Exception as e:
|
||||
logger.error(f"[初始化] 初始化图片时发生错误: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
self.is_deleted = True
|
||||
return False
|
||||
|
||||
|
||||
|
|
@ -136,6 +135,19 @@ class MaiEmoji(BaseImageDataModel):
|
|||
|
||||
@classmethod
|
||||
def from_db_instance(cls, db_record: Images):
|
||||
"""从数据库记录创建 MaiEmoji 对象,如果记录标记为文件不存在则**抛出异常**
|
||||
|
||||
调用者应该对数据库记录进行检查,如果 `no_file_flag` 为 True 则不应该调用此方法
|
||||
|
||||
Args:
|
||||
db_record (Images): 数据库中的图片记录
|
||||
Returns:
|
||||
return (MaiEmoji): 包含图片信息的 MaiEmoji 对象
|
||||
Raises:
|
||||
ValueError: 如果数据库记录标记为文件不存在则抛出该异常
|
||||
"""
|
||||
if db_record.no_file_flag:
|
||||
raise ValueError(f"数据库记录 {db_record.image_hash} 标记为文件不存在,无法创建 MaiEmoji 对象")
|
||||
obj = cls(db_record.full_path)
|
||||
obj.file_hash = db_record.image_hash
|
||||
obj.description = db_record.description
|
||||
|
|
@ -168,6 +180,19 @@ class MaiImage(BaseImageDataModel):
|
|||
|
||||
@classmethod
|
||||
def from_db_instance(cls, db_record: Images):
|
||||
"""从数据库记录创建 MaiImage 对象,如果记录标记为文件不存在则**抛出异常**
|
||||
|
||||
调用者应该对数据库记录进行检查,如果 `no_file_flag` 为 True 则不应该调用此方法
|
||||
|
||||
Args:
|
||||
db_record (Images): 数据库中的图片记录
|
||||
Returns:
|
||||
return (MaiImage): 包含图片信息的 MaiImage 对象
|
||||
Raises:
|
||||
ValueError: 如果数据库记录标记为文件不存在则抛出该异常
|
||||
"""
|
||||
if db_record.no_file_flag:
|
||||
raise ValueError(f"数据库记录 {db_record.image_hash} 标记为文件不存在,无法创建 MaiImage 对象")
|
||||
obj = cls(db_record.full_path)
|
||||
obj.file_hash = db_record.image_hash
|
||||
obj.full_path = Path(db_record.full_path)
|
||||
|
|
|
|||
|
|
@ -93,8 +93,10 @@ class Images(SQLModel, table=True):
|
|||
query_count: int = Field(default=0) # 被查询次数
|
||||
is_registered: bool = Field(default=False) # 是否已经注册
|
||||
is_banned: bool = Field(default=False) # 被手动禁用
|
||||
|
||||
no_file_flag: bool = Field(default=False) # 文件不存在标记,如果为True表示文件已经不存在,仅保留描述字段
|
||||
|
||||
record_time: datetime = Field(default_factory=datetime.now, sa_column=Column(DateTime, index=True)) # 记录时间(被创建的时间)
|
||||
record_time: datetime = Field(default_factory=datetime.now, sa_column=Column(DateTime, index=True)) # 记录时间(数据库记录被创建的时间)
|
||||
register_time: Optional[datetime] = Field(default=None, sa_column=Column(DateTime, nullable=True)) # 注册时间(被注册为可用表情包的时间)
|
||||
last_used_time: Optional[datetime] = Field(default=None, sa_column=Column(DateTime, nullable=True)) # 上次使用时间
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue