90 lines
2.9 KiB
Python
90 lines
2.9 KiB
Python
from pathlib import Path
|
|
from zipfile import BadZipFile, ZipFile
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from agent_core.rag.ingest import ingest_document
|
|
|
|
from .models import UploadedDocument
|
|
|
|
|
|
def create_uploaded_document(scenario_id: str, uploaded_file) -> UploadedDocument:
    """Persist a new ``UploadedDocument`` row for an uploaded file.

    The file type is taken from the last suffix of ``uploaded_file.name``,
    lower-cased and stored without the leading dot. The record is created
    with status ``UploadedDocument.STATUS_UPLOADED``.

    Args:
        scenario_id: Identifier stored on the record as ``scenario_id``.
        uploaded_file: Object exposing ``name`` and ``size``; it is also
            stored as the record's ``file`` value.

    Returns:
        The object returned by ``UploadedDocument.objects.create``.
    """
    original_name = uploaded_file.name
    suffix = Path(original_name).suffix
    fields = {
        "scenario_id": scenario_id,
        "original_name": original_name,
        "file": uploaded_file,
        # Stored without the dot, e.g. "pdf" rather than ".pdf".
        "file_type": suffix.lower().lstrip("."),
        "size": uploaded_file.size,
        "status": UploadedDocument.STATUS_UPLOADED,
    }
    return UploadedDocument.objects.create(**fields)
|
|
|
|
|
|
def extract_text(document: UploadedDocument) -> str:
    """Return the text content of ``document``'s stored file.

    The extractor is chosen from ``document.file_type`` (case-insensitive,
    with or without a leading dot): ``pdf`` and ``docx`` have dedicated
    extractors, and every other type is read as a plain text file.

    Args:
        document: Record whose ``file.path`` points at the file to read.

    Returns:
        The extracted text.
    """
    source_path = Path(document.file.path)
    kind = document.file_type.lower().lstrip(".")
    extractors = {
        "pdf": _extract_pdf_text,
        "docx": _extract_docx_text,
    }
    # Anything that is not a known binary format is treated as plain text.
    extractor = extractors.get(kind, _read_text_file)
    return extractor(source_path)
|
|
|
|
|
|
def _read_text_file(path: Path) -> str:
    """Read ``path`` as text, preferring UTF-8.

    The file is first decoded strictly as UTF-8. If that fails, it is read
    again with the platform default encoding, substituting U+FFFD for bytes
    that cannot be decoded.

    Args:
        path: Location of the file to read.

    Returns:
        The decoded file contents.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    try:
        return path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        # A strict retry with the default encoding raises the very same error
        # whenever that default is UTF-8, so the fallback must tolerate
        # undecodable bytes instead of propagating the failure.
        return path.read_text(errors="replace")
|
|
|
|
|
|
def _extract_pdf_text(path: Path) -> str:
    """Extract the text of a PDF, one page after another.

    Pages are joined with newlines; a page for which ``extract_text()``
    returns a falsy value contributes an empty string.

    Any exception raised while importing ``pypdf`` or while reading the file
    switches to ``_read_binary_text_fallback``.

    Args:
        path: Location of the PDF file.

    Returns:
        The extracted text, or the fallback's output on failure.
    """
    try:
        # Imported lazily so a missing pypdf is handled by the except below.
        from pypdf import PdfReader

        pdf = PdfReader(str(path))
        page_texts = [page.extract_text() or "" for page in pdf.pages]
        return "\n".join(page_texts)
    except Exception:
        return _read_binary_text_fallback(path)
|
|
|
|
|
|
def _extract_docx_text(path: Path) -> str:
    """Extract the text of a .docx file, one output line per paragraph.

    Reads ``word/document.xml`` from the archive and walks it in document
    order. Text runs (``w:t``) that belong to the same paragraph (``w:p``)
    are concatenated, so a sentence split into several runs stays on one
    line. ``w:tab`` becomes a tab and ``w:br`` / ``w:cr`` become a newline so
    that adjacent runs are not glued together. Paragraphs without any text
    are omitted.

    If the file is not a valid zip archive, lacks ``word/document.xml``, or
    the XML cannot be parsed, ``_read_binary_text_fallback`` is used instead.

    Args:
        path: Location of the .docx file.

    Returns:
        The extracted text, or the fallback's output on failure.
    """
    word_ns = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
    paragraph_tag = f"{{{word_ns}}}p"
    text_tag = f"{{{word_ns}}}t"
    # Inline elements that stand for whitespace rather than carrying text.
    whitespace_tags = {
        f"{{{word_ns}}}tab": "\t",
        f"{{{word_ns}}}br": "\n",
        f"{{{word_ns}}}cr": "\n",
    }

    try:
        with ZipFile(path) as archive:
            document_xml = archive.read("word/document.xml")
        # NOTE(review): if these files come from untrusted users, confirm the
        # XML parser in use is hardened against entity-expansion attacks.
        root = ET.fromstring(document_xml)
    except (BadZipFile, KeyError, ET.ParseError):
        return _read_binary_text_fallback(path)

    paragraphs = []
    for node in root.iter():
        if node.tag == paragraph_tag:
            # Each paragraph element opens a new output line.
            paragraphs.append([])
            continue
        if node.tag == text_tag:
            piece = node.text
        else:
            piece = whitespace_tags.get(node.tag)
        if piece:
            if not paragraphs:
                # Text that appears before any paragraph element.
                paragraphs.append([])
            paragraphs[-1].append(piece)

    lines = ("".join(parts) for parts in paragraphs)
    return "\n".join(line for line in lines if line)
|
|
|
|
|
|
def _read_binary_text_fallback(path: Path) -> str:
    """Return whatever readable text can be recovered from a file's raw bytes.

    The bytes are decoded as UTF-8 with undecodable bytes dropped, each run
    of control characters (other than tab, line feed and carriage return) is
    collapsed to a single space, and surrounding whitespace is stripped.

    Args:
        path: Location of the file to read.

    Returns:
        The cleaned-up text; empty if nothing readable remains.
    """
    decoded = path.read_bytes().decode("utf-8", errors="ignore")
    control_runs = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]+")
    return control_runs.sub(" ", decoded).strip()
|
|
|
|
|
|
def index_document(document: UploadedDocument) -> UploadedDocument:
    """Extract a document's text, ingest it, and record the outcome.

    The text from ``extract_text`` is passed to ``ingest_document`` with the
    document's ``scenario_id`` used as both scenario and collection. The
    document is marked ``STATUS_INDEXED`` with an empty error message when
    the result reports success, and ``STATUS_FAILED`` with the result's
    error otherwise. Any exception raised along the way also marks it
    ``STATUS_FAILED``, with the exception text as the error message.

    Only ``status``, ``error_message`` and ``updated_at`` are saved.

    Args:
        document: The record to index; it is modified in place.

    Returns:
        The same ``document`` instance after saving.
    """
    try:
        extracted = extract_text(document)
        outcome = ingest_document(
            document_id=document.id,
            scenario_id=document.scenario_id,
            source_file=document.original_name,
            text=extracted,
            collection=document.scenario_id,
        )
        succeeded = bool(outcome.success)
        document.status = (
            UploadedDocument.STATUS_INDEXED
            if succeeded
            else UploadedDocument.STATUS_FAILED
        )
        document.error_message = "" if succeeded else outcome.error
    except Exception as exc:
        # Extraction or ingestion failure is recorded on the document
        # rather than propagated to the caller.
        document.status = UploadedDocument.STATUS_FAILED
        document.error_message = str(exc)

    document.save(update_fields=["status", "error_message", "updated_at"])
    return document
|