import importlib.util
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

from django.conf import settings

from .chroma_store import upsert_chunks


@dataclass
class IngestResult:
    """Uniform return structure for RAG ingestion, consumed by the Documents module."""

    # True when the document was chunked and persisted.
    success: bool
    # Number of chunks written by this ingestion (0 on failure).
    chunks_count: int = 0
    # Failure description; empty string on success.
    error: str = ""


def ingest_document(
    scenario_id: str,
    source_file: str,
    text: str,
    collection: str,
    document_id: int | None = None,
    store_path: str | Path | None = None,
) -> IngestResult:
    """Split one document's text into chunks and write them to the knowledge base.

    Storage strategy:
    - If `store_path` is passed explicitly, the call is treated as test or
      fallback mode and the chunks go to a local JSON store.
    - If it is not passed and `chromadb` is importable, the chunks go to the
      real Chroma persistence layer.
    - Otherwise the JSON store at the default path is used.

    Args:
        scenario_id: Scenario the document belongs to.
        source_file: Name of the source file; stored as each chunk's `source`.
        text: Raw document text.
        collection: Target collection name.
        document_id: Optional document id, used to replace chunks written by
            an earlier ingestion of the same document.
        store_path: Optional path of the JSON store file.

    Returns:
        An `IngestResult`. Blank text and storage failures are reported as
        `success=False` with an error message instead of raising.
    """
    if not text.strip():
        return IngestResult(success=False, error="文档内容为空")

    if _should_use_chroma(store_path):
        return _ingest_chroma_document(
            document_id=document_id,
            scenario_id=scenario_id,
            source_file=source_file,
            text=text,
            collection=collection,
        )

    resolved_store_path = Path(store_path) if store_path else _default_store_path()
    # NOTE: the JSON store prefixes chunk ids with the source file, while the
    # Chroma branch prefixes them with the document id when one is available.
    # Kept as-is so ids already persisted in existing stores stay stable.
    chunks = _build_chunks(
        scenario_id=scenario_id,
        source_file=source_file,
        text=text,
        collection=collection,
        document_id=document_id,
        chunk_id_prefix=source_file,
    )

    # Report storage failures through IngestResult, the same way the Chroma
    # branch does, so callers only have one error contract to handle.
    # json.JSONDecodeError is a subclass of ValueError.
    try:
        persisted_chunks = _filter_out_same_document_chunks(
            _load_store(resolved_store_path),
            scenario_id=scenario_id,
            collection=collection,
            document_id=document_id,
            source_file=source_file,
        )
        _save_store(resolved_store_path, [*persisted_chunks, *chunks])
    except (OSError, ValueError) as exc:
        return IngestResult(success=False, error=str(exc))

    return IngestResult(success=True, chunks_count=len(chunks))


def _should_use_chroma(store_path: str | Path | None) -> bool:
    """Return True only when no store path was given and `chromadb` is installed."""
    return store_path is None and importlib.util.find_spec("chromadb") is not None


def _default_store_path() -> Path:
    """Return the JSON store location under `settings.CHROMA_PATH`."""
    return Path(settings.CHROMA_PATH) / "rag_store.json"


def _load_store(store_path: Path) -> list[dict]:
    """Load all persisted chunks from the JSON store.

    Returns:
        The stored chunk list, or an empty list when the file does not exist.

    Raises:
        OSError: The file exists but cannot be read.
        ValueError: The file is not valid JSON (`json.JSONDecodeError`) or its
            top-level value is not a list.
    """
    if not store_path.exists():
        return []

    with store_path.open("r", encoding="utf-8") as file:
        data = json.load(file)

    # Fail here with a clear message rather than with an AttributeError later,
    # when the filter calls .get() on whatever the file happened to contain.
    if not isinstance(data, list):
        raise ValueError(
            f"RAG store {store_path} must contain a JSON list, "
            f"got {type(data).__name__}"
        )
    return data


def _save_store(store_path: Path, chunks: list[dict]) -> None:
    """Write `chunks` to the JSON store, creating parent directories as needed."""
    store_path.parent.mkdir(parents=True, exist_ok=True)
    with store_path.open("w", encoding="utf-8") as file:
        json.dump(chunks, file, ensure_ascii=False, indent=2)


