Compare commits

...

6 Commits

Author SHA1 Message Date
SengokuCola f052340d21 Merge branch 'dev' of https://github.com/Mai-with-u/MaiBot into dev 2026-01-13 00:47:58 +08:00
SengokuCola 199a8a7dff feat:添加lpmm内部接口,信息抽取类和一个测试脚本 2026-01-13 00:47:55 +08:00
SengokuCola 523a7517a9 feat:planner加入黑话缓存 2026-01-13 00:47:33 +08:00
SengokuCola 465fb9d865 remove:移除记忆的 关键点 项目 2026-01-13 00:47:22 +08:00
墨梓柒 056b4df2dd
WebUI b2a259fbc4c8477f2ab01eb5e7a75969cda53a1e 2026-01-13 00:43:55 +08:00
墨梓柒 37589ebdfb
feat: 添加段落内容加载功能及相关配置 2026-01-13 00:42:49 +08:00
18 changed files with 973 additions and 98 deletions

View File

@ -0,0 +1,265 @@
import asyncio
import os
import sys

# Force the console streams to UTF-8 where possible so that non-ASCII
# (Chinese) output does not raise encoding errors on some terminals.
try:
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
    if hasattr(sys.stderr, "reconfigure"):
        sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass

# Make sure the project root is on sys.path so that `src.*` imports resolve
# when the script is launched from inside its own directory.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

try:
    # Explicitly import the shared singleton from src.chat.knowledge.lpmm_ops.
    from src.chat.knowledge.lpmm_ops import lpmm_ops
    from src.common.logger import get_logger
    from src.memory_system.retrieval_tools.query_lpmm_knowledge import query_lpmm_knowledge
    from src.chat.knowledge import lpmm_start_up
    from src.config.config import global_config
except ImportError as e:
    print(f"导入失败,请确保在项目根目录下运行脚本: {e}")
    sys.exit(1)

logger = get_logger("lpmm_interactive_manager")
async def interactive_add():
    """Interactively read multi-line text from stdin and import it as knowledge."""
    bar = "=" * 40
    print("\n" + bar)
    print(" --- 📥 导入知识 (Add) ---")
    print(bar)
    print("说明:请输入要导入的文本内容。")
    print(" - 支持多段落,段落间请保留空行。")
    print(" - 输入完成后,在新起的一行输入 'EOF' 并回车结束输入。")
    print("-" * 40)

    # Collect raw lines until the user types the EOF sentinel (or stdin closes).
    collected = []
    while True:
        try:
            raw = input()
        except EOFError:
            break
        if raw.strip().upper() == "EOF":
            break
        collected.append(raw)

    text = "\n".join(collected).strip()
    if not text:
        print("\n[!] 内容为空,操作已取消。")
        return

    print("\n[进度] 正在调用 LPMM 接口进行信息抽取与向量化,请稍候...")
    try:
        # Delegate the heavy lifting to the lpmm_ops singleton.
        result = await lpmm_ops.add_content(text)
        if result["status"] == "success":
            print(f"\n[√] 成功:{result['message']}")
            print(f" 实际新增段落数: {result.get('count', 0)}")
        else:
            print(f"\n[×] 失败:{result['message']}")
    except Exception as e:
        print(f"\n[×] 发生异常: {e}")
        logger.error(f"add_content 异常: {e}", exc_info=True)
async def interactive_delete():
    """Interactively delete knowledge, by keyword or by exact passage match."""
    print("\n" + "=" * 40)
    print(" --- 🗑️ 删除知识 (Delete) ---")
    print("=" * 40)
    print("删除模式:")
    print(" 1. 关键词模糊匹配(删除包含关键词的所有段落)")
    print(" 2. 完整文段匹配(删除完全匹配的段落)")
    print("-" * 40)

    mode = input("请选择删除模式 (1/2): ").strip()
    exact_match = False
    if mode == "2":
        # Exact-passage mode: read the full passage, multi-line, until 'EOF'.
        exact_match = True
        print("\n[完整文段匹配模式]")
        print("说明:请输入要删除的完整文段内容(必须完全一致)。")
        print(" - 支持多行输入,输入完成后在新起的一行输入 'EOF' 并回车。")
        print("-" * 40)
        lines = []
        while True:
            try:
                line = input()
                if line.strip().upper() == "EOF":
                    break
                lines.append(line)
            except EOFError:
                break
        keyword = "\n".join(lines).strip()
    else:
        # Anything other than "1" or "2" falls back to keyword mode.
        if mode != "1":
            print("\n[!] 无效选择,默认使用关键词模糊匹配模式。")
        print("\n[关键词模糊匹配模式]")
        keyword = input("请输入匹配关键词: ").strip()

    if not keyword:
        print("\n[!] 输入为空,操作已取消。")
        return

    print("-" * 40)
    # Danger confirmation; the preview of the match text is truncated to 50 chars.
    confirm = input(f"危险确认:确定要删除所有匹配 '{keyword[:50]}{'...' if len(keyword) > 50 else ''}' 的知识吗?(y/N): ").strip().lower()
    if confirm != 'y':
        print("\n[!] 已取消删除操作。")
        return

    print("\n[进度] 正在执行删除并更新索引...")
    try:
        # Delegate to the lpmm_ops singleton.
        result = await lpmm_ops.delete(keyword, exact_match=exact_match)
        if result["status"] == "success":
            print(f"\n[√] 成功:{result['message']}")
            print(f" 删除条数: {result.get('deleted_count', 0)}")
        elif result["status"] == "info":
            print(f"\n[i] 提示:{result['message']}")
        else:
            print(f"\n[×] 失败:{result['message']}")
    except Exception as e:
        print(f"\n[×] 发生异常: {e}")
        logger.error(f"delete 异常: {e}", exc_info=True)
