172 lines
5.0 KiB
Python
172 lines
5.0 KiB
Python
import importlib.util
import json
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

from django.conf import settings

from .chroma_store import upsert_chunks
|
||
|
||
|
||
@dataclass()
class IngestResult:
    """Uniform return value of RAG ingestion, consumed by the Documents module."""

    # True when the document's chunks were written to the store.
    success: bool
    # Number of chunks written; failure results in this module leave it at 0.
    chunks_count: int = 0
    # Human-readable failure reason; success results in this module leave it "".
    error: str = ""
|
||
|
||
|
||
def ingest_document(
    scenario_id: str,
    source_file: str,
    text: str,
    collection: str,
    document_id: int | None = None,
    store_path: str | Path | None = None,
) -> IngestResult:
    """Split one document's text into chunks and write them to the knowledge base.

    Backend selection:
    - ``store_path`` passed (test or fallback mode): use the local JSON store
      at that path. A falsy value such as ``""`` falls back to the default
      JSON store path.
    - ``store_path`` is None and ``chromadb`` is importable: persist to Chroma.
    - ``store_path`` is None and ``chromadb`` is not importable: use the
      default JSON store under ``settings.CHROMA_PATH``.

    In JSON mode, re-ingesting a document first removes its previously stored
    chunks. Documents with an id are matched on (scenario_id, collection,
    document_id). Documents without an id are matched on (scenario_id,
    collection, source_file).

    Returns:
        IngestResult. ``success`` is False, with a message in ``error``, when
        the text is blank or the JSON store cannot be read, parsed or written.
        Chroma failures are reported the same way by the Chroma branch.
    """
    if not text.strip():
        return IngestResult(success=False, error="文档内容为空")

    if _should_use_chroma(store_path):
        return _ingest_chroma_document(
            document_id=document_id,
            scenario_id=scenario_id,
            source_file=source_file,
            text=text,
            collection=collection,
        )

    resolved_store_path = Path(store_path) if store_path else _default_store_path()
    # NOTE(review): JSON-mode chunk ids use source_file even when document_id is
    # set, unlike the Chroma branch. This is kept so existing stored ids do not change.
    chunks = _build_chunks(
        scenario_id=scenario_id,
        source_file=source_file,
        text=text,
        collection=collection,
        document_id=document_id,
        chunk_id_prefix=source_file,
    )

    try:
        existing_chunks = _load_store(resolved_store_path)
        if document_id is None:
            # Matching on document_id=None would wipe EVERY id-less document in
            # this scenario/collection. Identify id-less documents by source file instead.
            persisted_chunks = _filter_out_same_source_chunks(
                existing_chunks,
                scenario_id=scenario_id,
                collection=collection,
                source_file=source_file,
            )
        else:
            persisted_chunks = _filter_out_same_document_chunks(
                existing_chunks,
                scenario_id=scenario_id,
                collection=collection,
                document_id=document_id,
            )
        _save_store(resolved_store_path, [*persisted_chunks, *chunks])
    except (OSError, ValueError) as exc:
        # Unreadable, corrupt (JSONDecodeError is a ValueError) or unwritable
        # store: report it like the Chroma branch does instead of raising.
        return IngestResult(success=False, error=str(exc) or exc.__class__.__name__)

    return IngestResult(success=True, chunks_count=len(chunks))


def _filter_out_same_source_chunks(
    chunks: list[dict],
    scenario_id: str,
    collection: str,
    source_file: str,
) -> list[dict]:
    """Drop id-less chunks previously stored for ``source_file`` in this scenario/collection."""
    return [
        chunk
        for chunk in chunks
        if not (
            chunk.get("document_id") is None
            and chunk.get("scenario_id") == scenario_id
            and chunk.get("collection") == collection
            and chunk.get("source") == source_file
        )
    ]
|
||
|
||
|
||
def _should_use_chroma(store_path: str | Path | None) -> bool:
|
||
"""只在未指定测试存储路径且安装 chromadb 时启用真实向量库。"""
|
||
return store_path is None and importlib.util.find_spec("chromadb") is not None
|
||
|
||
|
||
def _default_store_path() -> Path:
    """Return the fallback JSON store file located inside ``settings.CHROMA_PATH``."""
    base_dir = Path(settings.CHROMA_PATH)
    return base_dir.joinpath("rag_store.json")
|
||
|
||
|
||
def _load_store(store_path: Path) -> list[dict]:
    """Read all persisted chunks from the JSON store at ``store_path``.

    A missing file is treated as an empty store. The content is returned
    exactly as parsed by ``json``; no schema validation is performed, and
    parse errors propagate to the caller.
    """
    if store_path.exists():
        raw = store_path.read_text(encoding="utf-8")
        return json.loads(raw)
    return []
