mirror of https://github.com/Mai-with-u/MaiBot.git
remove:移除部分兼容函数
parent
16ae212adc
commit
40fb50249f
|
|
@ -539,8 +539,6 @@ class ExpressionLearnerManager:
|
|||
self.expression_learners = {}
|
||||
|
||||
self._ensure_expression_directories()
|
||||
self._auto_migrate_json_to_db()
|
||||
self._migrate_old_data_create_date()
|
||||
|
||||
def get_expression_learner(self, chat_id: str) -> ExpressionLearner:
|
||||
if chat_id not in self.expression_learners:
|
||||
|
|
@ -565,189 +563,5 @@ class ExpressionLearnerManager:
|
|||
except Exception as e:
|
||||
logger.error(f"创建目录失败 {directory}: {e}")
|
||||
|
||||
def _auto_migrate_json_to_db(self):
|
||||
"""
|
||||
自动将/data/expression/learnt_style 和 learnt_grammar 下所有expressions.json迁移到数据库。
|
||||
迁移完成后在/data/expression/done.done写入标记文件,存在则跳过。
|
||||
然后检查done.done2,如果没有就删除所有grammar表达并创建该标记文件。
|
||||
"""
|
||||
base_dir = os.path.join("data", "expression")
|
||||
done_flag = os.path.join(base_dir, "done.done")
|
||||
done_flag2 = os.path.join(base_dir, "done.done2")
|
||||
|
||||
# 确保基础目录存在
|
||||
try:
|
||||
os.makedirs(base_dir, exist_ok=True)
|
||||
logger.debug(f"确保目录存在: {base_dir}")
|
||||
except Exception as e:
|
||||
logger.error(f"创建表达方式目录失败: {e}")
|
||||
return
|
||||
|
||||
if os.path.exists(done_flag):
|
||||
logger.info("表达方式JSON已迁移,无需重复迁移。")
|
||||
else:
|
||||
logger.info("开始迁移表达方式JSON到数据库...")
|
||||
migrated_count = 0
|
||||
|
||||
for type in ["learnt_style", "learnt_grammar"]:
|
||||
type_str = "style" if type == "learnt_style" else "grammar"
|
||||
type_dir = os.path.join(base_dir, type)
|
||||
if not os.path.exists(type_dir):
|
||||
logger.debug(f"目录不存在,跳过: {type_dir}")
|
||||
continue
|
||||
|
||||
try:
|
||||
chat_ids = os.listdir(type_dir)
|
||||
logger.debug(f"在 {type_dir} 中找到 {len(chat_ids)} 个聊天ID目录")
|
||||
except Exception as e:
|
||||
logger.error(f"读取目录失败 {type_dir}: {e}")
|
||||
continue
|
||||
|
||||
for chat_id in chat_ids:
|
||||
expr_file = os.path.join(type_dir, chat_id, "expressions.json")
|
||||
if not os.path.exists(expr_file):
|
||||
continue
|
||||
try:
|
||||
with open(expr_file, "r", encoding="utf-8") as f:
|
||||
expressions = json.load(f)
|
||||
|
||||
if not isinstance(expressions, list):
|
||||
logger.warning(f"表达方式文件格式错误,跳过: {expr_file}")
|
||||
continue
|
||||
|
||||
for expr in expressions:
|
||||
if not isinstance(expr, dict):
|
||||
continue
|
||||
|
||||
situation = expr.get("situation")
|
||||
style_val = expr.get("style")
|
||||
count = expr.get("count", 1)
|
||||
last_active_time = expr.get("last_active_time", time.time())
|
||||
|
||||
if not situation or not style_val:
|
||||
logger.warning(f"表达方式缺少必要字段,跳过: {expr}")
|
||||
continue
|
||||
|
||||
# 查重:同chat_id+type+situation+style
|
||||
from src.common.database.database_model import Expression
|
||||
|
||||
query = Expression.select().where(
|
||||
(Expression.chat_id == chat_id)
|
||||
& (Expression.type == type_str)
|
||||
& (Expression.situation == situation)
|
||||
& (Expression.style == style_val)
|
||||
)
|
||||
if query.exists():
|
||||
expr_obj = query.get()
|
||||
expr_obj.count = max(expr_obj.count, count)
|
||||
expr_obj.last_active_time = max(expr_obj.last_active_time, last_active_time)
|
||||
expr_obj.save()
|
||||
else:
|
||||
Expression.create(
|
||||
situation=situation,
|
||||
style=style_val,
|
||||
count=count,
|
||||
last_active_time=last_active_time,
|
||||
chat_id=chat_id,
|
||||
type=type_str,
|
||||
create_date=last_active_time, # 迁移时使用last_active_time作为创建时间
|
||||
)
|
||||
migrated_count += 1
|
||||
logger.info(f"已迁移 {expr_file} 到数据库,包含 {len(expressions)} 个表达方式")
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"JSON解析失败 {expr_file}: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"迁移表达方式 {expr_file} 失败: {e}")
|
||||
|
||||
# 标记迁移完成
|
||||
try:
|
||||
# 确保done.done文件的父目录存在
|
||||
done_parent_dir = os.path.dirname(done_flag)
|
||||
if not os.path.exists(done_parent_dir):
|
||||
os.makedirs(done_parent_dir, exist_ok=True)
|
||||
logger.debug(f"为done.done创建父目录: {done_parent_dir}")
|
||||
|
||||
with open(done_flag, "w", encoding="utf-8") as f:
|
||||
f.write("done\n")
|
||||
logger.info(f"表达方式JSON迁移已完成,共迁移 {migrated_count} 个表达方式,已写入done.done标记文件")
|
||||
except PermissionError as e:
|
||||
logger.error(f"权限不足,无法写入done.done标记文件: {e}")
|
||||
except OSError as e:
|
||||
logger.error(f"文件系统错误,无法写入done.done标记文件: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"写入done.done标记文件失败: {e}")
|
||||
|
||||
# 检查并处理grammar表达删除
|
||||
if not os.path.exists(done_flag2):
|
||||
logger.info("开始删除所有grammar类型的表达...")
|
||||
try:
|
||||
deleted_count = self.delete_all_grammar_expressions()
|
||||
logger.info(f"grammar表达删除完成,共删除 {deleted_count} 个表达")
|
||||
|
||||
# 创建done.done2标记文件
|
||||
with open(done_flag2, "w", encoding="utf-8") as f:
|
||||
f.write("done\n")
|
||||
logger.info("已创建done.done2标记文件,grammar表达删除标记完成")
|
||||
except Exception as e:
|
||||
logger.error(f"删除grammar表达或创建标记文件失败: {e}")
|
||||
else:
|
||||
logger.info("grammar表达已删除,跳过重复删除")
|
||||
|
||||
def _migrate_old_data_create_date(self):
|
||||
"""
|
||||
为没有create_date的老数据设置创建日期
|
||||
使用last_active_time作为create_date的默认值
|
||||
"""
|
||||
try:
|
||||
# 查找所有create_date为空的表达方式
|
||||
old_expressions = Expression.select().where(Expression.create_date.is_null())
|
||||
updated_count = 0
|
||||
|
||||
for expr in old_expressions:
|
||||
# 使用last_active_time作为create_date
|
||||
expr.create_date = expr.last_active_time
|
||||
expr.save()
|
||||
updated_count += 1
|
||||
|
||||
if updated_count > 0:
|
||||
logger.info(f"已为 {updated_count} 个老的表达方式设置创建日期")
|
||||
except Exception as e:
|
||||
logger.error(f"迁移老数据创建日期失败: {e}")
|
||||
|
||||
def delete_all_grammar_expressions(self) -> int:
|
||||
"""
|
||||
检查expression库中所有type为"grammar"的表达并全部删除
|
||||
|
||||
Returns:
|
||||
int: 删除的grammar表达数量
|
||||
"""
|
||||
try:
|
||||
# 查询所有type为"grammar"的表达
|
||||
grammar_expressions = Expression.select().where(Expression.type == "grammar")
|
||||
grammar_count = grammar_expressions.count()
|
||||
|
||||
if grammar_count == 0:
|
||||
logger.info("expression库中没有找到grammar类型的表达")
|
||||
return 0
|
||||
|
||||
logger.info(f"找到 {grammar_count} 个grammar类型的表达,开始删除...")
|
||||
|
||||
# 删除所有grammar类型的表达
|
||||
deleted_count = 0
|
||||
for expr in grammar_expressions:
|
||||
try:
|
||||
expr.delete_instance()
|
||||
deleted_count += 1
|
||||
except Exception as e:
|
||||
logger.error(f"删除grammar表达失败: {e}")
|
||||
continue
|
||||
|
||||
logger.info(f"成功删除 {deleted_count} 个grammar类型的表达")
|
||||
return deleted_count
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"删除grammar表达过程中发生错误: {e}")
|
||||
return 0
|
||||
|
||||
|
||||
expression_learner_manager = ExpressionLearnerManager()
|
||||
|
|
|
|||
|
|
@ -1,112 +0,0 @@
|
|||
import json
|
||||
import os
|
||||
import asyncio
|
||||
from src.common.logger import get_logger
|
||||
|
||||
logger = get_logger("migrate")
|
||||
|
||||
async def set_all_person_known():
|
||||
"""
|
||||
将person_info库中所有记录的is_known字段设置为True
|
||||
在设置之前,先清理掉user_id或platform为空的记录
|
||||
"""
|
||||
logger.info("开始设置所有person_info记录为已认识...")
|
||||
|
||||
try:
|
||||
from src.common.database.database_model import PersonInfo
|
||||
|
||||
# 获取所有PersonInfo记录
|
||||
all_persons = PersonInfo.select()
|
||||
total_count = all_persons.count()
|
||||
|
||||
logger.info(f"找到 {total_count} 个人员记录")
|
||||
|
||||
if total_count == 0:
|
||||
logger.info("没有找到任何人员记录")
|
||||
return {"total": 0, "deleted": 0, "updated": 0, "known_count": 0}
|
||||
|
||||
# 删除user_id或platform为空的记录
|
||||
deleted_count = 0
|
||||
invalid_records = PersonInfo.select().where(
|
||||
(PersonInfo.user_id.is_null())
|
||||
| (PersonInfo.user_id == "")
|
||||
| (PersonInfo.platform.is_null())
|
||||
| (PersonInfo.platform == "")
|
||||
)
|
||||
|
||||
# 记录要删除的记录信息
|
||||
for record in invalid_records:
|
||||
user_id_info = f"'{record.user_id}'" if record.user_id else "NULL"
|
||||
platform_info = f"'{record.platform}'" if record.platform else "NULL"
|
||||
person_name_info = f"'{record.person_name}'" if record.person_name else "无名称"
|
||||
logger.debug(
|
||||
f"删除无效记录: person_id={record.person_id}, user_id={user_id_info}, platform={platform_info}, person_name={person_name_info}"
|
||||
)
|
||||
|
||||
# 执行删除操作
|
||||
deleted_count = (
|
||||
PersonInfo.delete()
|
||||
.where(
|
||||
(PersonInfo.user_id.is_null())
|
||||
| (PersonInfo.user_id == "")
|
||||
| (PersonInfo.platform.is_null())
|
||||
| (PersonInfo.platform == "")
|
||||
)
|
||||
.execute()
|
||||
)
|
||||
|
||||
if deleted_count > 0:
|
||||
logger.info(f"删除了 {deleted_count} 个user_id或platform为空的记录")
|
||||
else:
|
||||
logger.info("没有发现user_id或platform为空的记录")
|
||||
|
||||
# 重新获取剩余记录数量
|
||||
remaining_count = PersonInfo.select().count()
|
||||
logger.info(f"清理后剩余 {remaining_count} 个有效记录")
|
||||
|
||||
if remaining_count == 0:
|
||||
logger.info("清理后没有剩余记录")
|
||||
return {"total": total_count, "deleted": deleted_count, "updated": 0, "known_count": 0}
|
||||
|
||||
# 批量更新剩余记录的is_known字段为True
|
||||
updated_count = PersonInfo.update(is_known=True).execute()
|
||||
|
||||
logger.info(f"成功更新 {updated_count} 个人员记录的is_known字段为True")
|
||||
|
||||
# 验证更新结果
|
||||
known_count = PersonInfo.select().where(PersonInfo.is_known).count()
|
||||
|
||||
result = {"total": total_count, "deleted": deleted_count, "updated": updated_count, "known_count": known_count}
|
||||
|
||||
logger.info("=== person_info更新完成 ===")
|
||||
logger.info(f"原始记录数: {result['total']}")
|
||||
logger.info(f"删除记录数: {result['deleted']}")
|
||||
logger.info(f"更新记录数: {result['updated']}")
|
||||
logger.info(f"已认识记录数: {result['known_count']}")
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"更新person_info过程中发生错误: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def check_and_run_migrations():
|
||||
# 获取根目录
|
||||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||
data_dir = os.path.join(project_root, "data")
|
||||
temp_dir = os.path.join(data_dir, "temp")
|
||||
done_file = os.path.join(temp_dir, "done.mem")
|
||||
|
||||
# 检查done.mem是否存在
|
||||
if not os.path.exists(done_file):
|
||||
# 如果temp目录不存在则创建
|
||||
if not os.path.exists(temp_dir):
|
||||
os.makedirs(temp_dir, exist_ok=True)
|
||||
# 执行迁移函数
|
||||
# 依次执行两个异步函数
|
||||
await asyncio.sleep(3)
|
||||
await set_all_person_known()
|
||||
# 创建done.mem文件
|
||||
with open(done_file, "w", encoding="utf-8") as f:
|
||||
f.write("done")
|
||||
Loading…
Reference in New Issue