async def interactive_clear():
    """Interactively wipe the whole knowledge base (requires double confirmation)."""
    print("\n" + "=" * 40)
    print(" --- ⚠️ 清空知识库 (Clear All) ---")
    print("=" * 40)
    print("警告此操作将删除LPMM知识库中的所有内容")
    print(" - 所有段落向量")
    print(" - 所有实体向量")
    print(" - 所有关系向量")
    print(" - 整个知识图谱")
    print(" - 此操作不可恢复!")
    print("-" * 40)

    # Double confirmation guards against accidental wipes: the user must type
    # two different literal tokens ('YES' then 'CLEAR').
    confirm1 = input("⚠️ 第一次确认:确定要清空整个知识库吗?(输入 'YES' 继续): ").strip()
    if confirm1 != "YES":
        print("\n[!] 已取消清空操作。")
        return

    print("\n" + "=" * 40)
    confirm2 = input("⚠️ 第二次确认:此操作不可恢复,请再次输入 'CLEAR' 确认: ").strip()
    if confirm2 != "CLEAR":
        print("\n[!] 已取消清空操作。")
        return

    print("\n[进度] 正在清空知识库...")
    try:
        # Delegate to the lpmm_ops singleton.
        result = await lpmm_ops.clear_all()
        if result["status"] == "success":
            print(f"\n[√] 成功:{result['message']}")
            # Report before/after counts as returned by clear_all().
            stats = result.get("stats", {})
            before = stats.get("before", {})
            after = stats.get("after", {})
            print("\n[统计信息]")
            print(f" 清空前: 段落={before.get('paragraphs', 0)}, 实体={before.get('entities', 0)}, "
                  f"关系={before.get('relations', 0)}, KG节点={before.get('kg_nodes', 0)}, KG边={before.get('kg_edges', 0)}")
            print(f" 清空后: 段落={after.get('paragraphs', 0)}, 实体={after.get('entities', 0)}, "
                  f"关系={after.get('relations', 0)}, KG节点={after.get('kg_nodes', 0)}, KG边={after.get('kg_edges', 0)}")
        else:
            print(f"\n[×] 失败:{result['message']}")
    except Exception as e:
        print(f"\n[×] 发生异常: {e}")
        logger.error(f"clear_all 异常: {e}", exc_info=True)
async def interactive_search():
    """Interactively query the knowledge base and print the matching passages."""
    bar = "=" * 40
    print("\n" + bar)
    print(" --- 🔍 查询知识 (Search) ---")
    print(bar)
    print("说明:输入查询问题或关键词,系统会返回相关的知识段落。")
    print("-" * 40)

    # LPMM must be enabled in the global config before anything else.
    if not global_config.lpmm_knowledge.enable:
        print("\n[!] 警告LPMM 知识库在配置中未启用。")
        return

    try:
        lpmm_start_up()
    except Exception as e:
        print(f"\n[!] LPMM 初始化失败: {e}")
        logger.error(f"LPMM 初始化失败: {e}", exc_info=True)
        return

    query = input("请输入查询问题或关键词: ").strip()
    if not query:
        print("\n[!] 查询内容为空,操作已取消。")
        return

    # Ask how many entries to return; blank input keeps the default of 3.
    print("-" * 40)
    limit_str = input("希望返回的相关知识条数默认3直接回车使用默认值: ").strip()
    if limit_str:
        try:
            limit = min(max(int(limit_str), 1), 20)  # clamp to [1, 20]
        except ValueError:
            limit = 3
            print("[!] 输入无效,使用默认值 3。")
    else:
        limit = 3

    print("\n[进度] 正在查询知识库...")
    try:
        result = await query_lpmm_knowledge(query, limit=limit)
        wide = "=" * 60
        print("\n" + wide)
        print("[查询结果]")
        print(wide)
        print(result)
        print(wide)
    except Exception as e:
        print(f"\n[×] 查询失败: {e}")
        logger.error(f"查询异常: {e}", exc_info=True)
async def main():
    """Top-level REPL: render the menu and dispatch to the chosen handler."""
    # Map menu choices to their coroutine handlers.
    handlers = {
        "1": interactive_add,
        "2": interactive_delete,
        "3": interactive_search,
        "4": interactive_clear,
    }
    quit_tokens = ("0", "q", "Q", "quit", "exit")
    while True:
        print("\n" + "" + "" * 38 + "")
        print("║ LPMM 知识库交互管理工具 ║")
        print("" + "" * 38 + "")
        print("║ 1. 导入知识 (Add Content) ║")
        print("║ 2. 删除知识 (Delete Content) ║")
        print("║ 3. 查询知识 (Search Content) ║")
        print("║ 4. 清空知识库 (Clear All) ⚠️ ║")
        print("║ 0. 退出 (Exit) ║")
        print("" + "" * 38 + "")
        choice = input("请选择操作编号: ").strip()
        if choice in quit_tokens:
            print("\n已退出工具。")
            break
        handler = handlers.get(choice)
        if handler is not None:
            await handler()
        else:
            print("\n[!] 无效的选择,请输入 0, 1, 2, 3 或 4。")
if __name__ == "__main__":
    try:
        # Run the interactive menu loop until the user exits.
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n\n[!] 用户中断程序 (Ctrl+C)。")
    except Exception as e:
        print(f"\n[!] 程序运行出错: {e}")
        logger.error(f"Main loop 异常: {e}", exc_info=True)

View File