def _split_text(text: str, chunk_size: int = 800, overlap: int = 120) -> list[str]:
    """Split text with a fixed-size sliding window plus overlap.

    The strategy is simple but stable and easy to explain:
    - `chunk_size` caps the length of every chunk.
    - `overlap` makes neighbouring chunks share context, which reduces
      information loss at chunk boundaries.

    Every run of whitespace is collapsed to a single space before splitting.

    Args:
        text: Raw text to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by consecutive chunks; must not
            be negative.

    Returns:
        The chunks in document order; an empty list for blank text.

    Raises:
        ValueError: If `chunk_size` is not positive or `overlap` is negative.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if overlap < 0:
        # A negative overlap would silently skip text between chunks.
        raise ValueError("overlap must not be negative")

    normalized = re.sub(r"\s+", " ", text).strip()
    if not normalized:
        return []

    chunks = []
    start = 0
    while start < len(normalized):
        end = start + chunk_size
        chunks.append(normalized[start:end])
        if end >= len(normalized):
            break
        # Always advance by at least one character so the loop terminates
        # even when overlap >= chunk_size.
        start = max(end - overlap, start + 1)
    return chunks


def _build_chunks(
    scenario_id: str,
    source_file: str,
    text: str,
    collection: str,
    document_id: int | None,
    chunk_id_prefix: str,
) -> list[dict]:
    """Split the raw text and wrap every piece in the uniform chunk structure.

    Chunk ids have the form `{scenario_id}:{chunk_id_prefix}:{index}` with a
    1-based index. All chunks of one call share the same UTC ISO-8601
    `created_at` timestamp.
    """
    created_at = datetime.now(timezone.utc).isoformat()
    return [
        {
            "scenario_id": scenario_id,
            "document_id": document_id,
            "collection": collection,
            "source": source_file,
            "chunk_id": f"{scenario_id}:{chunk_id_prefix}:{index}",
            "content": chunk_text,
            "created_at": created_at,
        }
        for index, chunk_text in enumerate(_split_text(text), start=1)
    ]


def _filter_out_same_document_chunks(
    chunks: list[dict],
    scenario_id: str,
    collection: str,
    document_id: int | None,
    source_file: str | None = None,
) -> list[dict]:
    """Drop the chunks of an earlier ingestion of the same document.

    A stored chunk belongs to the same document when its `document_id`,
    `scenario_id` and `collection` all match. When `document_id` is None the
    id cannot tell documents apart, so if `source_file` is given the chunk's
    `source` must match as well; otherwise ingesting one id-less file would
    delete the chunks of every other id-less file in the collection.

    Args:
        chunks: Chunks currently persisted in the store.
        scenario_id: Scenario of the document being ingested.
        collection: Collection of the document being ingested.
        document_id: Id of the document being ingested, if any.
        source_file: Source file of the document being ingested. Only
            consulted when `document_id` is None; omitting it keeps the
            previous id-only matching.

    Returns:
        A new list without the superseded chunks; `chunks` is not modified.
    """

    def _is_superseded(chunk: dict) -> bool:
        if (
            chunk.get("document_id") != document_id
            or chunk.get("scenario_id") != scenario_id
            or chunk.get("collection") != collection
        ):
            return False
        if document_id is None and source_file is not None:
            return chunk.get("source") == source_file
        return True

    return [chunk for chunk in chunks if not _is_superseded(chunk)]


def _ingest_chroma_document(
    document_id: int | None,
    scenario_id: str,
    source_file: str,
    text: str,
    collection: str,
) -> IngestResult:
    """Ingestion branch for real Chroma mode.

    Chunk ids are prefixed with the document id when one is given, otherwise
    with the source file name. Any exception raised by `upsert_chunks` is
    converted into a failed `IngestResult`.
    """
    # Explicit None check: a plain `document_id or source_file` would treat a
    # document id of 0 as missing.
    chunk_id_prefix = str(document_id if document_id is not None else source_file)
    chunks = _build_chunks(
        scenario_id=scenario_id,
        source_file=source_file,
        text=text,
        collection=collection,
        document_id=document_id,
        chunk_id_prefix=chunk_id_prefix,
    )
    # NOTE(review): only the new chunks are handed to upsert_chunks. Whether
    # surplus chunks from a longer previous version of the document are
    # removed depends on upsert_chunks; confirm against chroma_store.
    try:
        upsert_chunks(collection=collection, chunks=chunks)
    except Exception as exc:
        # Boundary with the vector store: surface any failure as a result
        # object so the caller never sees a storage-specific exception.
        return IngestResult(success=False, error=str(exc))
    return IngestResult(success=True, chunks_count=len(chunks))