修复BUG,记忆可能丢失的问题

pull/941/head
xcr_1 2025-05-10 11:01:58 +08:00
parent 95c0f5f4f0
commit 7a9e6a533c
1 changed files with 69 additions and 51 deletions

View File

@ -1045,62 +1045,80 @@ class EntorhinalCortex:
start_time = time.time()
logger.info("[数据库] 开始重新同步所有记忆数据...")
with db.client.start_session() as session:
session.start_transaction()
try:
# 清空数据库
clear_start = time.time()
# delete_many({})是危险操作需要在事务中执行
db.graph_data.nodes.delete_many({}, session=session)
db.graph_data.edges.delete_many({}, session=session)
clear_end = time.time()
logger.info(f"[数据库] 清空数据库耗时: {clear_end - clear_start:.2f}")
# 清空数据库
clear_start = time.time()
# 获取所有节点和边
memory_nodes = list(self.memory_graph.G.nodes(data=True))
memory_edges = list(self.memory_graph.G.edges(data=True))
time1 = datetime.datetime.now()
time_suffix = time1.strftime('%Y%m%d%H%M%S')
# 重新写入节点
node_start = time.time()
for concept, data in memory_nodes:
memory_items = data.get("memory_items", [])
if not isinstance(memory_items, list):
memory_items = [memory_items] if memory_items else []
# 备份节点和边集合
db.graph_data.nodes.rename(f"nodes_tmp_{time_suffix}", dropTarget=True)
db.graph_data.edges.rename(f"edges_tmp_{time_suffix}", dropTarget=True)
node_data = {
"concept": concept,
"memory_items": memory_items,
"hash": self.hippocampus.calculate_node_hash(concept, memory_items),
"created_time": data.get("created_time", datetime.datetime.now().timestamp()),
"last_modified": data.get("last_modified", datetime.datetime.now().timestamp()),
}
db.graph_data.nodes.insert_one(node_data, session=session)
node_end = time.time()
logger.info(f"[数据库] 写入 {len(memory_nodes)} 个节点耗时: {node_end - node_start:.2f}")
# 创建新的空集合
db.create_collection("graph_data.nodes")
db.create_collection("graph_data.edges")
# 重新写入边
edge_start = time.time()
for source, target, data in memory_edges:
edge_data = {
"source": source,
"target": target,
"strength": data.get("strength", 1),
"hash": self.hippocampus.calculate_edge_hash(source, target),
"created_time": data.get("created_time", datetime.datetime.now().timestamp()),
"last_modified": data.get("last_modified", datetime.datetime.now().timestamp()),
}
db.graph_data.edges.insert_one(edge_data, session=session)
edge_end = time.time()
logger.info(f"[数据库] 写入 {len(memory_edges)} 条边耗时: {edge_end - edge_start:.2f}")
try:
end_time = time.time()
logger.success(f"[数据库] 重新同步完成,总耗时: {end_time - start_time:.2f}")
logger.success(f"[数据库] 同步了 {len(memory_nodes)} 个节点和 {len(memory_edges)} 条边")
except Exception as e:
logger.exception(f"[数据库] 重新同步失败: {e}")
session.abort_transaction()
else:
session.commit_transaction()
clear_end = time.time()
logger.info(f"[数据库] 清空数据库耗时: {clear_end - clear_start:.2f}")
# 获取所有节点和边
memory_nodes = list(self.memory_graph.G.nodes(data=True))
memory_edges = list(self.memory_graph.G.edges(data=True))
# 重新写入节点
node_start = time.time()
for concept, data in memory_nodes:
memory_items = data.get("memory_items", [])
if not isinstance(memory_items, list):
memory_items = [memory_items] if memory_items else []
node_data = {
"concept": concept,
"memory_items": memory_items,
"hash": self.hippocampus.calculate_node_hash(concept, memory_items),
"created_time": data.get("created_time", datetime.datetime.now().timestamp()),
"last_modified": data.get("last_modified", datetime.datetime.now().timestamp()),
}
db.graph_data.nodes.insert_one(node_data)
node_end = time.time()
logger.info(f"[数据库] 写入 {len(memory_nodes)} 个节点耗时: {node_end - node_start:.2f}")
# 重新写入边
edge_start = time.time()
for source, target, data in memory_edges:
edge_data = {
"source": source,
"target": target,
"strength": data.get("strength", 1),
"hash": self.hippocampus.calculate_edge_hash(source, target),
"created_time": data.get("created_time", datetime.datetime.now().timestamp()),
"last_modified": data.get("last_modified", datetime.datetime.now().timestamp()),
}
db.graph_data.edges.insert_one(edge_data)
edge_end = time.time()
logger.info(f"[数据库] 写入 {len(memory_edges)} 条边耗时: {edge_end - edge_start:.2f}")
end_time = time.time()
logger.success(f"[数据库] 重新同步完成,总耗时: {end_time - start_time:.2f}")
logger.success(f"[数据库] 同步了 {len(memory_nodes)} 个节点和 {len(memory_edges)} 条边")
except Exception as e:
logger.exception(f"[数据库] 重新同步失败{e}")
# 同步失败,从临时集合恢复数据
bak_start = time.time()
logger.info("正在恢复数据...")
db[f'nodes_tmp_{time_suffix}'].rename(f"graph_data.nodes", dropTarget=True)
db[f'edges_tmp_{time_suffix}'].rename(f"graph_data.edges", dropTarget=True)
bak_end = time.time()
logger.info(f"[数据库] 恢复数据耗时: {bak_end - bak_start:.2f}")
else:
# 同步成功,清空临时集合
db.drop_collection(f"nodes_tmp_{time_suffix}")
db.drop_collection(f"edges_tmp_{time_suffix}")