@ -1,7 +1,7 @@
import asyncio
import json
import time
from typing import List, Union
from typing import List, Union, Dict, Any
from .global_logger import logger
from . import prompt_template
@ -173,3 +173,50 @@ def info_extract_from_str(
return None, None
return entity_extract_result, rdf_triple_extract_result
class IEProcess:
    """
    Information-extraction processor: a convenience wrapper that runs
    entity/triple extraction over a batch of paragraphs.
    """

    def __init__(self, llm_ner: LLMRequest, llm_rdf: Union[LLMRequest, None] = None):
        # The NER model is mandatory; the RDF (triple) model falls back to
        # the NER model when not provided.
        self.llm_ner = llm_ner
        self.llm_rdf = llm_rdf or llm_ner

    async def process_paragraphs(self, paragraphs: List[str]) -> List[dict]:
        """
        Asynchronously process a batch of paragraphs.

        Args:
            paragraphs: raw paragraph texts to run extraction on.

        Returns:
            One dict per successfully processed paragraph with keys
            ``idx`` (sha256 of the text), ``passage``, ``extracted_entities``
            and ``extracted_triples``. Failed paragraphs are skipped.
        """
        from .utils.hash import get_sha256

        results = []
        total = len(paragraphs)
        for i, pg in enumerate(paragraphs, start=1):
            # Progress log so long batches do not look like a hang.
            logger.info(f"[IEProcess] 正在处理第 {i}/{total} 段文本 (长度: {len(pg)})...")
            # Run the synchronous blocking call via asyncio.to_thread to avoid
            # deadlocks: info_extract_from_str internally calls asyncio.run,
            # which needs a fresh event loop in a separate thread.
            try:
                entities, triples = await asyncio.to_thread(
                    info_extract_from_str, self.llm_ner, self.llm_rdf, pg
                )
                if entities is not None:
                    results.append(
                        {
                            "idx": get_sha256(pg),
                            "passage": pg,
                            "extracted_entities": entities,
                            "extracted_triples": triples,
                        }
                    )
                    logger.info(f"[IEProcess] 第 {i}/{total} 段处理完成,提取到 {len(entities)} 个实体")
                else:
                    logger.warning(f"[IEProcess] 第 {i}/{total} 段提取失败(返回为空)")
            except Exception as e:
                logger.error(f"[IEProcess] 处理第 {i}/{total} 段时发生异常: {e}")
        return results

View File

@ -0,0 +1,331 @@
import asyncio
from typing import List, Callable, Any
from src.chat.knowledge.embedding_store import EmbeddingManager
from src.chat.knowledge.kg_manager import KGManager
from src.chat.knowledge.qa_manager import QAManager
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.knowledge import get_qa_manager, lpmm_start_up
logger = get_logger("LPMM-Plugin-API")
class LPMMOperations:
    """
    Internal LPMM operations facade.

    Wraps the core LPMM maintenance operations (add / search / delete /
    clear) for the plugin-system API and other internal components.  Every
    public coroutine returns a plain ``dict`` describing the outcome, so
    callers never need to handle LPMM-specific types or exceptions.
    """

    def __init__(self):
        # Reserved for lazy-initialization bookkeeping; the managers
        # themselves are resolved on demand in _get_managers().
        self._initialized = False

    async def _run_cancellable_executor(self, func: Callable, *args, **kwargs) -> Any:
        """
        Execute a blocking callable in the default thread pool.

        The awaiting coroutine responds to cancellation (e.g. Ctrl+C)
        immediately with ``asyncio.CancelledError``.  Note that the
        thread-pool work itself may keep running in the background; only
        the coroutine stops waiting for it.

        Args:
            func: synchronous function to run.
            *args: positional arguments for ``func``.
            **kwargs: keyword arguments for ``func``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            asyncio.CancelledError: when the awaiting task is cancelled.
        """
        from functools import partial

        # get_running_loop() is the correct API inside a coroutine
        # (get_event_loop() is deprecated there).  run_in_executor() does
        # NOT forward keyword arguments, so bind them with functools.partial
        # -- the previous `run_in_executor(None, func, *args, **kwargs)`
        # would raise TypeError as soon as kwargs were actually passed.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(func, *args, **kwargs))

    async def _get_managers(self) -> "tuple[EmbeddingManager, KGManager, QAManager]":
        """Return (embedding manager, KG manager, QA manager), starting LPMM on demand.

        Raises:
            RuntimeError: if the QAManager cannot be obtained even after start-up.
        """
        qa_mgr = get_qa_manager()
        if qa_mgr is None:
            # Not initialized globally yet -- try to bring LPMM up ourselves.
            if not global_config.lpmm_knowledge.enable:
                logger.warning("LPMM 知识库在全局配置中未启用,操作可能受限。")
            lpmm_start_up()
            qa_mgr = get_qa_manager()
        if qa_mgr is None:
            raise RuntimeError("无法获取 LPMM QAManager请检查 LPMM 是否已正确安装和配置。")
        return qa_mgr.embed_manager, qa_mgr.kg_manager, qa_mgr

    async def add_content(self, text: str) -> dict:
        """
        Add new content to the knowledge base.

        Args:
            text: raw text; multiple paragraphs are separated by blank lines.

        Returns:
            dict: {"status": "success/error/cancelled", "count": number of
            newly imported paragraphs, "message": description}.
        """
        try:
            embed_mgr, kg_mgr, _ = await self._get_managers()

            # 1. Split into paragraphs on blank lines.
            paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
            if not paragraphs:
                return {"status": "error", "message": "文本内容为空"}

            # 2. Entity / triple extraction (invokes the LLM internally).
            from src.chat.knowledge.ie_process import IEProcess
            from src.llm_models.utils_model import LLMRequest
            from src.config.config import model_config

            llm_ner = LLMRequest(model_set=model_config.model_task_config.lpmm_entity_extract, request_type="lpmm.entity_extract")
            llm_rdf = LLMRequest(model_set=model_config.model_task_config.lpmm_rdf_build, request_type="lpmm.rdf_build")
            ie_process = IEProcess(llm_ner, llm_rdf)

            logger.info(f"[Plugin API] 正在对 {len(paragraphs)} 段文本执行信息抽取...")
            extracted_docs = await ie_process.process_paragraphs(paragraphs)

            # 3. Build the import payload (mirrors import_openie.py's core
            #    logic so no external script is required).
            raw_paragraphs = {doc["idx"]: doc["passage"] for doc in extracted_docs}
            triple_list_data = {doc["idx"]: doc["extracted_triples"] for doc in extracted_docs}

            # De-duplicate first: stored_pg_hashes keeps "paragraph-"-prefixed
            # keys, while store_new_data_set expects bare hashes mapping to text.
            new_raw_paragraphs = {}
            new_triple_list_data = {}
            for pg_hash, passage in raw_paragraphs.items():
                key = f"paragraph-{pg_hash}"
                if key not in embed_mgr.stored_pg_hashes:
                    new_raw_paragraphs[pg_hash] = passage
                    new_triple_list_data[pg_hash] = triple_list_data[pg_hash]

            if not new_raw_paragraphs:
                return {"status": "success", "count": 0, "message": "内容已存在,无需重复导入"}

            # Embed and store paragraphs/entities/relations; blocking work
            # runs in the thread pool so the event loop stays responsive.
            await self._run_cancellable_executor(
                embed_mgr.store_new_data_set,
                new_raw_paragraphs,
                new_triple_list_data
            )

            # Build the knowledge graph (needs only triples + embeddings).
            await self._run_cancellable_executor(
                kg_mgr.build_kg,
                new_triple_list_data,
                embed_mgr
            )

            # Persist everything.
            await self._run_cancellable_executor(embed_mgr.rebuild_faiss_index)
            await self._run_cancellable_executor(embed_mgr.save_to_file)
            await self._run_cancellable_executor(kg_mgr.save_to_file)

            return {"status": "success", "count": len(new_raw_paragraphs), "message": f"成功导入 {len(new_raw_paragraphs)} 条知识"}
        except asyncio.CancelledError:
            logger.warning("[Plugin API] 导入操作被用户中断")
            return {"status": "cancelled", "message": "导入操作已被用户中断"}
        except Exception as e:
            logger.error(f"[Plugin API] 导入知识失败: {e}", exc_info=True)
            return {"status": "error", "message": str(e)}

    async def search(self, query: str, top_k: int = 3) -> List[str]:
        """
        Query the knowledge base.

        Args:
            query: the question or keywords to search for.
            top_k: number of most relevant entries to return.

        Returns:
            List[str]: relevant passages; currently a single pre-joined
            string wrapped in a list, or an empty list on failure.
        """
        try:
            _, _, qa_mgr = await self._get_managers()
            # Delegate directly to QAManager's retrieval interface.
            knowledge = qa_mgr.get_knowledge(query, top_k=top_k)
            # get_knowledge returns one joined string; wrap it in a list.
            return [knowledge] if knowledge else []
        except Exception as e:
            logger.error(f"[Plugin API] 检索知识失败: {e}")
            return []

    async def delete(self, keyword: str, exact_match: bool = False) -> dict:
        """
        Delete knowledge entries matched by a keyword or an exact passage.

        Args:
            keyword: keyword (substring) or full passage to match.
            exact_match: True = full-passage equality, False = substring match.

        Returns:
            dict: {"status": "success/info/cancelled/error",
            "deleted_count": number removed, "message": description}.
        """
        try:
            embed_mgr, kg_mgr, _ = await self._get_managers()

            # 1. Collect matching paragraph keys and their bare hashes
            #    (the KG side keys paragraphs without the "paragraph-" prefix).
            to_delete_keys = []
            to_delete_hashes = []
            for key, item in embed_mgr.paragraphs_embedding_store.store.items():
                if exact_match:
                    # Full-passage equality (whitespace-trimmed).
                    if item.str.strip() == keyword.strip():
                        to_delete_keys.append(key)
                        to_delete_hashes.append(key.replace("paragraph-", "", 1))
                else:
                    # Substring (fuzzy keyword) match.
                    if keyword in item.str:
                        to_delete_keys.append(key)
                        to_delete_hashes.append(key.replace("paragraph-", "", 1))

            if not to_delete_keys:
                match_type = "完整文段" if exact_match else "关键词"
                return {"status": "info", "deleted_count": 0, "message": f"未找到匹配的内容({match_type}匹配)"}

            # 2. Delete -- blocking calls run in the thread pool.
            # a. Remove from the vector store.
            deleted_count, _ = await self._run_cancellable_executor(
                embed_mgr.paragraphs_embedding_store.delete_items,
                to_delete_keys
            )
            embed_mgr.stored_pg_hashes = set(embed_mgr.paragraphs_embedding_store.store.keys())

            # b. Remove from the knowledge graph.
            await self._run_cancellable_executor(
                kg_mgr.delete_paragraphs,
                to_delete_hashes,
                True  # remove_orphan_entities
            )

            # 3. Persist.
            await self._run_cancellable_executor(embed_mgr.rebuild_faiss_index)
            await self._run_cancellable_executor(embed_mgr.save_to_file)
            await self._run_cancellable_executor(kg_mgr.save_to_file)

            match_type = "完整文段" if exact_match else "关键词"
            return {"status": "success", "deleted_count": deleted_count, "message": f"已成功删除 {deleted_count} 条相关知识({match_type}匹配)"}
        except asyncio.CancelledError:
            logger.warning("[Plugin API] 删除操作被用户中断")
            return {"status": "cancelled", "message": "删除操作已被用户中断"}
        except Exception as e:
            logger.error(f"[Plugin API] 删除知识失败: {e}", exc_info=True)
            return {"status": "error", "message": str(e)}

    async def clear_all(self) -> dict:
        """
        Wipe the entire LPMM knowledge base: paragraph, entity and relation
        vectors plus the whole knowledge graph.  Irreversible.

        Returns:
            dict: {"status": "success/cancelled/error", "message": description,
            "stats": {"before": {...}, "after": {...}}}.
        """
        try:
            embed_mgr, kg_mgr, _ = await self._get_managers()

            # Snapshot counts before wiping, for the report.
            before_stats = {
                "paragraphs": len(embed_mgr.paragraphs_embedding_store.store),
                "entities": len(embed_mgr.entities_embedding_store.store),
                "relations": len(embed_mgr.relation_embedding_store.store),
                "kg_nodes": len(kg_mgr.graph.get_node_list()),
                "kg_edges": len(kg_mgr.graph.get_edge_list()),
            }

            # 1. Empty every vector store (blocking work in the thread pool).
            para_keys = list(embed_mgr.paragraphs_embedding_store.store.keys())
            ent_keys = list(embed_mgr.entities_embedding_store.store.keys())
            rel_keys = list(embed_mgr.relation_embedding_store.store.keys())

            # Paragraph vectors.
            para_deleted, _ = await self._run_cancellable_executor(
                embed_mgr.paragraphs_embedding_store.delete_items,
                para_keys
            )
            embed_mgr.stored_pg_hashes.clear()

            # Entity vectors.
            if ent_keys:
                ent_deleted, _ = await self._run_cancellable_executor(
                    embed_mgr.entities_embedding_store.delete_items,
                    ent_keys
                )
            else:
                ent_deleted = 0

            # Relation vectors.
            if rel_keys:
                rel_deleted, _ = await self._run_cancellable_executor(
                    embed_mgr.relation_embedding_store.delete_items,
                    rel_keys
                )
            else:
                rel_deleted = 0

            # 2. Clear the knowledge graph.
            all_pg_hashes = list(kg_mgr.stored_paragraph_hashes)
            if all_pg_hashes:
                # Deleting paragraph nodes also cleans up their edges and
                # any orphaned entity nodes.
                await self._run_cancellable_executor(
                    kg_mgr.delete_paragraphs,
                    all_pg_hashes,
                    True  # remove_orphan_entities
                )

            # Replace the graph outright with a fresh empty one.
            from quick_algo import di_graph
            kg_mgr.graph = di_graph.DiGraph()
            kg_mgr.stored_paragraph_hashes.clear()
            kg_mgr.ent_appear_cnt.clear()

            # 3. Rebuild the index and persist.
            await self._run_cancellable_executor(embed_mgr.rebuild_faiss_index)
            await self._run_cancellable_executor(embed_mgr.save_to_file)
            await self._run_cancellable_executor(kg_mgr.save_to_file)

            after_stats = {
                "paragraphs": len(embed_mgr.paragraphs_embedding_store.store),
                "entities": len(embed_mgr.entities_embedding_store.store),
                "relations": len(embed_mgr.relation_embedding_store.store),
                "kg_nodes": len(kg_mgr.graph.get_node_list()),
                "kg_edges": len(kg_mgr.graph.get_edge_list()),
            }
            return {
                "status": "success",
                "message": f"已成功清空LPMM知识库删除 {para_deleted} 个段落、{ent_deleted} 个实体、{rel_deleted} 个关系)",
                "stats": {
                    "before": before_stats,
                    "after": after_stats,
                }
            }
        except asyncio.CancelledError:
            logger.warning("[Plugin API] 清空操作被用户中断")
            return {"status": "cancelled", "message": "清空操作已被用户中断"}
        except Exception as e:
            logger.error(f"[Plugin API] 清空知识库失败: {e}", exc_info=True)
            return {"status": "error", "message": str(e)}


# Module-level singleton for internal callers.
lpmm_ops = LPMMOperations()

View File

@ -4,6 +4,7 @@ import traceback
import random
import re
from typing import Dict, Optional, Tuple, List, TYPE_CHECKING, Union
from collections import OrderedDict
from rich.traceback import install
from datetime import datetime
from json_repair import repair_json
@ -110,6 +111,10 @@ class ActionPlanner:
self.last_obs_time_mark = 0.0
self.plan_log: List[Tuple[str, float, Union[List[ActionPlannerInfo], str]]] = []
# 黑话缓存:使用 OrderedDict 实现 LRU最多缓存10个
self.unknown_words_cache: OrderedDict[str, None] = OrderedDict()
self.unknown_words_cache_limit = 10
def find_message_by_id(
self, message_id: str, message_id_list: List[Tuple[str, "DatabaseMessages"]]
@ -299,6 +304,136 @@ class ActionPlanner:
logger.warning(f"{self.log_prefix}检测消息发送者失败,缺少必要字段")
return False
def _update_unknown_words_cache(self, new_words: List[str]) -> None:
    """
    Merge freshly extracted slang terms into the LRU cache.

    Args:
        new_words: newly extracted slang/jargon terms.
    """
    for raw in new_words:
        if not isinstance(raw, str):
            continue
        term = raw.strip()
        if not term:
            continue
        if term in self.unknown_words_cache:
            # Already cached: just refresh its recency (LRU behaviour).
            self.unknown_words_cache.move_to_end(term)
            continue
        self.unknown_words_cache[term] = None
        # Evict the oldest entry whenever the cache exceeds its limit.
        while len(self.unknown_words_cache) > self.unknown_words_cache_limit:
            self.unknown_words_cache.popitem(last=False)
            logger.debug(f"{self.log_prefix}黑话缓存已满,移除最老的黑话")
def _merge_unknown_words_with_cache(self, new_words: Optional[List[str]]) -> List[str]:
    """
    Combine newly extracted slang terms with the cached ones.

    Args:
        new_words: newly extracted terms (may be None).

    Returns:
        De-duplicated list with the new terms first, cached terms after.
    """
    # Normalise the incoming terms: keep trimmed, non-empty strings only.
    fresh = [w.strip() for w in (new_words or []) if isinstance(w, str) and w.strip()]
    # New terms take precedence; cached terms follow.  dict.fromkeys removes
    # duplicates while preserving first-seen order.
    return list(dict.fromkeys(fresh + list(self.unknown_words_cache.keys())))
def _process_unknown_words_cache(
    self, actions: List[ActionPlannerInfo]
) -> None:
    """
    Apply the slang ("unknown words") cache policy to a planned action list.

    1. Check whether any reply action extracted unknown_words.
    2. If nothing was extracted, drop the oldest cached entry.
    3. If the cache holds more than 5 entries, drop the 2 oldest first.
    4. For every reply action, merge the cache into the newly extracted terms.
    5. Feed the newly extracted terms back into the cache.

    Args:
        actions: parsed planner actions.
    """
    # Shrink first: if the cache holds more than 5 entries, evict the 2 oldest.
    if len(self.unknown_words_cache) > 5:
        removed_count = 0
        for _ in range(2):
            if len(self.unknown_words_cache) > 0:
                self.unknown_words_cache.popitem(last=False)
                removed_count += 1
        if removed_count > 0:
            logger.debug(f"{self.log_prefix}缓存数量大于5移除最老的{removed_count}个缓存")
    # Did any reply action extract unknown_words in this plan?
    has_extracted_unknown_words = False
    for action in actions:
        if action.action_type == "reply":
            action_data = action.action_data or {}
            unknown_words = action_data.get("unknown_words")
            if unknown_words and isinstance(unknown_words, list) and len(unknown_words) > 0:
                has_extracted_unknown_words = True
                break
    # No extraction this plan: age the cache by dropping the oldest entry.
    if not has_extracted_unknown_words:
        if len(self.unknown_words_cache) > 0:
            self.unknown_words_cache.popitem(last=False)
            logger.debug(f"{self.log_prefix}当前 plan 的 reply 没有提取黑话移除最老的1个缓存")
    # Merge cached terms into every reply action's extracted list.
    for action in actions:
        if action.action_type == "reply":
            action_data = action.action_data or {}
            new_words = action_data.get("unknown_words")
            # Combine new and cached terms (new first, de-duplicated).
            merged_words = self._merge_unknown_words_with_cache(new_words)
            if merged_words:
                action_data["unknown_words"] = merged_words
                logger.debug(
                    f"{self.log_prefix}合并黑话:新提取 {len(new_words) if new_words else 0} 个,"
                    f"缓存 {len(self.unknown_words_cache)} 个,合并后 {len(merged_words)}"
                )
            else:
                # Nothing left to attach: drop the field entirely.
                action_data.pop("unknown_words", None)
            # Push only the genuinely new terms into the cache.
            if new_words:
                self._update_unknown_words_cache(new_words)
async def plan(
self,
available_actions: Dict[str, ActionInfo],
@ -722,6 +857,9 @@ class ActionPlanner:
random.shuffle(shuffled)
actions = list({a.action_type: a for a in shuffled}.values())
# 处理黑话缓存逻辑
self._process_unknown_words_cache(actions)
logger.debug(f"{self.log_prefix}规划器选择了{len(actions)}个动作: {' '.join([a.action_type for a in actions])}")
return extracted_reasoning, actions, llm_content, llm_reasoning, llm_duration_ms

View File

@ -368,7 +368,7 @@ class ChatHistory(BaseModel):
theme = TextField() # 主题:这段对话的主要内容,一个简短的标题
keywords = TextField() # 关键词这段对话的关键词JSON格式存储
summary = TextField() # 概括:对这段话的平文本概括
key_point = TextField(null=True) # 关键信息话题中的关键信息点JSON格式存储
# key_point = TextField(null=True) # 关键信息话题中的关键信息点JSON格式存储
count = IntegerField(default=0) # 被检索次数
forget_times = IntegerField(default=0) # 被遗忘检查的次数

View File

@ -697,6 +697,9 @@ class WebUIConfig(ConfigBase):
secure_cookie: bool = False
"""是否启用安全Cookie仅通过HTTPS传输默认false"""
enable_paragraph_content: bool = False
"""是否在知识图谱中加载段落完整内容需要加载embedding store会占用额外内存"""
@dataclass
class DebugConfig(ConfigBase):

View File

@ -192,7 +192,6 @@ def init_dream_tools(chat_id: str) -> None:
("theme", ToolParamType.STRING, "新的主题标题,如果不需要修改可不填。", False, None),
("summary", ToolParamType.STRING, "新的概括内容,如果不需要修改可不填。", False, None),
("keywords", ToolParamType.STRING, "新的关键词 JSON 字符串,如 ['关键词1','关键词2']。", False, None),
("key_point", ToolParamType.STRING, "新的关键信息 JSON 字符串,如 ['要点1','要点2']。", False, None),
],
update_chat_history,
)
@ -201,7 +200,7 @@ def init_dream_tools(chat_id: str) -> None:
_dream_tool_registry.register_tool(
DreamTool(
"create_chat_history",
"根据整理后的理解创建一条新的 ChatHistory 概括记录(主题、概括、关键词、关键信息等)。",
"根据整理后的理解创建一条新的 ChatHistory 概括记录(主题、概括、关键词等)。",
[
("theme", ToolParamType.STRING, "新的主题标题(必填)。", True, None),
("summary", ToolParamType.STRING, "新的概括内容(必填)。", True, None),
@ -212,10 +211,11 @@ def init_dream_tools(chat_id: str) -> None:
True,
None,
),
("original_text", ToolParamType.STRING, "对话原文内容(必填)。", True, None),
(
"key_point",
"participants",
ToolParamType.STRING,
"新的关键信息 JSON 字符串,如 ['要点1','要点2'](必填)。",
"参与人的 JSON 字符串,如 ['用户1','用户2'](必填)。",
True,
None,
),
@ -313,8 +313,7 @@ async def run_dream_agent_once(
f"主题={record.theme or ''}\n"
f"关键词={record.keywords or ''}\n"
f"参与者={record.participants or ''}\n"
f"概括={record.summary or ''}\n"
f"关键信息={record.key_point or ''}"
f"概括={record.summary or ''}"
)
logger.debug(

View File

@ -11,7 +11,8 @@ def make_create_chat_history(chat_id: str):
theme: str,
summary: str,
keywords: str,
key_point: str,
original_text: str,
participants: str,
start_time: float,
end_time: float,
) -> str:
@ -20,7 +21,8 @@ def make_create_chat_history(chat_id: str):
logger.info(
f"[dream][tool] 调用 create_chat_history("
f"theme={bool(theme)}, summary={bool(summary)}, "
f"keywords={bool(keywords)}, key_point={bool(key_point)}, "
f"keywords={bool(keywords)}, original_text={bool(original_text)}, "
f"participants={bool(participants)}, "
f"start_time={start_time}, end_time={end_time}) (chat_id={chat_id})"
)
@ -43,7 +45,8 @@ def make_create_chat_history(chat_id: str):
theme=theme,
summary=summary,
keywords=keywords,
key_point=key_point,
original_text=original_text,
participants=participants,
# 对于由 dream 整理产生的新概括,时间范围优先使用工具提供的时间,否则使用当前时间占位
start_time=start_ts,
end_time=end_ts,

View File

@ -32,8 +32,7 @@ def make_get_chat_history_detail(chat_id: str): # chat_id 目前未直接使用
f"主题={record.theme or ''}\n"
f"关键词={record.keywords or ''}\n"
f"参与者={record.participants or ''}\n"
f"概括={record.summary or ''}\n"
f"关键信息={record.key_point or ''}"
f"概括={record.summary or ''}"
)
logger.debug(f"[dream][tool] get_chat_history_detail 成功,预览: {result[:200].replace(chr(10), ' ')}")
return result

View File

@ -13,13 +13,12 @@ def make_update_chat_history(chat_id: str): # chat_id 目前未直接使用,
theme: Optional[str] = None,
summary: Optional[str] = None,
keywords: Optional[str] = None,
key_point: Optional[str] = None,
) -> str:
"""按字段更新 chat_history字符串字段要求 JSON 的字段须传入已序列化的字符串)"""
try:
logger.info(
f"[dream][tool] 调用 update_chat_history(memory_id={memory_id}, "
f"theme={bool(theme)}, summary={bool(summary)}, keywords={bool(keywords)}, key_point={bool(key_point)})"
f"theme={bool(theme)}, summary={bool(summary)}, keywords={bool(keywords)})"
)
record = ChatHistory.get_or_none(ChatHistory.id == memory_id)
if not record:
@ -34,8 +33,6 @@ def make_update_chat_history(chat_id: str): # chat_id 目前未直接使用,
data["summary"] = summary
if keywords is not None:
data["keywords"] = keywords
if key_point is not None:
data["key_point"] = key_point
if not data:
return "未提供任何需要更新的字段。"

View File

@ -71,16 +71,14 @@ def init_prompt():
1. 关键词提取与话题相关的关键词用列表形式返回3-10个关键词
2. 概括对这段话的平文本概括50-200要求
- 仔细地转述发生的事件和聊天内容
- 可以适当摘取聊天记录中的原文
- 重点突出事件的发展过程和结果
- 围绕话题这个中心进行概括
3. 关键信息提取话题中的关键信息点用列表形式返回3-8个关键信息点每个关键信息点应该简洁明了
- 提取话题中的关键信息点关键信息点应该简洁明了
请以JSON格式返回格式如下
{{
"keywords": ["关键词1", "关键词2", ...],
"summary": "概括内容",
"key_point": ["关键信息1", "关键信息2", ...]
"summary": "概括内容"
}}
聊天记录
@ -815,12 +813,38 @@ class ChatHistorySummarizer:
original_text = "\n".join(item.messages)
logger.info(
f"{self.log_prefix} 开始打包话题[{topic}] | 消息数: {len(item.messages)} | 时间范围: {start_time:.2f} - {end_time:.2f}"
f"{self.log_prefix} 开始将聊天记录构建成记忆:[{topic}] | 消息数: {len(item.messages)} | 时间范围: {start_time:.2f} - {end_time:.2f}"
)
# 使用 LLM 进行总结(基于话题名)
success, keywords, summary, key_point = await self._compress_with_llm(original_text, topic)
if not success:
# 使用 LLM 进行总结(基于话题名),带重试机制
max_retries = 3
attempt = 0
success = False
keywords = []
summary = ""
while attempt < max_retries:
attempt += 1
success, keywords, summary = await self._compress_with_llm(original_text, topic)
if success and keywords and summary:
# 成功获取到有效的 keywords 和 summary
if attempt > 1:
logger.info(
f"{self.log_prefix} 话题[{topic}] LLM 概括在第 {attempt} 次重试后成功"
)
break
if attempt < max_retries:
logger.warning(
f"{self.log_prefix} 话题[{topic}] LLM 概括失败(第 {attempt} 次尝试),准备重试"
)
else:
logger.error(
f"{self.log_prefix} 话题[{topic}] LLM 概括连续 {max_retries} 次失败,放弃存储"
)
if not success or not keywords or not summary:
logger.warning(f"{self.log_prefix} 话题[{topic}] LLM 概括失败,不写入数据库")
return
@ -834,14 +858,13 @@ class ChatHistorySummarizer:
theme=topic, # 主题直接使用话题名
keywords=keywords,
summary=summary,
key_point=key_point,
)
logger.info(
f"{self.log_prefix} 话题[{topic}] 成功打包并存储 | 消息数: {len(item.messages)} | 参与者数: {len(participants)}"
)
async def _compress_with_llm(self, original_text: str, topic: str) -> tuple[bool, List[str], str, List[str]]:
async def _compress_with_llm(self, original_text: str, topic: str) -> tuple[bool, List[str], str]:
"""
使用LLM压缩聊天内容用于单个话题的最终总结
@ -850,7 +873,7 @@ class ChatHistorySummarizer:
topic: 话题名称
Returns:
tuple[bool, List[str], str, List[str]]: (是否成功, 关键词列表, 概括, 关键信息列表)
tuple[bool, List[str], str]: (是否成功, 关键词列表, 概括)
"""
prompt = await global_prompt_manager.format_prompt(
"hippo_topic_summary_prompt",
@ -920,24 +943,24 @@ class ChatHistorySummarizer:
keywords = result.get("keywords", [])
summary = result.get("summary", "")
key_point = result.get("key_point", [])
if not (keywords and summary) and key_point:
logger.warning(f"{self.log_prefix} LLM返回的JSON中缺少字段原文\n{response}")
# 检查必需字段是否为空
if not keywords or not summary:
logger.warning(f"{self.log_prefix} LLM返回的JSON中缺少必需字段原文\n{response}")
# 返回失败,和模型出错一样,让上层进行重试
return False, [], ""
# 确保keywords和key_point是列表
# 确保keywords是列表
if isinstance(keywords, str):
keywords = [keywords]
if isinstance(key_point, str):
key_point = [key_point]
return True, keywords, summary, key_point
return True, keywords, summary
except Exception as e:
logger.error(f"{self.log_prefix} LLM压缩聊天内容时出错: {e}")
logger.error(f"{self.log_prefix} LLM响应: {response if 'response' in locals() else 'N/A'}")
# 返回失败标志和默认值
return False, [], "压缩失败,无法生成概括", []
return False, [], "压缩失败,无法生成概括"
async def _store_to_database(
self,
@ -948,7 +971,6 @@ class ChatHistorySummarizer:
theme: str,
keywords: List[str],
summary: str,
key_point: Optional[List[str]] = None,
):
"""存储到数据库"""
try:
@ -968,10 +990,6 @@ class ChatHistorySummarizer:
"count": 0,
}
# 存储 key_point如果存在
if key_point is not None:
data["key_point"] = json.dumps(key_point, ensure_ascii=False)
# 使用db_save存储使用start_time和chat_id作为唯一标识
# 由于可能有多条记录我们使用组合键但peewee不支持所以使用start_time作为唯一标识
# 但为了避免冲突我们使用组合键chat_id + start_time
@ -991,7 +1009,6 @@ class ChatHistorySummarizer:
await self._import_to_lpmm_knowledge(
theme=theme,
summary=summary,
key_point=key_point,
participants=participants,
original_text=original_text,
)
@ -1007,7 +1024,6 @@ class ChatHistorySummarizer:
self,
theme: str,
summary: str,
key_point: Optional[List[str]],
participants: List[str],
original_text: str,
):
@ -1017,7 +1033,6 @@ class ChatHistorySummarizer:
Args:
theme: 话题主题
summary: 概括内容
key_point: 关键信息点列表
participants: 参与者列表
original_text: 原始文本可能很长需要截断
"""
@ -1025,7 +1040,8 @@ class ChatHistorySummarizer:
from src.chat.knowledge.lpmm_ops import lpmm_ops
# 构造要导入的文本内容
# 格式:主题 + 概括 + 关键信息点 + 参与者信息
# 格式:主题 + 概括 + 参与者信息 + 原始内容摘要
# 注意使用单换行符连接确保整个内容作为一段导入不被LPMM分段
content_parts = []
# 1. 话题主题
@ -1036,17 +1052,12 @@ class ChatHistorySummarizer:
if summary:
content_parts.append(f"概括:{summary}")
# 3. 关键信息点
if key_point:
key_points_text = "".join(key_point)
content_parts.append(f"关键信息:{key_points_text}")
# 4. 参与者信息
# 3. 参与者信息
if participants:
participants_text = "".join(participants)
content_parts.append(f"参与者:{participants_text}")
# 5. 原始文本摘要如果原始文本太长只取前500字
# 4. 原始文本摘要如果原始文本太长只取前500字
if original_text:
# 截断原始文本,避免过长
max_original_length = 500
@ -1056,8 +1067,9 @@ class ChatHistorySummarizer:
else:
content_parts.append(f"原始内容:{original_text}")
# 将所有部分合并为一个段落用双换行分隔符合lpmm_ops.add_content的格式要求
content_to_import = "\n\n".join(content_parts)
# 将所有部分合并为一个完整段落使用单换行符避免被LPMM分段
# LPMM使用 \n\n 作为段落分隔符,所以这里使用 \n 确保不会被分段
content_to_import = "\n".join(content_parts)
if not content_to_import.strip():
logger.warning(f"{self.log_prefix} 聊天历史总结内容为空,跳过导入知识库")

View File

@ -463,18 +463,6 @@ async def get_chat_history_detail(chat_id: str, memory_ids: str) -> str:
if record.summary:
result_parts.append(f"概括:{record.summary}")
# 添加关键信息点
if record.key_point:
try:
key_point_data = (
json.loads(record.key_point) if isinstance(record.key_point, str) else record.key_point
)
if isinstance(key_point_data, list) and key_point_data:
key_point_str = "\n".join([f" - {str(kp)}" for kp in key_point_data])
result_parts.append(f"关键信息点:\n{key_point_str}")
except (json.JSONDecodeError, TypeError, ValueError):
pass
results.append("\n".join(result_parts))
if not results:

View File

@ -5,11 +5,83 @@ from fastapi import APIRouter, Query, Depends, Cookie, Header
from pydantic import BaseModel
import logging
from src.webui.auth import verify_auth_token_from_cookie_or_header
from src.config.config import global_config
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/webui/knowledge", tags=["knowledge"])
# 延迟初始化的轻量级 embedding store只读仅用于获取段落完整文本
_paragraph_store_cache = None
def _get_paragraph_store():
    """Lazily load the paragraph embedding store (read-only, lightweight).

    Returns:
        EmbeddingStore | None: the store if the feature is enabled and the
        load succeeded, otherwise None.
    """
    # Feature gate: skip entirely when disabled in the WebUI config.
    if not global_config.webui.enable_paragraph_content:
        return None

    global _paragraph_store_cache
    if _paragraph_store_cache is not None:
        return _paragraph_store_cache

    try:
        from src.chat.knowledge.embedding_store import EmbeddingStore
        import os

        # Resolve the embedding data directory relative to the project root
        # (two levels up from this module's directory).
        current_dir = os.path.dirname(os.path.abspath(__file__))
        root_path = os.path.abspath(os.path.join(current_dir, "..", ".."))
        embedding_dir = os.path.join(root_path, "data/embedding")

        # Only the paragraph store is loaded -- lightweight, read-only usage.
        paragraph_store = EmbeddingStore(
            namespace="paragraph",
            dir_path=embedding_dir,
            max_workers=1,  # read-only: no need for multiple worker threads
            chunk_size=100
        )
        paragraph_store.load_from_file()

        _paragraph_store_cache = paragraph_store
        logger.info(f"成功加载段落 embedding store包含 {len(paragraph_store.store)} 个段落")
        return paragraph_store
    except Exception as e:
        logger.warning(f"加载段落 embedding store 失败: {e}")
        return None
def _get_paragraph_content(node_id: str) -> tuple[Optional[str], bool]:
    """Fetch the full text of a paragraph node from the embedding store.

    Args:
        node_id: paragraph node id, formatted as 'paragraph-{hash}'.

    Returns:
        tuple[str | None, bool]: (full paragraph text or None,
        whether the feature is enabled).
    """
    try:
        store = _get_paragraph_store()
        if store is None:
            # No store means the feature is switched off in config.
            return None, False

        item = store.store.get(node_id)
        if item is None:
            return None, True

        # EmbeddingStoreItem keeps the full text in its 'str' attribute.
        text: str = getattr(item, "str", "")
        return (text, True) if text else (None, True)
    except Exception as e:
        logger.debug(f"获取段落内容失败: {e}")
        return None, True
def require_auth(
maibot_session: Optional[str] = Cookie(None),
@ -84,7 +156,14 @@ def _convert_graph_to_json(kg_manager) -> KnowledgeGraph:
node_data = graph[node_id]
# 节点类型: "ent" -> "entity", "pg" -> "paragraph"
node_type = "entity" if ("type" in node_data and node_data["type"] == "ent") else "paragraph"
content = node_data["content"] if "content" in node_data else node_id
# 对于段落节点,尝试从 embedding store 获取完整内容
if node_type == "paragraph":
full_content, _ = _get_paragraph_content(node_id)
content = full_content if full_content is not None else (node_data["content"] if "content" in node_data else node_id)
else:
content = node_data["content"] if "content" in node_data else node_id
create_time = node_data["create_time"] if "create_time" in node_data else None
nodes.append(KnowledgeNode(id=node_id, type=node_type, content=content, create_time=create_time))
@ -166,7 +245,14 @@ async def get_knowledge_graph(
try:
node_data = graph[node_id]
node_type_val = "entity" if ("type" in node_data and node_data["type"] == "ent") else "paragraph"
content = node_data["content"] if "content" in node_data else node_id
# 对于段落节点,尝试从 embedding store 获取完整内容
if node_type_val == "paragraph":
full_content, _ = _get_paragraph_content(node_id)
content = full_content if full_content is not None else (node_data["content"] if "content" in node_data else node_id)
else:
content = node_data["content"] if "content" in node_data else node_id
create_time = node_data["create_time"] if "create_time" in node_data else None
nodes.append(KnowledgeNode(id=node_id, type=node_type_val, content=content, create_time=create_time))
@ -281,8 +367,14 @@ async def search_knowledge_node(query: str = Query(..., min_length=1), _auth: bo
for node_id in node_list:
try:
node_data = graph[node_id]
content = node_data["content"] if "content" in node_data else node_id
node_type = "entity" if ("type" in node_data and node_data["type"] == "ent") else "paragraph"
# 对于段落节点,尝试从 embedding store 获取完整内容
if node_type == "paragraph":
full_content, _ = _get_paragraph_content(node_id)
content = full_content if full_content is not None else (node_data["content"] if "content" in node_data else node_id)
else:
content = node_data["content"] if "content" in node_data else node_id
if query_lower in content.lower() or query_lower in node_id.lower():
create_time = node_data["create_time"] if "create_time" in node_data else None

View File

@ -1,5 +1,5 @@
[inner]
version = "7.4.0"
version = "7.4.1"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
# 如果你想要修改配置文件请递增version的值
@ -294,6 +294,7 @@ trusted_proxies = "" # 信任的代理IP列表逗号分隔只有来自
trust_xff = false # 是否启用X-Forwarded-For代理解析默认false
# 启用后仍要求直连IP在trusted_proxies中才会信任XFF头
secure_cookie = false # 是否启用安全Cookie仅通过HTTPS传输默认false
enable_paragraph_content = false # 是否在知识图谱中加载段落完整内容需要加载embedding store会占用额外内存
[experimental] #实验性功能
# 麦麦私聊的说话规则,行为风格(实验性功能)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -11,7 +11,7 @@
<link rel="icon" type="image/x-icon" href="/maimai.ico" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>MaiBot Dashboard</title>
<script type="module" crossorigin src="/assets/index-DD4VGX3W.js"></script>
<script type="module" crossorigin src="/assets/index-CbKchl83.js"></script>
<link rel="modulepreload" crossorigin href="/assets/react-vendor-BmxF9s7Q.js">
<link rel="modulepreload" crossorigin href="/assets/router-9vIXuQkh.js">
<link rel="modulepreload" crossorigin href="/assets/utils-BqoaXoQ1.js">
@ -25,7 +25,7 @@
<link rel="modulepreload" crossorigin href="/assets/reactflow-DtsZHOR4.js">
<link rel="modulepreload" crossorigin href="/assets/uppy-DFP_VzYR.js">
<link rel="modulepreload" crossorigin href="/assets/markdown-CKA5gBQ9.js">
<link rel="stylesheet" crossorigin href="/assets/index-RB5cYCSR.css">
<link rel="stylesheet" crossorigin href="/assets/index-DkXVyv8m.css">
</head>
<body>
<div id="root" class="notranslate"></div>