From 40fb50249f79e803f657455620283e66602ec637 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Wed, 8 Oct 2025 18:50:18 +0800 Subject: [PATCH] =?UTF-8?q?remove:=E7=A7=BB=E9=99=A4=E9=83=A8=E5=88=86?= =?UTF-8?q?=E5=85=BC=E5=AE=B9=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/express/expression_learner.py | 186 ------------------------- src/migrate_helper/__init__.py | 0 src/migrate_helper/migrate.py | 112 --------------- 3 files changed, 298 deletions(-) delete mode 100644 src/migrate_helper/__init__.py delete mode 100644 src/migrate_helper/migrate.py diff --git a/src/chat/express/expression_learner.py b/src/chat/express/expression_learner.py index 3d736cb6..4b534fbd 100644 --- a/src/chat/express/expression_learner.py +++ b/src/chat/express/expression_learner.py @@ -539,8 +539,6 @@ class ExpressionLearnerManager: self.expression_learners = {} self._ensure_expression_directories() - self._auto_migrate_json_to_db() - self._migrate_old_data_create_date() def get_expression_learner(self, chat_id: str) -> ExpressionLearner: if chat_id not in self.expression_learners: @@ -565,189 +563,5 @@ class ExpressionLearnerManager: except Exception as e: logger.error(f"创建目录失败 {directory}: {e}") - def _auto_migrate_json_to_db(self): - """ - 自动将/data/expression/learnt_style 和 learnt_grammar 下所有expressions.json迁移到数据库。 - 迁移完成后在/data/expression/done.done写入标记文件,存在则跳过。 - 然后检查done.done2,如果没有就删除所有grammar表达并创建该标记文件。 - """ - base_dir = os.path.join("data", "expression") - done_flag = os.path.join(base_dir, "done.done") - done_flag2 = os.path.join(base_dir, "done.done2") - - # 确保基础目录存在 - try: - os.makedirs(base_dir, exist_ok=True) - logger.debug(f"确保目录存在: {base_dir}") - except Exception as e: - logger.error(f"创建表达方式目录失败: {e}") - return - - if os.path.exists(done_flag): - logger.info("表达方式JSON已迁移,无需重复迁移。") - else: - logger.info("开始迁移表达方式JSON到数据库...") - migrated_count = 0 - - for type in ["learnt_style", "learnt_grammar"]: - type_str = "style" if type == "learnt_style" else "grammar" - type_dir = os.path.join(base_dir, type) - if not os.path.exists(type_dir): - logger.debug(f"目录不存在,跳过: {type_dir}") - continue - - try: - chat_ids = os.listdir(type_dir) - logger.debug(f"在 {type_dir} 中找到 {len(chat_ids)} 个聊天ID目录") - except Exception as e: - logger.error(f"读取目录失败 {type_dir}: {e}") - continue - - for chat_id in chat_ids: - expr_file = os.path.join(type_dir, chat_id, "expressions.json") - if not os.path.exists(expr_file): - continue - try: - with open(expr_file, "r", encoding="utf-8") as f: - expressions = json.load(f) - - if not isinstance(expressions, list): - logger.warning(f"表达方式文件格式错误,跳过: {expr_file}") - continue - - for expr in expressions: - if not isinstance(expr, dict): - continue - - situation = expr.get("situation") - style_val = expr.get("style") - count = expr.get("count", 1) - last_active_time = expr.get("last_active_time", time.time()) - - if not situation or not style_val: - logger.warning(f"表达方式缺少必要字段,跳过: {expr}") - continue - - # 查重:同chat_id+type+situation+style - from src.common.database.database_model import Expression - - query = Expression.select().where( - (Expression.chat_id == chat_id) - & (Expression.type == type_str) - & (Expression.situation == situation) - & (Expression.style == style_val) - ) - if query.exists(): - expr_obj = query.get() - expr_obj.count = max(expr_obj.count, count) - expr_obj.last_active_time = max(expr_obj.last_active_time, last_active_time) - expr_obj.save() - else: - Expression.create( - situation=situation, - style=style_val, - count=count, - last_active_time=last_active_time, - chat_id=chat_id, - type=type_str, - create_date=last_active_time, # 迁移时使用last_active_time作为创建时间 - ) - migrated_count += 1 - logger.info(f"已迁移 {expr_file} 到数据库,包含 {len(expressions)} 个表达方式") - except json.JSONDecodeError as e: - logger.error(f"JSON解析失败 {expr_file}: {e}") - except Exception as e: - logger.error(f"迁移表达方式 {expr_file} 失败: {e}") - - # 标记迁移完成 - try: - # 确保done.done文件的父目录存在 - done_parent_dir = os.path.dirname(done_flag) - if not os.path.exists(done_parent_dir): - os.makedirs(done_parent_dir, exist_ok=True) - logger.debug(f"为done.done创建父目录: {done_parent_dir}") - - with open(done_flag, "w", encoding="utf-8") as f: - f.write("done\n") - logger.info(f"表达方式JSON迁移已完成,共迁移 {migrated_count} 个表达方式,已写入done.done标记文件") - except PermissionError as e: - logger.error(f"权限不足,无法写入done.done标记文件: {e}") - except OSError as e: - logger.error(f"文件系统错误,无法写入done.done标记文件: {e}") - except Exception as e: - logger.error(f"写入done.done标记文件失败: {e}") - - # 检查并处理grammar表达删除 - if not os.path.exists(done_flag2): - logger.info("开始删除所有grammar类型的表达...") - try: - deleted_count = self.delete_all_grammar_expressions() - logger.info(f"grammar表达删除完成,共删除 {deleted_count} 个表达") - - # 创建done.done2标记文件 - with open(done_flag2, "w", encoding="utf-8") as f: - f.write("done\n") - logger.info("已创建done.done2标记文件,grammar表达删除标记完成") - except Exception as e: - logger.error(f"删除grammar表达或创建标记文件失败: {e}") - else: - logger.info("grammar表达已删除,跳过重复删除") - - def _migrate_old_data_create_date(self): - """ - 为没有create_date的老数据设置创建日期 - 使用last_active_time作为create_date的默认值 - """ - try: - # 查找所有create_date为空的表达方式 - old_expressions = Expression.select().where(Expression.create_date.is_null()) - updated_count = 0 - - for expr in old_expressions: - # 使用last_active_time作为create_date - expr.create_date = expr.last_active_time - expr.save() - updated_count += 1 - - if updated_count > 0: - logger.info(f"已为 {updated_count} 个老的表达方式设置创建日期") - except Exception as e: - logger.error(f"迁移老数据创建日期失败: {e}") - - def delete_all_grammar_expressions(self) -> int: - """ - 检查expression库中所有type为"grammar"的表达并全部删除 - - Returns: - int: 删除的grammar表达数量 - """ - try: - # 查询所有type为"grammar"的表达 - grammar_expressions = Expression.select().where(Expression.type == "grammar") - grammar_count = grammar_expressions.count() - - if grammar_count == 0: - logger.info("expression库中没有找到grammar类型的表达") - return 0 - - logger.info(f"找到 {grammar_count} 个grammar类型的表达,开始删除...") - - # 删除所有grammar类型的表达 - deleted_count = 0 - for expr in grammar_expressions: - try: - expr.delete_instance() - deleted_count += 1 - except Exception as e: - logger.error(f"删除grammar表达失败: {e}") - continue - - logger.info(f"成功删除 {deleted_count} 个grammar类型的表达") - return deleted_count - - except Exception as e: - logger.error(f"删除grammar表达过程中发生错误: {e}") - return 0 - expression_learner_manager = ExpressionLearnerManager() diff --git a/src/migrate_helper/__init__.py b/src/migrate_helper/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/migrate_helper/migrate.py b/src/migrate_helper/migrate.py deleted file mode 100644 index 487b1f3f..00000000 --- a/src/migrate_helper/migrate.py +++ /dev/null @@ -1,112 +0,0 @@ -import json -import os -import asyncio -from src.common.logger import get_logger - -logger = get_logger("migrate") - -async def set_all_person_known(): - """ - 将person_info库中所有记录的is_known字段设置为True - 在设置之前,先清理掉user_id或platform为空的记录 - """ - logger.info("开始设置所有person_info记录为已认识...") - - try: - from src.common.database.database_model import PersonInfo - - # 获取所有PersonInfo记录 - all_persons = PersonInfo.select() - total_count = all_persons.count() - - logger.info(f"找到 {total_count} 个人员记录") - - if total_count == 0: - logger.info("没有找到任何人员记录") - return {"total": 0, "deleted": 0, "updated": 0, "known_count": 0} - - # 删除user_id或platform为空的记录 - deleted_count = 0 - invalid_records = PersonInfo.select().where( - (PersonInfo.user_id.is_null()) - | (PersonInfo.user_id == "") - | (PersonInfo.platform.is_null()) - | (PersonInfo.platform == "") - ) - - # 记录要删除的记录信息 - for record in invalid_records: - user_id_info = f"'{record.user_id}'" if record.user_id else "NULL" - platform_info = f"'{record.platform}'" if record.platform else "NULL" - person_name_info = f"'{record.person_name}'" if record.person_name else "无名称" - logger.debug( - f"删除无效记录: person_id={record.person_id}, user_id={user_id_info}, platform={platform_info}, person_name={person_name_info}" - ) - - # 执行删除操作 - deleted_count = ( - PersonInfo.delete() - .where( - (PersonInfo.user_id.is_null()) - | (PersonInfo.user_id == "") - | (PersonInfo.platform.is_null()) - | (PersonInfo.platform == "") - ) - .execute() - ) - - if deleted_count > 0: - logger.info(f"删除了 {deleted_count} 个user_id或platform为空的记录") - else: - logger.info("没有发现user_id或platform为空的记录") - - # 重新获取剩余记录数量 - remaining_count = PersonInfo.select().count() - logger.info(f"清理后剩余 {remaining_count} 个有效记录") - - if remaining_count == 0: - logger.info("清理后没有剩余记录") - return {"total": total_count, "deleted": deleted_count, "updated": 0, "known_count": 0} - - # 批量更新剩余记录的is_known字段为True - updated_count = PersonInfo.update(is_known=True).execute() - - logger.info(f"成功更新 {updated_count} 个人员记录的is_known字段为True") - - # 验证更新结果 - known_count = PersonInfo.select().where(PersonInfo.is_known).count() - - result = {"total": total_count, "deleted": deleted_count, "updated": updated_count, "known_count": known_count} - - logger.info("=== person_info更新完成 ===") - logger.info(f"原始记录数: {result['total']}") - logger.info(f"删除记录数: {result['deleted']}") - logger.info(f"更新记录数: {result['updated']}") - logger.info(f"已认识记录数: {result['known_count']}") - - return result - - except Exception as e: - logger.error(f"更新person_info过程中发生错误: {e}") - raise - - -async def check_and_run_migrations(): - # 获取根目录 - project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) - data_dir = os.path.join(project_root, "data") - temp_dir = os.path.join(data_dir, "temp") - done_file = os.path.join(temp_dir, "done.mem") - - # 检查done.mem是否存在 - if not os.path.exists(done_file): - # 如果temp目录不存在则创建 - if not os.path.exists(temp_dir): - os.makedirs(temp_dir, exist_ok=True) - # 执行迁移函数 - # 依次执行两个异步函数 - await asyncio.sleep(3) - await set_all_person_known() - # 创建done.mem文件 - with open(done_file, "w", encoding="utf-8") as f: - f.write("done")