"""Document ingestion for the RAG store.

Documents are split into overlapping text chunks and written either to
Chroma (when the ``chromadb`` package is importable and no explicit store
path is given) or to a JSON file store.
"""

import importlib.util
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

from django.conf import settings

from .chroma_store import upsert_chunks


@dataclass
class IngestResult:
    """Outcome of a single ingestion attempt."""

    # True when the chunks were stored.
    success: bool
    # Number of chunks written for the document (0 on failure).
    chunks_count: int = 0
    # Failure description; empty on success.
    error: str = ""


def _default_store_path() -> Path:
    """Return the JSON store location under ``settings.CHROMA_PATH``."""
    return Path(settings.CHROMA_PATH) / "rag_store.json"


def _load_store(store_path: Path) -> list[dict]:
    """Load every stored chunk from the JSON store.

    Args:
        store_path: Location of the JSON store file.

    Returns:
        The stored chunks, or an empty list when the file does not exist.

    Raises:
        OSError: The file exists but cannot be read.
        ValueError: The file is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass) or is not a list of JSON objects.
    """
    if not store_path.exists():
        return []
    with store_path.open("r", encoding="utf-8") as file:
        data = json.load(file)
    # Fail here with a clear message instead of letting a malformed store
    # surface later as an unrelated AttributeError on ``chunk.get``.
    if not isinstance(data, list) or not all(isinstance(item, dict) for item in data):
        raise ValueError(f"RAG store {store_path} must contain a JSON list of objects")
    return data


def _save_store(store_path: Path, chunks: list[dict]) -> None:
    """Write ``chunks`` to the JSON store, replacing its previous content.

    The data is first written to a sibling temporary file and then moved over
    the target, so a failure while serializing or writing never leaves a
    truncated store behind.

    Args:
        store_path: Location of the JSON store file; parent directories are
            created when missing.
        chunks: Complete list of chunks to persist.

    Raises:
        OSError: The directory or file cannot be written.
    """
    store_path.parent.mkdir(parents=True, exist_ok=True)
    # NOTE: the temporary name is fixed, so concurrent writers are not
    # supported (the surrounding read-modify-write is not either).
    tmp_path = store_path.with_name(f"{store_path.name}.tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as file:
            json.dump(chunks, file, ensure_ascii=False, indent=2)
        tmp_path.replace(store_path)
    except BaseException:
        # Do not leave a half-written temporary file next to the store.
        tmp_path.unlink(missing_ok=True)
        raise


def _split_text(text: str, chunk_size: int = 800, overlap: int = 120) -> list[str]:
    """Split ``text`` into overlapping chunks of at most ``chunk_size`` characters.

    Whitespace runs are collapsed to a single space and the text is stripped
    before splitting.

    Args:
        text: Raw document text.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of trailing characters of a chunk repeated at the
            start of the next one; must not be negative.

    Returns:
        The chunks in document order; an empty list for blank text.

    Raises:
        ValueError: ``chunk_size`` is not positive or ``overlap`` is negative.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if overlap < 0:
        # A negative overlap would skip text between consecutive chunks.
        raise ValueError("overlap must not be negative")

    normalized = re.sub(r"\s+", " ", text).strip()
    if not normalized:
        return []

    chunks = []
    start = 0
    while start < len(normalized):
        end = start + chunk_size
        chunks.append(normalized[start:end])
        if end >= len(normalized):
            break
        # Always advance by at least one character so the loop terminates
        # even when overlap >= chunk_size.
        start = max(end - overlap, start + 1)
    return chunks


def _build_chunks(
    scenario_id: str,
    document_id: int | None,
    source_file: str,
    text: str,
    collection: str,
    id_key: object,
) -> list[dict]:
    """Split ``text`` and wrap each piece in a chunk record.

    ``id_key`` is the middle component of ``chunk_id``
    (``"<scenario_id>:<id_key>:<index>"``); indexes start at 1.
    """
    created_at = datetime.now(timezone.utc).isoformat()
    return [
        {
            "scenario_id": scenario_id,
            "document_id": document_id,
            "collection": collection,
            "source": source_file,
            "chunk_id": f"{scenario_id}:{id_key}:{index}",
            "content": chunk_text,
            "created_at": created_at,
        }
        for index, chunk_text in enumerate(_split_text(text), start=1)
    ]


def _is_same_document(
    chunk: dict,
    scenario_id: str,
    source_file: str,
    collection: str,
    document_id: int | None,
) -> bool:
    """Tell whether a stored chunk belongs to the document being re-ingested."""
    if chunk.get("scenario_id") != scenario_id or chunk.get("collection") != collection:
        return False
    if chunk.get("document_id") != document_id:
        return False
    if document_id is None:
        # Without an id, every id-less document in the scenario/collection
        # would match; fall back to the source name so that ingesting one
        # file does not wipe the chunks of another.
        return chunk.get("source") == source_file
    return True


def ingest_document(
    scenario_id: str,
    source_file: str,
    text: str,
    collection: str,
    document_id: int | None = None,
    store_path: str | Path | None = None,
) -> IngestResult:
    """Chunk a document and store it, replacing any previously stored version.

    When ``store_path`` is ``None`` and the ``chromadb`` package is importable,
    the chunks go to Chroma; otherwise they are written to the JSON file store
    at ``store_path`` (or the default store path).

    Args:
        scenario_id: Scenario the document belongs to.
        source_file: Name of the source file; recorded as ``source``.
        text: Full document text.
        collection: Target collection name.
        document_id: Optional document identifier.
        store_path: Explicit JSON store location; forces the file store.

    Returns:
        An ``IngestResult``. Blank text and storage failures are reported
        through ``success=False`` and ``error`` rather than raised.
    """
    if not text or not text.strip():
        return IngestResult(success=False, error="文档内容为空")

    if store_path is None and importlib.util.find_spec("chromadb") is not None:
        return _ingest_chroma_document(document_id, scenario_id, source_file, text, collection)

    resolved_store_path = Path(store_path) if store_path else _default_store_path()
    # NOTE: the file store keys chunk ids on the source name, whereas the
    # Chroma path uses ``document_id or source_file``; kept as-is so ids
    # already persisted stay stable.
    new_chunks = _build_chunks(
        scenario_id, document_id, source_file, text, collection, id_key=source_file
    )
    try:
        kept_chunks = [
            chunk
            for chunk in _load_store(resolved_store_path)
            if not _is_same_document(chunk, scenario_id, source_file, collection, document_id)
        ]
        _save_store(resolved_store_path, [*kept_chunks, *new_chunks])
    except (OSError, ValueError) as exc:
        # Report storage problems the same way the Chroma path does.
        return IngestResult(success=False, error=str(exc))
    return IngestResult(success=True, chunks_count=len(new_chunks))


def _ingest_chroma_document(
    document_id: int | None,
    scenario_id: str,
    source_file: str,
    text: str,
    collection: str,
) -> IngestResult:
    """Chunk a document and hand the chunks to ``upsert_chunks``.

    Any exception raised by ``upsert_chunks`` is converted into a failed
    ``IngestResult`` carrying the exception message.
    """
    # NOTE(review): presumably upsert_chunks overwrites by chunk_id only; if
    # so, re-ingesting a shorter version may leave stale higher-index chunks
    # behind -- TODO confirm against chroma_store.
    chunks = _build_chunks(
        scenario_id,
        document_id,
        source_file,
        text,
        collection,
        id_key=document_id or source_file,
    )
    try:
        upsert_chunks(collection=collection, chunks=chunks)
    except Exception as exc:
        return IngestResult(success=False, error=str(exc))
    return IngestResult(success=True, chunks_count=len(chunks))