ref: completely remove the hippocampus, clean up logging

pull/1294/head
SengokuCola 2025-10-02 01:41:38 +08:00
parent feb40cac70
commit 6a0a07582e
6 changed files with 8 additions and 714 deletions

View File

@@ -1,86 +0,0 @@
# Hippocampus-to-Memory-Chest Conversion Task
## Feature Description
This feature automatically converts hippocampus nodes into the memory chest format every 60 seconds. The system randomly selects 5 hippocampus nodes, concatenates their memory contents into one complete text, then uses an LLM to generate a title and stores the result in the memory chest.
## Implementation Details
### Core Components
1. **HippocampusToMemoryChestTask** (`src/chat/memory_system/hippocampus_to_memory_chest_task.py`)
   - Inherits from the `AsyncTask` base class
   - Runs the conversion every 60 seconds
   - Waits 60 seconds after startup before its first run (a skeleton sketch follows this list)
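A minimal skeleton of what that declaration might look like; the `AsyncTask` import path and constructor keywords here are assumptions, since the base class is not shown in this diff:
```python
# Sketch only: import path and constructor kwargs are assumed, not taken from the diff.
from src.manager.async_task_manager import AsyncTask


class HippocampusToMemoryChestTask(AsyncTask):
    def __init__(self):
        super().__init__(
            task_name="hippocampus_to_memory_chest",  # hypothetical task identifier
            wait_before_start=60,  # seconds to wait after startup
            run_interval=60,  # seconds between runs
        )

    async def run(self):
        ...  # conversion flow; see the sketch under "Workflow" below
```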
### Workflow
1. **Node selection**: randomly select 5 nodes from all nodes in the hippocampus
2. **Content concatenation**: join the selected nodes' memory contents in the following format:
   ```
   【Node name 1】Memory content 1
   【Node name 2】Memory content 2
   ...
   ```
3. **Title generation**: use the Memory_chest LLM model to generate a descriptive title for the concatenated content
4. **Storage**: save the title and content into the MemoryChest database table
5. **Node deletion**: if the save succeeds, delete the converted hippocampus nodes so they are not rebuilt (the sketch after this list walks through steps 1-5)
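A minimal sketch of the conversion flow, using the graph APIs visible elsewhere in this commit; `generate_title` and `save_to_memory_chest` are placeholder stubs standing in for the Memory_chest LLM and database calls, which are not shown here:
```python
import random

from src.memory_system.Hippocampus import hippocampus_manager

NODES_PER_RUN = 5  # nodes converted per cycle


async def generate_title(content: str) -> str:
    """Placeholder for the Memory_chest LLM call that titles the merged text."""
    return content[:20]  # a real implementation would prompt the LLM here


def save_to_memory_chest(title: str, content: str) -> None:
    """Placeholder for inserting a row into the MemoryChest table."""
    print(f"saved: {title!r} ({len(content)} chars)")


async def convert_once():
    hippocampus = hippocampus_manager.get_hippocampus()
    node_names = hippocampus.get_all_node_names()
    if len(node_names) < NODES_PER_RUN:
        return  # fewer than 5 nodes: skip this run (see "Error Handling")

    graph = hippocampus.memory_graph
    selected = random.sample(node_names, NODES_PER_RUN)

    # step 2: concatenate the selected nodes' memories in the documented format
    parts = []
    for name in selected:
        _, data = graph.get_dot(name)
        if memory_items := data.get("memory_items"):
            parts.append(f"【{name}】{memory_items}")
    if not parts:
        return  # no valid memory content: skip this run
    content = "\n".join(parts)

    # steps 3-4: generate a title and store the result
    title = await generate_title(content)
    if not title:
        return  # title generation failed: skip saving
    save_to_memory_chest(title, content)

    # step 5: delete the converted nodes and sync the deletions to the database
    for name in selected:
        graph.forget_topic(name)
    await hippocampus.entorhinal_cortex.sync_memory_to_db()
```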
### Integration
The task is wired into the main system (`src/main.py`):
```python
# Initialize the memory system
hippocampus_manager.initialize()
logger.info("Memory system initialized successfully")
# Add the hippocampus-to-memory-chest conversion task
await async_task_manager.add_task(HippocampusToMemoryChestTask())
logger.info("Hippocampus-to-memory-chest conversion task started")
```
## Configuration Parameters
- **Initial delay**: 60 seconds (wait time after startup)
- **Run interval**: 60 seconds (time between runs)
- **Node count**: 5 (nodes randomly selected per run)
## Log Output
The task writes detailed log messages while it runs:
- `[Hippocampus conversion] Starting the hippocampus-to-memory-chest conversion task`
- `[Hippocampus conversion] Randomly selected X nodes: [node list]`
- `[Hippocampus conversion] Concatenation finished, content length: X characters`
- `[Hippocampus conversion] Saved to the memory chest, title: [generated title]`
- `[Hippocampus conversion] Deleted node: [node name]`
- `[Hippocampus conversion] Deleted X nodes and synced to the database`
- `[Hippocampus conversion] Conversion task finished`
## Error Handling
- If the hippocampus manager is not initialized, the current run is skipped
- If there are fewer than 5 nodes, the current run is skipped
- If there is no valid memory content, the current run is skipped
- If title generation fails, the save step is skipped
- All errors are recorded in the log
## Testing
The provided test script can be used to verify the feature (an illustrative stand-in follows the command):
```bash
python test_hippocampus_task.py
```
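`test_hippocampus_task.py` itself is not part of this diff; a minimal stand-in that drives a single conversion cycle might look like this, assuming the task exposes `run()` like the other `AsyncTask` subclasses in this commit:
```python
# Illustrative only -- not the actual test script from the repository.
import asyncio

from src.chat.memory_system.hippocampus_to_memory_chest_task import HippocampusToMemoryChestTask
from src.memory_system.Hippocampus import hippocampus_manager


async def main():
    hippocampus_manager.initialize()  # the task skips its run if this is missing
    task = HippocampusToMemoryChestTask()
    await task.run()  # execute one conversion cycle


if __name__ == "__main__":
    asyncio.run(main())
```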
## Notes
1. Make sure the hippocampus manager is initialized correctly
2. Make sure the Memory_chest LLM model is available
3. Make sure the database connection is healthy
4. The task performs its first run 60 seconds after system startup
5. **Important**: converted hippocampus nodes are deleted permanently, which guarantees they are not rebuilt
6. Deletions are automatically synced to the database to keep the data consistent

View File

@@ -13,7 +13,6 @@ from src.common.logger import get_logger
from src.common.server import get_global_server, Server
from src.mood.mood_manager import mood_manager
from src.chat.knowledge import lpmm_start_up
from src.memory_system.Hippocampus import hippocampus_manager
from src.memory_system.memory_management_task import MemoryManagementTask
from rich.traceback import install
from src.migrate_helper.migrate import check_and_run_migrations
@@ -94,10 +93,6 @@ class MainSystem:
        asyncio.create_task(get_chat_manager()._auto_save_task())
        logger.info("Chat manager initialized successfully")
        # Initialize the memory system
        hippocampus_manager.initialize()
        logger.info("Memory system initialized successfully")
        # Add the memory management task
        await async_task_manager.add_task(MemoryManagementTask())

View File

@@ -1,611 +0,0 @@
# -*- coding: utf-8 -*-
import datetime
import math
import random
import time
import re
import jieba
import networkx as nx
import numpy as np
from typing import List, Tuple, Set
from collections import Counter
import traceback
from rich.traceback import install
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
from src.common.database.database_model import GraphNodes, GraphEdges  # Peewee model imports
from src.common.logger import get_logger
from src.chat.utils.utils import cut_key_words


# cosine_similarity helper
def cosine_similarity(v1, v2):
    """Compute the cosine similarity of two vectors."""
    dot_product = np.dot(v1, v2)
    norm1 = np.linalg.norm(v1)
    norm2 = np.linalg.norm(v2)
    return 0 if norm1 == 0 or norm2 == 0 else dot_product / (norm1 * norm2)


install(extra_lines=3)


def calculate_information_content(text):
    """Compute the information content (Shannon entropy) of a text: H = -sum(p * log2 p)."""
    char_count = Counter(text)
    total_chars = len(text)
    if total_chars == 0:
        return 0
    entropy = 0
    for count in char_count.values():
        probability = count / total_chars
        entropy -= probability * math.log2(probability)
    return entropy


logger = get_logger("memory")
class MemoryGraph:
    def __init__(self):
        self.G = nx.Graph()  # use a networkx graph structure

    def connect_dot(self, concept1, concept2):
        # avoid self-loops
        if concept1 == concept2:
            return
        current_time = datetime.datetime.now().timestamp()
        # if the edge already exists, increase its strength
        if self.G.has_edge(concept1, concept2):
            self.G[concept1][concept2]["strength"] = self.G[concept1][concept2].get("strength", 1) + 1
            # update the last-modified time
            self.G[concept1][concept2]["last_modified"] = current_time
        else:
            # for a new edge, initialize strength to 1
            self.G.add_edge(
                concept1,
                concept2,
                strength=1,
                created_time=current_time,  # creation time
                last_modified=current_time,  # last-modified time
            )

    async def add_dot(self, concept, memory, hippocampus_instance=None):
        current_time = datetime.datetime.now().timestamp()
        if concept in self.G:
            if "memory_items" in self.G.nodes[concept]:
                # the existing memory items are already stored as a str
                existing_memory = self.G.nodes[concept]["memory_items"]
                # simply concatenate the old and new memories
                new_memory_str = f"{existing_memory} | {memory}"
                self.G.nodes[concept]["memory_items"] = new_memory_str
                logger.info(f"Node {concept}: memory concatenated and updated: {new_memory_str}")
            else:
                self.G.nodes[concept]["memory_items"] = str(memory)
                # the node exists but had no memory_items, so this is its first memory; set created_time
                if "created_time" not in self.G.nodes[concept]:
                    self.G.nodes[concept]["created_time"] = current_time
                logger.info(f"Node {concept}: new memory created: {str(memory)}")
            # update the last-modified time
            self.G.nodes[concept]["last_modified"] = current_time
        else:
            # for a new node, create a new memory string
            self.G.add_node(
                concept,
                memory_items=str(memory),
                weight=1.0,  # new nodes start with weight 1.0
                created_time=current_time,  # creation time
                last_modified=current_time,  # last-modified time
            )
            logger.info(f"New node {concept} added with memory: {str(memory)}")

    def get_dot(self, concept):
        # check whether the node exists in the graph
        return (concept, self.G.nodes[concept]) if concept in self.G else None

    def get_related_item(self, topic, depth=1):
        if topic not in self.G:
            return [], []
        first_layer_items = []
        second_layer_items = []
        # get the neighboring nodes
        neighbors = list(self.G.neighbors(topic))
        # get the current node's memory items
        node_data = self.get_dot(topic)
        if node_data:
            _, data = node_data
            if "memory_items" in data:
                # use the full memory content directly
                if memory_items := data["memory_items"]:
                    first_layer_items.append(memory_items)
        # only fetch the second layer when depth >= 2
        if depth >= 2:
            # get the neighbors' memory items
            for neighbor in neighbors:
                if node_data := self.get_dot(neighbor):
                    _, data = node_data
                    if "memory_items" in data:
                        # use the full memory content directly
                        if memory_items := data["memory_items"]:
                            second_layer_items.append(memory_items)
        return first_layer_items, second_layer_items

    @property
    def dots(self):
        # return the Memory_dot objects for all nodes
        return [self.get_dot(node) for node in self.G.nodes()]

    def forget_topic(self, topic):
        """Remove the given topic node from the graph and report the memory that was deleted."""
        if topic not in self.G:
            return None
        # get the topic node's data
        node_data = self.G.nodes[topic]
        # remove the entire node
        self.G.remove_node(topic)
        # if the node had memory_items, report what was deleted
        if "memory_items" in node_data:
            if memory_items := node_data["memory_items"]:
                return (
                    f"Deleted the full memory of node {topic}: {memory_items[:50]}..."
                    if len(memory_items) > 50
                    else f"Deleted the full memory of node {topic}: {memory_items}"
                )
        return None
# The hippocampus
class Hippocampus:
    def __init__(self):
        self.memory_graph = MemoryGraph()
        self.entorhinal_cortex: EntorhinalCortex = None  # type: ignore
        self.parahippocampal_gyrus: ParahippocampalGyrus = None  # type: ignore

    def initialize(self):
        # initialize the subcomponents
        self.entorhinal_cortex = EntorhinalCortex(self)
        self.parahippocampal_gyrus = ParahippocampalGyrus(self)
        # load the memory graph from the database
        self.entorhinal_cortex.sync_memory_from_db()

    def get_all_node_names(self) -> list:
        """Return the names of all nodes in the memory graph."""
        return list(self.memory_graph.G.nodes())

    @staticmethod
    def calculate_node_hash(concept, memory_items) -> int:
        """Compute a node's fingerprint hash."""
        # memory_items is already a str; split it on the separator
        if memory_items:
            unique_items = {item.strip() for item in memory_items.split(" | ") if item.strip()}
        else:
            unique_items = set()
        # use a frozenset so the hash does not depend on item order
        content = f"{concept}:{frozenset(unique_items)}"
        return hash(content)

    @staticmethod
    def calculate_edge_hash(source, target) -> int:
        """Compute an edge's fingerprint hash."""
        # use a tuple directly to keep the ordering consistent
        return hash((source, target))
# Handles the hippocampus's interaction with the rest of the system
class EntorhinalCortex:
    def __init__(self, hippocampus: Hippocampus):
        self.hippocampus = hippocampus
        self.memory_graph = hippocampus.memory_graph

    async def sync_memory_to_db(self):
        """Synchronize the in-memory graph to the database."""
        start_time = time.time()
        current_time = datetime.datetime.now().timestamp()
        # fetch all nodes from the database and all nodes held in memory
        db_nodes = {node.concept: node for node in GraphNodes.select()}
        memory_nodes = list(self.memory_graph.G.nodes(data=True))
        # prepare node data for batching
        nodes_to_create = []
        nodes_to_update = []
        nodes_to_delete = set()
        # process nodes
        for concept, data in memory_nodes:
            if not concept or not isinstance(concept, str):
                self.memory_graph.G.remove_node(concept)
                continue
            memory_items = data.get("memory_items", "")
            # check the string for emptiness directly; no need to split it into a list
            if not memory_items or memory_items.strip() == "":
                self.memory_graph.G.remove_node(concept)
                continue
            # compute the in-memory node's fingerprint
            memory_hash = self.hippocampus.calculate_node_hash(concept, memory_items)
            created_time = data.get("created_time", current_time)
            last_modified = data.get("last_modified", current_time)
            # memory_items is stored directly as a string; no JSON serialization needed
            if not memory_items:
                continue
            # read the weight attribute
            weight = data.get("weight", 1.0)
            if concept not in db_nodes:
                nodes_to_create.append(
                    {
                        "concept": concept,
                        "memory_items": memory_items,
                        "weight": weight,
                        "hash": memory_hash,
                        "created_time": created_time,
                        "last_modified": last_modified,
                    }
                )
            else:
                db_node = db_nodes[concept]
                if db_node.hash != memory_hash:
                    nodes_to_update.append(
                        {
                            "concept": concept,
                            "memory_items": memory_items,
                            "weight": weight,
                            "hash": memory_hash,
                            "last_modified": last_modified,
                        }
                    )
        # determine which nodes should be deleted
        memory_concepts = {concept for concept, _ in memory_nodes}
        nodes_to_delete = set(db_nodes.keys()) - memory_concepts
        # apply node changes in batches
        if nodes_to_create:
            batch_size = 100
            for i in range(0, len(nodes_to_create), batch_size):
                batch = nodes_to_create[i : i + batch_size]
                GraphNodes.insert_many(batch).execute()
        if nodes_to_update:
            batch_size = 100
            for i in range(0, len(nodes_to_update), batch_size):
                batch = nodes_to_update[i : i + batch_size]
                for node_data in batch:
                    GraphNodes.update(**{k: v for k, v in node_data.items() if k != "concept"}).where(
                        GraphNodes.concept == node_data["concept"]
                    ).execute()
        if nodes_to_delete:
            GraphNodes.delete().where(GraphNodes.concept.in_(nodes_to_delete)).execute()  # type: ignore
        # process edge information
        db_edges = list(GraphEdges.select())
        memory_edges = list(self.memory_graph.G.edges(data=True))
        # build a dictionary of edge hashes
        db_edge_dict = {}
        for edge in db_edges:
            edge_hash = self.hippocampus.calculate_edge_hash(edge.source, edge.target)
            db_edge_dict[(edge.source, edge.target)] = {"hash": edge_hash, "strength": edge.strength}
        # prepare edge data for batching
        edges_to_create = []
        edges_to_update = []
        # process edges
        for source, target, data in memory_edges:
            edge_hash = self.hippocampus.calculate_edge_hash(source, target)
            edge_key = (source, target)
            strength = data.get("strength", 1)
            created_time = data.get("created_time", current_time)
            last_modified = data.get("last_modified", current_time)
            if edge_key not in db_edge_dict:
                edges_to_create.append(
                    {
                        "source": source,
                        "target": target,
                        "strength": strength,
                        "hash": edge_hash,
                        "created_time": created_time,
                        "last_modified": last_modified,
                    }
                )
            elif db_edge_dict[edge_key]["hash"] != edge_hash:
                edges_to_update.append(
                    {
                        "source": source,
                        "target": target,
                        "strength": strength,
                        "hash": edge_hash,
                        "last_modified": last_modified,
                    }
                )
        # determine which edges should be deleted
        memory_edge_keys = {(source, target) for source, target, _ in memory_edges}
        edges_to_delete = set(db_edge_dict.keys()) - memory_edge_keys
        # apply edge changes in batches
        if edges_to_create:
            batch_size = 100
            for i in range(0, len(edges_to_create), batch_size):
                batch = edges_to_create[i : i + batch_size]
                GraphEdges.insert_many(batch).execute()
        if edges_to_update:
            batch_size = 100
            for i in range(0, len(edges_to_update), batch_size):
                batch = edges_to_update[i : i + batch_size]
                for edge_data in batch:
                    GraphEdges.update(**{k: v for k, v in edge_data.items() if k not in ["source", "target"]}).where(
                        (GraphEdges.source == edge_data["source"]) & (GraphEdges.target == edge_data["target"])
                    ).execute()
        if edges_to_delete:
            for source, target in edges_to_delete:
                GraphEdges.delete().where((GraphEdges.source == source) & (GraphEdges.target == target)).execute()
        end_time = time.time()
        logger.info(f"[Database] Sync finished, total time: {end_time - start_time:.2f}s")
        logger.info(
            f"[Database] Synced {len(nodes_to_create) + len(nodes_to_update)} nodes and {len(edges_to_create) + len(edges_to_update)} edges"
        )
    async def resync_memory_to_db(self):
        """Clear the database and re-synchronize all memory data."""
        start_time = time.time()
        logger.info("[Database] Starting a full resync of all memory data...")
        # clear the database
        clear_start = time.time()
        GraphNodes.delete().execute()
        GraphEdges.delete().execute()
        clear_end = time.time()
        logger.info(f"[Database] Clearing the database took: {clear_end - clear_start:.2f}s")
        # fetch all nodes and edges
        memory_nodes = list(self.memory_graph.G.nodes(data=True))
        memory_edges = list(self.memory_graph.G.edges(data=True))
        current_time = datetime.datetime.now().timestamp()
        # prepare node data for batching
        nodes_data = []
        for concept, data in memory_nodes:
            memory_items = data.get("memory_items", "")
            # check the string for emptiness directly; no need to split it into a list
            if not memory_items or memory_items.strip() == "":
                self.memory_graph.G.remove_node(concept)
                continue
            # compute the in-memory node's fingerprint
            memory_hash = self.hippocampus.calculate_node_hash(concept, memory_items)
            created_time = data.get("created_time", current_time)
            last_modified = data.get("last_modified", current_time)
            # memory_items is stored directly as a string; no JSON serialization needed
            if not memory_items:
                continue
            # read the weight attribute
            weight = data.get("weight", 1.0)
            nodes_data.append(
                {
                    "concept": concept,
                    "memory_items": memory_items,
                    "weight": weight,
                    "hash": memory_hash,
                    "created_time": created_time,
                    "last_modified": last_modified,
                }
            )
        # insert nodes in batches
        if nodes_data:
            batch_size = 100
            for i in range(0, len(nodes_data), batch_size):
                batch = nodes_data[i : i + batch_size]
                GraphNodes.insert_many(batch).execute()
        # prepare edge data for batching
        edges_data = []
        for source, target, data in memory_edges:
            try:
                edges_data.append(
                    {
                        "source": source,
                        "target": target,
                        "strength": data.get("strength", 1),
                        "hash": self.hippocampus.calculate_edge_hash(source, target),
                        "created_time": data.get("created_time", current_time),
                        "last_modified": data.get("last_modified", current_time),
                    }
                )
            except Exception as e:
                logger.error(f"Error while preparing data for edge {source}-{target}: {e}")
                continue
        # insert edges in batches
        if edges_data:
            batch_size = 100
            for i in range(0, len(edges_data), batch_size):
                batch = edges_data[i : i + batch_size]
                GraphEdges.insert_many(batch).execute()
        end_time = time.time()
        logger.info(f"[Database] Resync finished, total time: {end_time - start_time:.2f}s")
        logger.info(f"[Database] Synced {len(nodes_data)} nodes and {len(edges_data)} edges")
    def sync_memory_from_db(self):
        """Synchronize data from the database into the in-memory graph structure."""
        current_time = datetime.datetime.now().timestamp()
        need_update = False
        # clear the current graph
        self.memory_graph.G.clear()
        # loading statistics
        total_nodes = 0
        loaded_nodes = 0
        skipped_nodes = 0
        # load all nodes from the database
        nodes = list(GraphNodes.select())
        total_nodes = len(nodes)
        for node in nodes:
            concept = node.concept
            try:
                # handle empty strings and None
                if not node.memory_items or node.memory_items.strip() == "":
                    logger.warning(f"Node {concept} has empty memory_items, skipping")
                    skipped_nodes += 1
                    continue
                # use memory_items directly
                memory_items = node.memory_items.strip()
                # check whether the time fields exist
                if not node.created_time or not node.last_modified:
                    # update the node in the database
                    update_data = {}
                    if not node.created_time:
                        update_data["created_time"] = current_time
                    if not node.last_modified:
                        update_data["last_modified"] = current_time
                    if update_data:
                        GraphNodes.update(**update_data).where(GraphNodes.concept == concept).execute()
                # read the time fields (fall back to the current time if missing)
                created_time = node.created_time or current_time
                last_modified = node.last_modified or current_time
                # read the weight attribute
                weight = node.weight if hasattr(node, "weight") and node.weight is not None else 1.0
                # add the node to the graph
                self.memory_graph.G.add_node(
                    concept,
                    memory_items=memory_items,
                    weight=weight,
                    created_time=created_time,
                    last_modified=last_modified,
                )
                loaded_nodes += 1
            except Exception as e:
                logger.error(f"Error while loading node {concept}: {e}")
                skipped_nodes += 1
                continue
        # load all edges from the database
        edges = list(GraphEdges.select())
        for edge in edges:
            source = edge.source
            target = edge.target
            strength = edge.strength
            # check whether the time fields exist
            if not edge.created_time or not edge.last_modified:
                need_update = True
                # update the edge in the database
                update_data = {}
                if not edge.created_time:
                    update_data["created_time"] = current_time
                if not edge.last_modified:
                    update_data["last_modified"] = current_time
                GraphEdges.update(**update_data).where(
                    (GraphEdges.source == source) & (GraphEdges.target == target)
                ).execute()
            # read the time fields (fall back to the current time if missing)
            created_time = edge.created_time or current_time
            last_modified = edge.last_modified or current_time
            # only add the edge when both its source and target nodes exist
            if source in self.memory_graph.G and target in self.memory_graph.G:
                self.memory_graph.G.add_edge(
                    source, target, strength=strength, created_time=created_time, last_modified=last_modified
                )
        if need_update:
            logger.info("[Database] Filled in missing time fields")
        # log loading statistics
        logger.info(
            f"[Database] Memory loading finished: {total_nodes} nodes total, {loaded_nodes} loaded, {skipped_nodes} skipped"
        )
# Handles memory management
class ParahippocampalGyrus:
    def __init__(self, hippocampus: Hippocampus):
        self.hippocampus = hippocampus
        self.memory_graph = hippocampus.memory_graph


class HippocampusManager:
    def __init__(self):
        self._hippocampus: Hippocampus = None  # type: ignore
        self._initialized = False

    def initialize(self):
        """Initialize the hippocampus instance."""
        if self._initialized:
            return self._hippocampus
        self._hippocampus = Hippocampus()
        self._hippocampus.initialize()
        self._initialized = True
        # log memory graph statistics
        memory_graph = self._hippocampus.memory_graph.G
        node_count = len(memory_graph.nodes())
        edge_count = len(memory_graph.edges())
        logger.info(f"""
--------------------------------
Memory system configuration:
Memory graph statistics: nodes: {node_count}, edges: {edge_count}
--------------------------------""")  # noqa: E501
        return self._hippocampus

    def get_hippocampus(self):
        if not self._initialized:
            raise RuntimeError("HippocampusManager is not initialized; call initialize() first")
        return self._hippocampus

    def get_all_node_names(self) -> list:
        """Public interface for fetching all node names."""
        if not self._initialized:
            raise RuntimeError("HippocampusManager is not initialized; call initialize() first")
        return self._hippocampus.get_all_node_names()


# create the global instance
hippocampus_manager = HippocampusManager()

View File

@@ -17,7 +17,7 @@ from .memory_utils import (
    check_title_exists_fuzzy
)
logger = get_logger("memory_chest")
logger = get_logger("memory")
class MemoryChest:
    def __init__(self):
View File

@@ -9,7 +9,7 @@ from src.common.logger import get_logger
from src.common.database.database_model import MemoryChest as MemoryChestModel
from src.config.config import global_config
logger = get_logger("memory_management")
logger = get_logger("memory")
class MemoryManagementTask(AsyncTask):
@@ -86,12 +86,11 @@ class MemoryManagementTask(AsyncTask):
    async def run(self):
        """Execute the memory management task."""
        try:
            logger.info("[Memory management] Starting the memory management task")
            # get the current memory count
            current_count = self._get_memory_count()
            percentage = current_count / self.max_memory_number
            logger.info(f"[Memory management] Current memory count: {current_count}/{self.max_memory_number} ({percentage:.1%})")
            logger.info(f"Current memory count: {current_count}/{self.max_memory_number} ({percentage:.1%})")
            # skip this run when there are fewer than 10 memories
            if current_count < 10:
@@ -100,7 +99,7 @@ class MemoryManagementTask(AsyncTask):
            # randomly select a memory title
            selected_title = self._get_random_memory_title()
            if not selected_title:
                logger.warning("[Memory management] Could not get a random memory title, skipping this run")
                logger.warning("Could not get a random memory title, skipping this run")
                return
            # run choose_merge_target to fetch related memory titles and their contents
@@ -117,15 +116,13 @@ class MemoryManagementTask(AsyncTask):
                logger.warning("[Memory management] Memory merge failed, skipping deletion")
                return
            logger.info(f"[Memory management] Memory merge succeeded, new title: {merged_title}")
            logger.info(f"Memory merge succeeded, new title: {merged_title}")
            # delete the original memories (the selected title plus the related titles)
            titles_to_delete = [selected_title] + related_titles
            deleted_count = self._delete_original_memories(titles_to_delete)
            logger.info(f"[Memory management] Deleted {deleted_count} original memories")
            logger.info("[Memory management] Memory management task finished")
            logger.info(f"Deleted {deleted_count} original memories")
        except Exception as e:
            logger.error(f"[Memory management] Error while executing the memory management task: {e}", exc_info=True)

View File

@@ -5,7 +5,6 @@ from src.config.config import global_config
from src.chat.utils.prompt_builder import Prompt
from src.llm_models.payload_content.tool_option import ToolParamType
from src.plugin_system import BaseAction, ActionActivationType
from src.memory_system.Hippocampus import hippocampus_manager
from src.chat.utils.utils import cut_key_words
from src.memory_system.Memory_chest import global_memory_chest
from src.plugin_system.base.base_tool import BaseTool