feat:为 LPMM 流水线脚本添加非交互模式

为 info_extraction.py、import_openie.py、delete_lpmm_items.py 以及新增的 lpmm_manager.py 引入统一的 --non-interactive 参数,使其能够在 CI 和自动化场景下实现完全脚本化、无交互运行。新增了一个功能完整的命令行管理脚本(lpmm_manager.py)以及一份详细的用户指南(lpmm_pipelines_guide.md)。同时,更新了 test_lpmm_retrieval.py,以支持通过 CLI 自定义测试用例,并改进了整个流水线中的错误处理和用户提示。最后,从文档和代码中移除了 ppr_relation_cap 参数。
pull/1386/head
陈曦 2025-12-18 20:40:05 +08:00
parent 20c9cbad3e
commit f41c2113dc
9 changed files with 932 additions and 45 deletions

View File

@ -76,7 +76,6 @@ embedding_chunk_size = 16 # 每批嵌入的条数
info_extraction_workers = 3 # 实体抽取同时执行线程数
enable_ppr = true # 是否启用PPR低配机器可关闭
ppr_node_cap = 8000 # 图节点数超过该值时自动跳过PPR
ppr_relation_cap = 50 # 命中关系数超过该值时自动跳过PPR
```
- `embedding_dimension`
@ -97,13 +96,13 @@ ppr_relation_cap = 50 # 命中关系数超过该值时自动跳过PPR
- 使用 Pro/贵价模型时建议不要太大,避免并行费用过高;
- 一般 24 就能取得较好平衡。
- `enable_ppr`
- `enable_ppr`
是否启用个性化 PageRankPPR图检索
- `true`:检索会结合向量+知识图,效果更好,但略慢;
- `false`:只用向量检索,牺牲一定效果,性能更稳定。
- `ppr_node_cap` / `ppr_relation_cap`
安全阈值:当图节点数或命中关系数超过阈值时自动跳过 PPR以避免“大图”导致卡顿。
- `ppr_node_cap`
安全阈值:当图节点数超过阈值时自动跳过 PPR以避免“大图”导致卡顿。
> 调参建议:
> - 若导入/检索阶段机器明显“顶不住”(>=1MB的大文本且分配配置<4C

View File

@ -0,0 +1,326 @@
## LPMM 知识库流水线使用指南(命令行版)
本文档介绍如何使用 `scripts/lpmm_manager.py` 及相关子脚本,完成 **导入 / 删除 / 自检 / 刷新 / 回归测试** 等常见流水线操作,并说明各参数在交互式与非交互(脚本化)场景下的用法。
所有命令均假设在项目根目录 `MaiBot/` 下执行:
```bash
cd MaiBot
```
---
## 1. 管理脚本总览:`scripts/lpmm_manager.py`
### 1.1 基本用法
```bash
python scripts/lpmm_manager.py [--interactive] [-a ACTION] [--non-interactive] [-- ...子脚本参数...]
```
- `--interactive` / `-i`:进入交互式菜单模式(推荐人工运维时使用)。
- `--action` / `-a`:直接执行指定操作(非交互入口),可选值:
- `prepare_raw`:预处理 `data/lpmm_raw_data/*.txt`
- `info_extract`:信息抽取,生成 OpenIE JSON 批次。
- `import_openie`:导入 OpenIE 批次到向量库与知识图。
- `delete`:删除/回滚知识(封装 `delete_lpmm_items.py`)。
- `batch_inspect`:检查指定 OpenIE 批次的存在情况。
- `global_inspect`:全库状态统计。
- `refresh`:刷新 LPMM 磁盘数据到内存。
- `test`:检索效果回归测试。
- `full_import`:一键执行「预处理原始语料 → 信息抽取 → 导入 → 刷新」。
- `--non-interactive`
- 启用 **非交互模式**`lpmm_manager` 自身不会再调用 `input()` 询问确认;
- 同时自动向子脚本透传 `--non-interactive`(若子脚本支持),用于在 CI / 定时任务中实现无人值守。
- `--` 之后的内容会原样传递给对应子脚本的 `main()`,用于设置更细粒度参数。
> 注意:`--interactive` 与 `--non-interactive` 互斥,不能同时使用。
---
## 2. 典型流水线一:全量导入(从原始 txt 到可用 LPMM
### 2.1 前置条件
- 将待导入的原始文本放入:
```text
data/lpmm_raw_data/*.txt
```
- 文本按「空行分段」,每个段落为一条候选知识。
### 2.2 一键全流程(交互式)
```bash
python scripts/lpmm_manager.py --interactive
```
菜单中依次:
1. 选择 `9. full_import`(预处理 → 信息抽取 → 导入 → 刷新)。
2. 按提示确认可能的费用与时间消耗。
3. 等待脚本执行完成。
### 2.3 一键全流程(非交互 / CI 友好)
```bash
python scripts/lpmm_manager.py -a full_import --non-interactive
```
执行顺序:
1. `prepare_raw`:调用 `raw_data_preprocessor.load_raw_data()`,统计段落与去重哈希数。
2. `info_extract`:调用 `info_extraction.main(--non-interactive)`,从 `data/lpmm_raw_data` 读取段落,生成 OpenIE JSON 并写入 `data/openie/`
3. `import_openie`:调用 `import_openie.main(--non-interactive)`,导入 OpenIE 批次到嵌入库与 KG。
4. `refresh`:调用 `refresh_lpmm_knowledge.main()`,刷新 LPMM 知识库到内存。
`--non-interactive` 模式下:
- 若 `data/lpmm_raw_data` 中没有 `.txt` 文件,或 `data/openie` 中没有 `.json` 文件,将直接报错退出,并在日志中说明缺少的目录/文件。
- 若 OpenIE 批次中存在非法文段,导入脚本会 **直接报错退出**,不会卡在交互确认上。
---
## 3. 典型流水线二:分步导入
若需要逐步调试或只执行部分步骤,可以分开调用:
### 3.1 预处理原始语料:`prepare_raw`
```bash
python scripts/lpmm_manager.py -a prepare_raw
```
行为:
- 使用 `raw_data_preprocessor.load_raw_data()` 读取 `data/lpmm_raw_data/*.txt`
- 输出段落总数与去重后的哈希数,供人工检查原始数据质量。
### 3.2 信息抽取:`info_extract`
#### 交互式(带费用提示)
```bash
python scripts/lpmm_manager.py -a info_extract
```
脚本会:
- 打印预计费用/时间提示;
- 询问 `确认继续执行?(y/n)`
- 然后开始从 `data/lpmm_raw_data` 中读取段落,调用 LLM 提取实体与三元组,并生成 OpenIE JSON。
#### 非交互式(无人工确认)
```bash
python scripts/lpmm_manager.py -a info_extract --non-interactive
```
行为差异:
- 跳过`确认继续执行`的交互提示,直接开始抽取;
- 若 `data/lpmm_raw_data` 下没有 `.txt` 文件,会打印告警并以错误方式退出。
### 3.3 导入 OpenIE 批次:`import_openie`
#### 交互式
```bash
python scripts/lpmm_manager.py -a import_openie
```
脚本会:
- 提示导入开销与资源占用情况;
- 询问是否继续;
- 调用 `OpenIE.load()` 加载批次,再将其导入嵌入库与 KG。
#### 非交互式
```bash
python scripts/lpmm_manager.py -a import_openie --non-interactive
```
- 跳过导入开销确认;
- 若数据存在非法文段:
- 在交互模式下会询问是否删除这些非法文段并继续;
- 在非交互模式下,会直接 `logger.error``sys.exit(1)`,防止导入不完整数据。
> 提示:当前 `OpenIE.load()` 仍可能在内部要求你选择具体批次文件,若需完全无交互的导入,可后续扩展为显式指定文件路径。
### 3.4 刷新 LPMM 知识库:`refresh`
```bash
python scripts/lpmm_manager.py -a refresh
# 或
python scripts/lpmm_manager.py -a refresh --non-interactive
```
两者行为相同:
- 调用 `refresh_lpmm_knowledge.main()`,内部执行 `lpmm_start_up()`
- 日志中输出当前向量与 KG 规模,验证导入是否成功。
---
## 4. 典型流水线三:删除 / 回滚
删除操作通过 `lpmm_manager.py -a delete` 封装 `scripts/delete_lpmm_items.py`
### 4.1 交互式删除(推荐人工操作)
```bash
python scripts/lpmm_manager.py --interactive
```
菜单中选择:
1. `4. delete - 删除/回滚知识`
2. 再选择删除方式:
- 按哈希文件(`--hash-file`
- 按 OpenIE 批次(`--openie-file`
- 按原始语料 + 段落索引(`--raw-file + --raw-index`
- 按关键字搜索现有段落(`--search-text`
3. 管理脚本会根据你的选择自动拼好常用参数(是否删除实体/关系、是否删除孤立实体、是否 dry-run、是否自动确认等最后调用 `delete_lpmm_items.py` 执行。
### 4.2 非交互删除CI / 脚本场景)
#### 示例:按哈希文件删除(带完整保护参数)
```bash
python scripts/lpmm_manager.py -a delete --non-interactive -- \
--hash-file data/lpmm_delete_hashes.txt \
--delete-entities \
--delete-relations \
--remove-orphan-entities \
--max-delete-nodes 2000 \
--yes
```
- `--non-interactive`manager禁止任何 `input()` 询问;
- 子脚本 `delete_lpmm_items.py` 中:
- `--hash-file`:指定待删段落哈希列表;
- `--delete-entities` / `--delete-relations` / `--remove-orphan-entities`:同步清理实体与关系;
- `--max-delete-nodes`:单次删除节点数上限,避免误删过大规模;
- `--yes`:跳过终极确认,适合已验证的自动流水线。
#### 按 OpenIE 批次删除(常用于批次回滚)
```bash
python scripts/lpmm_manager.py -a delete --non-interactive -- \
--openie-file data/openie/2025-01-01-12-00-openie.json \
--delete-entities \
--delete-relations \
--remove-orphan-entities \
--yes
```
### 4.3 非交互模式下的安全限制
`delete_lpmm_items.py` 中:
- 若使用 `--search-text`,需要用户通过输入序号选择要删条目;
- 在 `--non-interactive` 模式下,这一步会直接报错退出,提示改用 `--hash-file / --openie-file / --raw-file` 等纯参数方式。
- 若未指定 `--yes`
- 非交互模式下会报错退出,提示「非交互模式且未指定 --yes出于安全考虑删除操作已被拒绝」。
---
## 5. 典型流水线四:自检与状态检查
### 5.1 检查指定 OpenIE 批次状态:`batch_inspect`
```bash
python scripts/lpmm_manager.py -a batch_inspect -- --openie-file data/openie/xx.json
```
输出该批次在当前库中的:
- 段落向量数量 / KG 段落节点数量;
- 实体向量数量 / KG 实体节点数量;
- 关系向量数量;
- 少量仍存在的样例内容。
常用于:
- 导入后确认是否完全成功;
- 删除后确认是否完全回滚。
### 5.2 查看整库状态:`global_inspect`
```bash
python scripts/lpmm_manager.py -a global_inspect
```
输出:
- 段落 / 实体 / 关系向量条数;
- KG 节点/边总数,段落节点数、实体节点数;
- 实体计数表 `ent_appear_cnt` 的条目数;
- 少量剩余段落/实体样例,便于快速 sanity check。
---
## 6. 典型流水线五:检索效果回归测试
### 6.1 使用默认测试用例
```bash
python scripts/lpmm_manager.py -a test
```
- 调用 `test_lpmm_retrieval.py` 内置的 `DEFAULT_TEST_CASES`
- 对每条用例输出:
- 原始结果;
- 状态(`PASS` / `WARN` / `NO_HIT` / `ERROR`
- 期望关键字与命中关键字列表。
### 6.2 自定义测试问题与期望关键字
```bash
python scripts/lpmm_manager.py -a test -- --query "LPMM 是什么?" \
--expect-keyword 哈希列表 \
--expect-keyword 删除脚本
```
也可以直接调用子脚本:
```bash
python scripts/test_lpmm_retrieval.py \
--query "LPMM 是什么?" \
--expect-keyword 哈希列表 \
--expect-keyword 删除脚本
```
---
## 7. 推荐组合示例
### 7.1 导入 + 刷新 + 简单回归
```bash
# 1. 执行全量导入(支持非交互)
python scripts/lpmm_manager.py -a full_import --non-interactive
# 2. 使用内置用例做一次检索回归
python scripts/lpmm_manager.py -a test
```
### 7.2 批次回滚 + 自检
```bash
TARGET_BATCH=data/openie/2025-01-01-12-00-openie.json
# 1. 按批次删除(非交互)
python scripts/lpmm_manager.py -a delete --non-interactive -- \
--openie-file "$TARGET_BATCH" \
--delete-entities \
--delete-relations \
--remove-orphan-entities \
--yes
# 2. 检查该批次是否彻底删除
python scripts/lpmm_manager.py -a batch_inspect -- --openie-file "$TARGET_BATCH"
# 3. 查看全库状态
python scripts/lpmm_manager.py -a global_inspect
```
---
如需扩展更多流水线(例如「导入特定批次后自动跑自定义测试用例」),可以在 `scripts/lpmm_manager.py` 中新增对应的 `ACTION_INFO` 条目和 `run_action` 分支,或直接在 CI / shell 脚本中串联上述命令。该管理脚本已支持参数化与非交互调用,适合作为二次封装的基础入口。

View File

@ -89,6 +89,15 @@ def main():
default=2000,
help="单次最大允许删除的节点数量(段落+实体),超过则需要显式确认或调整该参数",
)
parser.add_argument(
"--non-interactive",
action="store_true",
help=(
"非交互模式:不再通过 input() 询问任何信息;"
"在该模式下,如果需要交互(例如 --search-text 未指定具体条目、未提供 --yes"
"会直接报错退出。"
),
)
args = parser.parse_args()
# 至少需要一种来源
@ -211,6 +220,12 @@ def main():
logger.info("找到以下候选段落(输入序号选择要删除的条目,可用逗号分隔,多选):")
for i, (key, text) in enumerate(candidates, start=1):
logger.info(f"{i}. {key} | {text[:80]}")
if args.non_interactive:
logger.error(
"当前处于非交互模式,无法通过输入序号选择要删除的候选段落;"
"如需脚本化删除,请改用 --hash-file / --openie-file / --raw-file 等方式。"
)
sys.exit(1)
choice = input("请输入要删除的序号列表(如 1,3或直接回车取消").strip()
if choice:
try:
@ -270,6 +285,12 @@ def main():
# 交互确认
if not args.yes:
if args.non_interactive:
logger.error(
"当前处于非交互模式且未指定 --yes出于安全考虑删除操作已被拒绝。\n"
"如确认需要在非交互模式下执行删除,请显式添加 --yes 参数。"
)
sys.exit(1)
confirm = input("确认删除上述数据?输入大写 YES 以继续,其他任意键取消: ").strip()
if confirm != "YES":
logger.info("用户取消删除操作")

View File

@ -4,10 +4,12 @@
# print("未找到quick_algo库无法使用quick_algo算法")
# print("请安装quick_algo库 - 在lib.quick_algo中执行命令python setup.py build_ext --inplace")
import argparse
import sys
import os
import asyncio
from time import sleep
from typing import Optional
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.chat.knowledge.embedding_store import EmbeddingManager
@ -71,7 +73,12 @@ def hash_deduplicate(
return new_raw_paragraphs, new_triple_list_data
def handle_import_openie(openie_data: OpenIE, embed_manager: EmbeddingManager, kg_manager: KGManager) -> bool:
def handle_import_openie(
openie_data: OpenIE,
embed_manager: EmbeddingManager,
kg_manager: KGManager,
non_interactive: bool = False,
) -> bool:
# sourcery skip: extract-method
# 从OpenIE数据中提取段落原文与三元组列表
# 索引的段落原文
@ -124,8 +131,13 @@ def handle_import_openie(openie_data: OpenIE, embed_manager: EmbeddingManager, k
logger.info("所有数据均完整,没有发现缺失字段。")
return False
# 新增:提示用户是否删除非法文段继续导入
# 将print移到所有logger.error之后确保不会被冲掉
# 在非交互模式下,不再询问用户,而是直接报错终止
logger.info(f"\n检测到非法文段,共{len(missing_idxs)}条。")
if non_interactive:
logger.error(
"检测到非法文段且当前处于非交互模式,无法询问是否删除非法文段,导入终止。"
)
sys.exit(1)
logger.info("\n是否删除所有非法文段后继续导入?(y/n): ", end="")
user_choice = input().strip().lower()
if user_choice != "y":
@ -174,20 +186,25 @@ def handle_import_openie(openie_data: OpenIE, embed_manager: EmbeddingManager, k
return True
async def main_async(): # sourcery skip: dict-comprehension
async def main_async(non_interactive: bool = False) -> bool: # sourcery skip: dict-comprehension
# 新增确认提示
print("=== 重要操作确认 ===")
print("OpenIE导入时会大量发送请求可能会撞到请求速度上限请注意选用的模型")
print("同之前样例在本地模型下在70分钟内我们发送了约8万条请求在网络允许下速度会更快")
print("推荐使用硅基流动的Pro/BAAI/bge-m3")
print("每百万Token费用为0.7元")
print("知识导入时,会消耗大量系统资源,建议在较好配置电脑上运行")
print("同上样例导入时10700K几乎跑满14900HX占用80%峰值内存占用约3G")
confirm = input("确认继续执行?(y/n): ").strip().lower()
if confirm != "y":
logger.info("用户取消操作")
print("操作已取消")
sys.exit(1)
if non_interactive:
logger.warning(
"当前处于非交互模式,将跳过导入开销确认提示,直接开始执行 OpenIE 导入。"
)
else:
print("=== 重要操作确认 ===")
print("OpenIE导入时会大量发送请求可能会撞到请求速度上限请注意选用的模型")
print("同之前样例在本地模型下在70分钟内我们发送了约8万条请求在网络允许下速度会更快")
print("推荐使用硅基流动的Pro/BAAI/bge-m3")
print("每百万Token费用为0.7元")
print("知识导入时,会消耗大量系统资源,建议在较好配置电脑上运行")
print("同上样例导入时10700K几乎跑满14900HX占用80%峰值内存占用约3G")
confirm = input("确认继续执行?(y/n): ").strip().lower()
if confirm != "y":
logger.info("用户取消操作")
print("操作已取消")
sys.exit(1)
print("\n" + "=" * 40 + "\n")
ensure_openie_dir() # 确保OpenIE目录存在
logger.info("----开始导入openie数据----\n")
@ -235,14 +252,27 @@ async def main_async(): # sourcery skip: dict-comprehension
except Exception as e:
logger.error(f"导入OpenIE数据文件时发生错误{e}")
return False
if handle_import_openie(openie_data, embed_manager, kg_manager) is False:
if handle_import_openie(openie_data, embed_manager, kg_manager, non_interactive=non_interactive) is False:
logger.error("处理OpenIE数据时发生错误")
return False
return None
return True
def main():
"""主函数 - 设置新的事件循环并运行异步主函数"""
def main(argv: Optional[list[str]] = None) -> None:
"""主函数 - 解析参数并运行异步主流程。"""
parser = argparse.ArgumentParser(
description=(
"OpenIE 导入脚本:读取 data/openie 中的 OpenIE JSON 批次,"
"将其导入到 LPMM 的向量库与知识图中。"
)
)
parser.add_argument(
"--non-interactive",
action="store_true",
help="非交互模式:跳过导入确认提示以及非法文段删除询问,遇到非法文段时直接报错退出。",
)
args = parser.parse_args(argv)
# 检查是否有现有的事件循环
try:
loop = asyncio.get_running_loop()
@ -255,9 +285,10 @@ def main():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ok: bool = False
try:
# 在新的事件循环中运行异步主函数
loop.run_until_complete(main_async())
ok = loop.run_until_complete(main_async(non_interactive=args.non_interactive))
print(
"\n[NOTICE] OpenIE 导入脚本执行完毕。如主程序(聊天 / WebUI已在运行"
"请重启主程序,或在主程序内部调用一次 lpmm_start_up() 以应用最新 LPMM 知识库。"
@ -267,6 +298,9 @@ def main():
# 确保事件循环被正确关闭
if not loop.is_closed():
loop.close()
if not ok:
# 统一错误码,方便在非交互场景下检测失败
sys.exit(1)
if __name__ == "__main__":

View File

@ -1,3 +1,4 @@
import argparse
import json
import os
import signal
@ -5,6 +6,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock, Event
import sys
import datetime
from typing import Optional
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
# 添加项目根目录到 sys.path
@ -115,22 +117,27 @@ def signal_handler(_signum, _frame):
sys.exit(0)
def main(): # sourcery skip: comprehension-to-generator, extract-method
def _run(non_interactive: bool = False) -> None: # sourcery skip: comprehension-to-generator, extract-method
# 设置信号处理器
signal.signal(signal.SIGINT, signal_handler)
ensure_dirs() # 确保目录存在
# 新增用户确认提示
print("=== 重要操作确认,请认真阅读以下内容哦 ===")
print("实体提取操作将会花费较多api余额和时间建议在空闲时段执行。")
print("举例600万字全剧情提取选用deepseek v3 0324消耗约40元约3小时。")
print("建议使用硅基流动的非Pro模型")
print("或者使用可以用赠金抵扣的Pro模型")
print("请确保账户余额充足,并且在执行前确认无误。")
confirm = input("确认继续执行?(y/n): ").strip().lower()
if confirm != "y":
logger.info("用户取消操作")
print("操作已取消")
sys.exit(1)
if non_interactive:
logger.warning(
"当前处于非交互模式,将跳过费用与时长确认提示,直接开始进行实体提取操作。"
)
else:
print("=== 重要操作确认,请认真阅读以下内容哦 ===")
print("实体提取操作将会花费较多api余额和时间建议在空闲时段执行。")
print("举例600万字全剧情提取选用deepseek v3 0324消耗约40元约3小时。")
print("建议使用硅基流动的非Pro模型")
print("或者使用可以用赠金抵扣的Pro模型")
print("请确保账户余额充足,并且在执行前确认无误。")
confirm = input("确认继续执行?(y/n): ").strip().lower()
if confirm != "y":
logger.info("用户取消操作")
print("操作已取消")
sys.exit(1)
# 友好提示:说明“网络错误(可重试)”日志属于正常自动重试行为,避免用户误以为任务失败
print(
@ -222,5 +229,22 @@ def main(): # sourcery skip: comprehension-to-generator, extract-method
logger.info(f"提取失败的文段SHA256{failed_sha256}")
def main(argv: Optional[list[str]] = None) -> None:
parser = argparse.ArgumentParser(
description=(
"LPMM 信息提取脚本:从 data/lpmm_raw_data/*.txt 中读取原始段落,"
"调用 LLM 提取实体和三元组,并生成 OpenIE JSON 批次文件。"
)
)
parser.add_argument(
"--non-interactive",
action="store_true",
help="非交互模式:跳过费用确认提示,直接开始执行;适用于 CI / 定时任务等场景。",
)
args = parser.parse_args(argv)
_run(non_interactive=args.non_interactive)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,456 @@
import argparse
import os
import sys
from pathlib import Path
from typing import Optional, List
# 尽量统一控制台编码为 utf-8避免中文输出报错
try:
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
if hasattr(sys.stderr, "reconfigure"):
sys.stderr.reconfigure(encoding="utf-8")
except Exception:
pass
# 确保能导入 src.* 以及同目录脚本
CURRENT_DIR = os.path.dirname(__file__)
PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, ".."))
if PROJECT_ROOT not in sys.path:
sys.path.append(PROJECT_ROOT)
from src.common.logger import get_logger # type: ignore
from src.config.config import global_config # type: ignore
# 引入各功能脚本的入口函数
from import_openie import main as import_openie_main # type: ignore
from info_extraction import main as info_extraction_main # type: ignore
from delete_lpmm_items import main as delete_lpmm_items_main # type: ignore
from inspect_lpmm_batch import main as inspect_lpmm_batch_main # type: ignore
from inspect_lpmm_global import main as inspect_lpmm_global_main # type: ignore
from refresh_lpmm_knowledge import main as refresh_lpmm_knowledge_main # type: ignore
from test_lpmm_retrieval import main as test_lpmm_retrieval_main # type: ignore
from raw_data_preprocessor import load_raw_data # type: ignore
logger = get_logger("lpmm_manager")
ACTION_INFO = {
"prepare_raw": "预处理 data/lpmm_raw_data/*.txt按空行切分为段落并做去重统计",
"info_extract": "原始 txt -> OpenIE 信息抽取(调用 info_extraction.py",
"import_openie": "导入 OpenIE 批次到向量库与知识图(调用 import_openie.py",
"delete": "删除/回滚知识(调用 delete_lpmm_items.py",
"batch_inspect": "检查指定 OpenIE 批次在当前库中的存在情况(调用 inspect_lpmm_batch.py",
"global_inspect": "查看当前整库向量与 KG 状态(调用 inspect_lpmm_global.py",
"refresh": "刷新 LPMM 磁盘数据到内存(调用 refresh_lpmm_knowledge.py",
"test": "运行 LPMM 检索效果回归测试(调用 test_lpmm_retrieval.py",
"full_import": "一键执行:信息抽取 -> 导入 OpenIE -> 刷新",
}
def _with_overridden_argv(extra_args: List[str], target_main) -> None:
"""在不修改子脚本的前提下,临时覆盖 sys.argv 以透传参数。"""
old_argv = list(sys.argv)
try:
# 第 0 个元素为“程序名”,后续元素为实际参数
# 这里不再插入类似 delete_lpmm_items.py 的占位,避免被 argparse 误识别为位置参数
sys.argv = [old_argv[0]] + extra_args
target_main()
finally:
sys.argv = old_argv
def _check_before_info_extract(non_interactive: bool = False) -> bool:
"""信息抽取前的轻量级检查。"""
raw_dir = Path(PROJECT_ROOT) / "data" / "lpmm_raw_data"
txt_files = list(raw_dir.glob("*.txt"))
if not txt_files:
msg = (
f"[WARN] 未在 {raw_dir} 下找到任何 .txt 原始语料文件,"
"info_extraction 可能立即退出或无数据可处理。"
)
print(msg)
if non_interactive:
logger.error(
"非交互模式下要求原始语料目录中已存在可用的 .txt 文件,请先准备好数据再重试。"
)
return False
cont = input("仍然继续执行信息提取吗?(y/n): ").strip().lower()
return cont == "y"
return True
def _check_before_import_openie(non_interactive: bool = False) -> bool:
"""导入 OpenIE 前的轻量级检查。"""
openie_dir = Path(PROJECT_ROOT) / "data" / "openie"
json_files = list(openie_dir.glob("*.json"))
if not json_files:
msg = (
f"[WARN] 未在 {openie_dir} 下找到任何 OpenIE JSON 文件,"
"import_openie 可能会因为找不到批次而失败。"
)
print(msg)
if non_interactive:
logger.error(
"非交互模式下要求 data/openie 目录中已存在可用的 OpenIE JSON 文件,请先执行信息提取脚本。"
)
return False
cont = input("仍然继续执行导入吗?(y/n): ").strip().lower()
return cont == "y"
return True
def _warn_if_lpmm_disabled() -> None:
"""在部分操作前提醒 lpmm_knowledge.enable 状态。"""
try:
if not getattr(global_config.lpmm_knowledge, "enable", False):
print(
"[WARN] 当前配置 lpmm_knowledge.enable = false"
"刷新或检索测试可能无法在聊天侧真正启用 LPMM。"
)
except Exception:
# 配置异常时不阻断主流程,仅忽略提示
pass
def run_action(action: str, extra_args: Optional[List[str]] = None) -> None:
"""根据动作名称调度到对应脚本。
这里不重复解析子参数而是直接调用各脚本的 main()
让子脚本保留原有的交互/参数行为
"""
logger.info("开始执行操作: %s", action)
extra_args = extra_args or []
try:
if action == "prepare_raw":
logger.info("开始预处理原始语料 (data/lpmm_raw_data/*.txt)...")
sha_list, raw_data = load_raw_data()
print(
f"\n[PREPARE_RAW] 完成原始语料预处理:共 {len(raw_data)} 条段落,"
f"去重后哈希数 {len(sha_list)}"
)
elif action == "info_extract":
if not _check_before_info_extract("--non-interactive" in extra_args):
print("已根据用户选择,取消执行信息提取。")
return
_with_overridden_argv(extra_args, info_extraction_main)
elif action == "import_openie":
if not _check_before_import_openie("--non-interactive" in extra_args):
print("已根据用户选择,取消执行导入。")
return
_with_overridden_argv(extra_args, import_openie_main)
elif action == "delete":
_with_overridden_argv(extra_args, delete_lpmm_items_main)
elif action == "batch_inspect":
_with_overridden_argv(extra_args, inspect_lpmm_batch_main)
elif action == "global_inspect":
_with_overridden_argv(extra_args, inspect_lpmm_global_main)
elif action == "refresh":
_warn_if_lpmm_disabled()
_with_overridden_argv(extra_args, refresh_lpmm_knowledge_main)
elif action == "test":
_warn_if_lpmm_disabled()
_with_overridden_argv(extra_args, test_lpmm_retrieval_main)
elif action == "full_import":
# 一键流水线:预处理原始语料 -> 信息抽取 -> 导入 -> 刷新
logger.info("开始 full_import预处理原始语料 -> 信息抽取 -> 导入 -> 刷新")
sha_list, raw_data = load_raw_data()
print(
f"\n[FULL_IMPORT] 原始语料预处理完成:共 {len(raw_data)} 条段落,"
f"去重后哈希数 {len(sha_list)}"
)
non_interactive = "--non-interactive" in extra_args
if not _check_before_info_extract(non_interactive):
print("已根据用户选择,取消 full_import信息提取阶段被取消")
return
# 使用与单步 info_extract 相同的参数透传机制,确保 --non-interactive 等生效
_with_overridden_argv(extra_args, info_extraction_main)
if not _check_before_import_openie(non_interactive):
print("已根据用户选择,取消 full_import导入阶段被取消")
return
_with_overridden_argv(extra_args, import_openie_main)
_warn_if_lpmm_disabled()
_with_overridden_argv(extra_args, refresh_lpmm_knowledge_main)
else:
logger.error("未知操作: %s", action)
except KeyboardInterrupt:
logger.info("用户中断当前操作Ctrl+C")
except SystemExit:
# 子脚本里大量使用 sys.exit直接透传即可
raise
except Exception as exc: # pragma: no cover - 防御性兜底
logger.error("执行操作 %s 时发生未捕获异常: %s", action, exc)
raise
def print_menu() -> None:
print("\n===== LPMM 管理菜单 =====")
for idx, key in enumerate(
[
"prepare_raw",
"info_extract",
"import_openie",
"delete",
"batch_inspect",
"global_inspect",
"refresh",
"test",
"full_import",
],
start=1,
):
desc = ACTION_INFO.get(key, "")
print(f"{idx}. {key:14s} - {desc}")
print("0. 退出")
print("=========================")
def interactive_loop() -> None:
"""交互式选择模式。"""
key_order = [
"prepare_raw",
"info_extract",
"import_openie",
"delete",
"batch_inspect",
"global_inspect",
"refresh",
"test",
"full_import",
]
while True:
print_menu()
choice = input("请输入选项编号0-8").strip()
if choice in ("0", "q", "Q", "quit", "exit"):
print("已退出 LPMM 管理器。")
return
try:
idx = int(choice)
except ValueError:
print("输入无效,请输入 0-7 之间的数字。")
continue
if not (1 <= idx <= len(key_order)):
print("输入编号超出范围,请重新输入。")
continue
action = key_order[idx - 1]
print(f"\n你选择了: {action} - {ACTION_INFO.get(action, '')}")
confirm = input("确认执行该操作?(y/n): ").strip().lower()
if confirm != "y":
print("已取消当前操作。\n")
continue
# 通过交互式问题,尽量帮用户补全对应脚本的常用参数
extra_args: List[str] = []
if action == "delete":
extra_args = _interactive_build_delete_args()
elif action == "batch_inspect":
extra_args = _interactive_build_batch_inspect_args()
elif action == "test":
extra_args = _interactive_build_test_args()
else:
extra_args = []
run_action(action, extra_args=extra_args)
print("\n当前操作已结束,回到主菜单。\n")
def _interactive_choose_openie_file(prompt: str) -> Optional[str]:
"""在 data/openie 下列出可选 JSON 文件,并返回用户选择的路径。"""
openie_dir = Path(PROJECT_ROOT) / "data" / "openie"
files = sorted(openie_dir.glob("*.json"))
if not files:
print(f"[WARN] 在 {openie_dir} 下没有找到任何 OpenIE JSON 文件。")
return input(prompt).strip() or None
print("\n可选的 OpenIE 批次文件:")
for i, f in enumerate(files, start=1):
print(f"{i}. {f.name}")
print("0. 手动输入完整路径")
while True:
choice = input("请选择文件编号:").strip()
if choice == "0":
manual = input(prompt).strip()
return manual or None
try:
idx = int(choice)
except ValueError:
print("请输入合法的编号。")
continue
if 1 <= idx <= len(files):
return str(files[idx - 1])
print("编号超出范围,请重试。")
def _interactive_build_delete_args() -> List[str]:
"""为 delete_lpmm_items 构造常见参数,减少二次交互。"""
print(
"\n[DELETE] 请选择删除方式:\n"
"1. 按哈希文件删除 (--hash-file)\n"
"2. 按 OpenIE 批次删除 (--openie-file)\n"
"3. 按原始语料文件 + 段落索引删除 (--raw-file + --raw-index)\n"
"4. 按关键字搜索现有段落 (--search-text)\n"
"回车跳过,由子脚本自行交互。"
)
mode = input("输入选项编号1-4或回车跳过").strip()
args: List[str] = []
if mode == "1":
path = input("请输入哈希文件路径(每行一个 hash").strip()
if path:
args += ["--hash-file", path]
elif mode == "2":
path = _interactive_choose_openie_file("请输入 OpenIE JSON 文件路径:")
if path:
args += ["--openie-file", path]
elif mode == "3":
raw_file = input("请输入原始语料 txt 文件路径:").strip()
raw_index = input("请输入要删除的段落索引(如 1,3").strip()
if raw_file and raw_index:
args += ["--raw-file", raw_file, "--raw-index", raw_index]
elif mode == "4":
text = input("请输入用于搜索的关键字(出现在段落原文中):").strip()
if text:
args += ["--search-text", text]
else:
# 留空则完全交给子脚本交互
return []
# 进一步询问与安全相关的布尔选项
print(
"\n[DELETE] 接下来是一些安全相关选项的说明:\n"
"- 删除实体向量/节点:会一并清理与这些段落关联的实体节点及其向量;\n"
"- 删除关系向量:在上面的基础上,额外清理关系向量(一般与删除实体一同使用);\n"
"- 删除孤立实体节点:删除后若实体不再连接任何段落,将其从图中移除,避免残留孤点;\n"
"- dry-run只预览将要删除的内容不真正修改任何数据\n"
"- 跳过交互确认(--yes):直接执行删除操作,适合脚本化或已充分确认的场景;\n"
"- 单次最大删除节点数上限:防止一次性删除规模过大,起到误操作保护作用;\n"
"- 一般情况下建议同时删除实体向量/节点/关系向量/节点,以确保知识图谱的完整性。"
)
# 快速选项:按推荐方式清理所有相关实体/关系
quick_all = input(
"是否使用推荐策略:同时删除关联的实体向量/节点、关系向量,并清理孤立实体?(Y/n): "
).strip().lower()
if quick_all in ("", "y", "yes"):
args.extend(["--delete-entities", "--delete-relations", "--remove-orphan-entities"])
else:
# 仅当未使用快速方案时,再逐项询问
if input("是否同时删除实体向量/节点?(y/N): ").strip().lower() == "y":
args.append("--delete-entities")
if input("是否同时删除关系向量?(y/N): ").strip().lower() == "y":
args.append("--delete-relations")
if input("是否删除孤立实体节点?(y/N): ").strip().lower() == "y":
args.append("--remove-orphan-entities")
if input("是否以 dry-run 预览而不真正删除?(y/N): ").strip().lower() == "y":
args.append("--dry-run")
else:
if input("是否跳过交互确认直接删除?(默认否,请谨慎) (y/N): ").strip().lower() == "y":
args.append("--yes")
max_nodes = input("单次最大删除节点数上限(回车使用默认 2000").strip()
if max_nodes:
args += ["--max-delete-nodes", max_nodes]
return args
def _interactive_build_batch_inspect_args() -> List[str]:
"""为 inspect_lpmm_batch 构造 --openie-file 参数。"""
path = _interactive_choose_openie_file(
"请输入要检查的 OpenIE JSON 文件路径(回车跳过,由子脚本自行交互):"
)
if not path:
return []
return ["--openie-file", path]
def _interactive_build_test_args() -> List[str]:
"""为 test_lpmm_retrieval 构造自定义测试用例参数。"""
print(
"\n[TEST] 你可以:\n"
"- 直接回车使用内置的默认测试用例;\n"
"- 或者输入一条自定义问题,并指定期望命中的关键字。"
)
query = input("请输入自定义测试问题(回车则使用默认用例):").strip()
if not query:
return []
expect = input("请输入期望命中的关键字(可选,多项用逗号分隔):").strip()
args: List[str] = ["--query", query]
if expect:
for kw in expect.split(","):
kw = kw.strip()
if kw:
args.extend(["--expect-keyword", kw])
return args
def parse_args(argv: Optional[list[str]] = None) -> tuple[argparse.Namespace, List[str]]:
parser = argparse.ArgumentParser(
description=(
"LPMM 管理脚本:集中入口管理 LPMM 的导入 / 删除 / 自检 / 刷新 / 测试等功能。\n"
"可以通过 --interactive 进入菜单模式,也可以使用 --action 直接执行单个操作。"
)
)
parser.add_argument(
"-i",
"--interactive",
action="store_true",
help="进入交互式菜单模式(推荐给手动运维使用)",
)
parser.add_argument(
"-a",
"--action",
choices=list(ACTION_INFO.keys()),
help="直接执行指定操作(非交互模式)",
)
parser.add_argument(
"--non-interactive",
action="store_true",
help=(
"启用非交互模式lpmm_manager 自身不会再通过 input() 询问是否继续前置检查;"
"并会将 --non-interactive 透传给子脚本,以避免子脚本中的交互式确认。"
),
)
# 允许在管理脚本之后继续跟随子脚本参数,例如:
# python lpmm_manager.py -a delete -- --hash-file xxx --yes
args, unknown = parser.parse_known_args(argv)
return args, unknown
def main(argv: Optional[list[str]] = None) -> None:
args, extra_args = parse_args(argv)
# 如果指定了 non-interactive则不能进入交互式菜单
if args.non_interactive and args.interactive:
logger.error("不能同时指定 --interactive 与 --non-interactive请二选一。")
sys.exit(1)
# 没有指定 action 或显式要求交互 -> 进入菜单
if args.interactive or not args.action:
interactive_loop()
return
# 在非交互模式下,将 --non-interactive 透传给子脚本,避免其内部出现 input() 交互
if args.non_interactive:
extra_args = ["--non-interactive"] + extra_args
# 非交互模式:直接执行指定操作
run_action(args.action, extra_args=extra_args)
if __name__ == "__main__":
main()

View File

@ -1,7 +1,8 @@
import argparse
import asyncio
import os
import sys
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
# 强制使用 utf-8避免控制台编码报错影响 Embedding 加载
try:
@ -23,7 +24,7 @@ from src.memory_system.retrieval_tools.query_lpmm_knowledge import query_lpmm_kn
logger = get_logger("test_lpmm_retrieval")
TEST_CASES: List[Dict[str, Any]] = [
DEFAULT_TEST_CASES: List[Dict[str, Any]] = [
{
"name": "回滚一批知识",
"query": "LPMM是什么?",
@ -37,7 +38,7 @@ TEST_CASES: List[Dict[str, Any]] = [
]
async def run_tests() -> None:
async def run_tests(test_cases: Optional[List[Dict[str, Any]]] = None) -> None:
"""简单测试 LPMM 知识库检索能力"""
if not global_config.lpmm_knowledge.enable:
logger.warning("当前配置中 lpmm_knowledge.enable 为 False检索测试可能直接返回“未启用”。")
@ -46,7 +47,9 @@ async def run_tests() -> None:
lpmm_start_up()
logger.info("LPMM 知识库初始化完成,开始执行测试用例。")
for case in TEST_CASES:
cases = test_cases if test_cases is not None else DEFAULT_TEST_CASES
for case in cases:
name = case["name"]
query = case["query"]
expect_keywords: List[str] = case.get("expect_keywords", [])
@ -86,7 +89,33 @@ async def run_tests() -> None:
def main() -> None:
asyncio.run(run_tests())
parser = argparse.ArgumentParser(
description=(
"测试 LPMM 知识库检索能力。\n"
"如不提供参数,则执行内置的默认用例;\n"
"也可以通过 --query 与 --expect-keyword 自定义一条测试用例。"
)
)
parser.add_argument(
"--query",
help="自定义测试问题(单条)。提供该参数时,将仅运行这一条用例。",
)
parser.add_argument(
"--expect-keyword",
action="append",
help="期望在检索结果中出现的关键字,可重复多次指定;仅在提供 --query 时生效。",
)
args = parser.parse_args()
if args.query:
custom_case = {
"name": "custom",
"query": args.query,
"expect_keywords": args.expect_keyword or [],
}
asyncio.run(run_tests([custom_case]))
else:
asyncio.run(run_tests())
if __name__ == "__main__":

View File

@ -358,15 +358,14 @@ class KGManager:
paragraph_search_result: ParagraphEmbedding的搜索结果paragraph_hash, similarity
embed_manager: EmbeddingManager对象
"""
# 图中存在的节点总集
# 性能保护:超限或关闭时直接返回向量检索结果
# 性能保护:关闭或超限时直接返回向量检索结果(仅基于节点规模与开关)
if (
not global_config.lpmm_knowledge.enable_ppr
or len(self.graph.get_node_list()) > global_config.lpmm_knowledge.ppr_node_cap
or len(relation_search_result) > global_config.lpmm_knowledge.ppr_relation_cap
):
logger.info("PPR 已禁用或超出阈值,使用纯向量检索结果")
return paragraph_search_result, None
# 图中存在的节点总集
existed_nodes = self.graph.get_node_list()
# 准备PPR使用的数据

View File

@ -191,7 +191,6 @@ embedding_chunk_size = 4 # 每批嵌入的条数
max_synonym_entities = 2000 # 同义边参与的实体数上限,超限则跳过
enable_ppr = true # 是否启用PPR低配机器可关闭
ppr_node_cap = 8000 # 图节点数超过该值时跳过PPR
ppr_relation_cap = 50 # 命中关系数超过该值时跳过PPR
webui_graph_default_limit = 200 # WebUI /graph 默认返回的最大节点数,避免大图负载
[keyword_reaction]