diff --git a/src/plugins/memory_system/Hippocampus.py b/src/plugins/memory_system/Hippocampus.py index aaa8b0b5..ca00bfb6 100644 --- a/src/plugins/memory_system/Hippocampus.py +++ b/src/plugins/memory_system/Hippocampus.py @@ -1045,62 +1045,80 @@ class EntorhinalCortex: start_time = time.time() logger.info("[数据库] 开始重新同步所有记忆数据...") - with db.client.start_session() as session: - session.start_transaction() - try: - # 清空数据库 - clear_start = time.time() - # delete_many({})是危险操作需要在事务中执行 - db.graph_data.nodes.delete_many({}, session=session) - db.graph_data.edges.delete_many({}, session=session) - clear_end = time.time() - logger.info(f"[数据库] 清空数据库耗时: {clear_end - clear_start:.2f}秒") + # 清空数据库 + clear_start = time.time() - # 获取所有节点和边 - memory_nodes = list(self.memory_graph.G.nodes(data=True)) - memory_edges = list(self.memory_graph.G.edges(data=True)) + time1 = datetime.datetime.now() + time_suffix = time1.strftime('%Y%m%d%H%M%S') - # 重新写入节点 - node_start = time.time() - for concept, data in memory_nodes: - memory_items = data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] + # 备份节点和边集合 + db.graph_data.nodes.rename(f"nodes_tmp_{time_suffix}", dropTarget=True) + db.graph_data.edges.rename(f"edges_tmp_{time_suffix}", dropTarget=True) - node_data = { - "concept": concept, - "memory_items": memory_items, - "hash": self.hippocampus.calculate_node_hash(concept, memory_items), - "created_time": data.get("created_time", datetime.datetime.now().timestamp()), - "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()), - } - db.graph_data.nodes.insert_one(node_data, session=session) - node_end = time.time() - logger.info(f"[数据库] 写入 {len(memory_nodes)} 个节点耗时: {node_end - node_start:.2f}秒") + # 创建新的空集合 + db.create_collection("graph_data.nodes") + db.create_collection("graph_data.edges") - # 重新写入边 - edge_start = time.time() - for source, target, data in memory_edges: - edge_data = { - "source": source, - "target": target, - "strength": data.get("strength", 1), - "hash": self.hippocampus.calculate_edge_hash(source, target), - "created_time": data.get("created_time", datetime.datetime.now().timestamp()), - "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()), - } - db.graph_data.edges.insert_one(edge_data, session=session) - edge_end = time.time() - logger.info(f"[数据库] 写入 {len(memory_edges)} 条边耗时: {edge_end - edge_start:.2f}秒") + try: - end_time = time.time() - logger.success(f"[数据库] 重新同步完成,总耗时: {end_time - start_time:.2f}秒") - logger.success(f"[数据库] 同步了 {len(memory_nodes)} 个节点和 {len(memory_edges)} 条边") - except Exception as e: - logger.exception(f"[数据库] 重新同步失败: {e}") - session.abort_transaction() - else: - session.commit_transaction() + clear_end = time.time() + logger.info(f"[数据库] 清空数据库耗时: {clear_end - clear_start:.2f}秒") + + # 获取所有节点和边 + memory_nodes = list(self.memory_graph.G.nodes(data=True)) + memory_edges = list(self.memory_graph.G.edges(data=True)) + + # 重新写入节点 + node_start = time.time() + for concept, data in memory_nodes: + memory_items = data.get("memory_items", []) + if not isinstance(memory_items, list): + memory_items = [memory_items] if memory_items else [] + + node_data = { + "concept": concept, + "memory_items": memory_items, + "hash": self.hippocampus.calculate_node_hash(concept, memory_items), + "created_time": data.get("created_time", datetime.datetime.now().timestamp()), + "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()), + } + db.graph_data.nodes.insert_one(node_data) + node_end = time.time() + logger.info(f"[数据库] 写入 {len(memory_nodes)} 个节点耗时: {node_end - node_start:.2f}秒") + + # 重新写入边 + edge_start = time.time() + for source, target, data in memory_edges: + edge_data = { + "source": source, + "target": target, + "strength": data.get("strength", 1), + "hash": self.hippocampus.calculate_edge_hash(source, target), + "created_time": data.get("created_time", datetime.datetime.now().timestamp()), + "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()), + } + db.graph_data.edges.insert_one(edge_data) + edge_end = time.time() + logger.info(f"[数据库] 写入 {len(memory_edges)} 条边耗时: {edge_end - edge_start:.2f}秒") + + end_time = time.time() + logger.success(f"[数据库] 重新同步完成,总耗时: {end_time - start_time:.2f}秒") + logger.success(f"[数据库] 同步了 {len(memory_nodes)} 个节点和 {len(memory_edges)} 条边") + except Exception as e: + logger.exception(f"[数据库] 重新同步失败{e}") + # 同步失败,从临时集合恢复数据 + bak_start = time.time() + logger.info("正在恢复数据...") + + db[f'nodes_tmp_{time_suffix}'].rename(f"graph_data.nodes", dropTarget=True) + db[f'edges_tmp_{time_suffix}'].rename(f"graph_data.edges", dropTarget=True) + + bak_end = time.time() + logger.info(f"[数据库] 恢复数据耗时: {bak_end - bak_start:.2f}秒") + else: + # 同步成功,清空临时集合 + db.drop_collection(f"nodes_tmp_{time_suffix}") + db.drop_collection(f"edges_tmp_{time_suffix}")