|
||
|
||
|
||
def _save_store(store_path: Path, chunks: list[dict]) -> None:
    """Persist ``chunks`` as pretty-printed UTF-8 JSON at ``store_path``.

    Missing parent directories are created. The data is first written to a
    sibling ``<name>.tmp`` file and then moved over ``store_path`` with
    ``Path.replace``. A failure while serializing or writing therefore
    leaves the previous store intact instead of a truncated file. Errors are
    re-raised after the temp file is removed.
    """
    store_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = store_path.with_name(f"{store_path.name}.tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as file:
            json.dump(chunks, file, ensure_ascii=False, indent=2)
        tmp_path.replace(store_path)
    finally:
        # After a successful replace the temp file no longer exists, so this
        # only cleans up leftovers from a failed write.
        tmp_path.unlink(missing_ok=True)
|
||
|
||
|
||
def _split_text(text: str, chunk_size: int = 800, overlap: int = 120) -> list[str]:
    """Split text into fixed-size windows with overlap.

    Runs of whitespace (including newlines) are first collapsed to single
    spaces and the result is stripped. The normalized text is then cut into
    windows of at most ``chunk_size`` characters. Consecutive windows share
    ``overlap`` characters so context at chunk boundaries is not lost.

    Args:
        text: Raw document text.
        chunk_size: Maximum characters per chunk; must be >= 1.
        overlap: Characters shared by neighbouring chunks; must be >= 0.

    Returns:
        The chunk strings in order; ``[]`` for blank text.

    Raises:
        ValueError: If ``chunk_size`` < 1 or ``overlap`` < 0.
    """
    if chunk_size < 1:
        # chunk_size <= 0 would emit one empty string per character.
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")
    if overlap < 0:
        # A negative overlap would silently skip text between chunks.
        raise ValueError(f"overlap must be >= 0, got {overlap}")

    normalized = re.sub(r"\s+", " ", text).strip()
    if not normalized:
        return []

    length = len(normalized)
    chunks = []
    start = 0
    while start < length:
        end = start + chunk_size
        chunks.append(normalized[start:end])
        if end >= length:
            break
        # Step back by `overlap`, but always advance at least one character so
        # overlap >= chunk_size cannot loop forever.
        start = max(end - overlap, start + 1)
    return chunks
|
||
|
||
|
||
def _build_chunks(
    scenario_id: str,
    source_file: str,
    text: str,
    collection: str,
    document_id: int | None,
    chunk_id_prefix: str,
) -> list[dict]:
    """Split raw text and wrap each piece in the shared chunk record format.

    Each record carries:
    - the scenario, document and collection identifiers;
    - the source file name;
    - a 1-based ``chunk_id`` of the form ``"{scenario_id}:{chunk_id_prefix}:{n}"``;
    - the chunk text;
    - a UTC ISO-8601 ``created_at`` timestamp shared by every chunk from this call.
    """
    timestamp = datetime.now(timezone.utc).isoformat()
    records: list[dict] = []
    for position, piece in enumerate(_split_text(text), start=1):
        records.append(
            {
                "scenario_id": scenario_id,
                "document_id": document_id,
                "collection": collection,
                "source": source_file,
                "chunk_id": f"{scenario_id}:{chunk_id_prefix}:{position}",
                "content": piece,
                "created_at": timestamp,
            }
        )
    return records
|
||
|
||
|
||
def _filter_out_same_document_chunks(
|
||
chunks: list[dict],
|
||
scenario_id: str,
|
||
collection: str,
|
||
document_id: int | None,
|
||
) -> list[dict]:
|
||
"""重新入库同一 document_id 时,先删除旧 chunk,避免重复检索。"""
|
||
return [
|
||
chunk
|
||
for chunk in chunks
|
||
if not (
|
||
chunk.get("document_id") == document_id
|
||
and chunk.get("scenario_id") == scenario_id
|
||
and chunk.get("collection") == collection
|
||
)
|
||
]
|
||
|
||
|
||
def _ingest_chroma_document(
    document_id: int | None,
    scenario_id: str,
    source_file: str,
    text: str,
    collection: str,
) -> IngestResult:
    """Ingest one document into the real Chroma vector store.

    Chunk ids are prefixed with the document id when one is given, otherwise
    with the source file name. Any exception raised by ``upsert_chunks`` is
    logged with its traceback and reported as ``IngestResult(success=False)``
    instead of propagating to the caller.
    """
    # Compare against None explicitly so a legitimate id of 0 is not replaced
    # by the file name (the previous `document_id or source_file` did that).
    chunk_id_prefix = str(document_id) if document_id is not None else source_file
    chunks = _build_chunks(
        scenario_id=scenario_id,
        source_file=source_file,
        text=text,
        collection=collection,
        document_id=document_id,
        chunk_id_prefix=chunk_id_prefix,
    )
    # NOTE(review): unlike the JSON branch, old chunks of a re-ingested
    # document are not deleted here. If the new text yields fewer chunks, the
    # surplus old chunk ids presumably remain in Chroma. Confirm
    # chroma_store semantics.
    try:
        upsert_chunks(collection=collection, chunks=chunks)
    except Exception as exc:
        # Deliberate boundary: callers consume IngestResult rather than
        # exceptions, so keep the traceback in the log.
        logging.getLogger(__name__).exception(
            "Chroma upsert failed for %s (collection=%s)", source_file, collection
        )
        return IngestResult(success=False, error=str(exc) or exc.__class__.__name__)
    return IngestResult(success=True, chunks_count=len(chunks))
|