"""Read stored attachments and build a bounded, structured preview of them."""

from __future__ import annotations

import csv
import logging
from dataclasses import asdict, dataclass, field
from pathlib import Path
from tempfile import TemporaryDirectory

from django.conf import settings

from review_agent.models import FileAttachment
from review_agent.file_summary.services.archive import ARCHIVE_EXTENSIONS, extract_archive

TEXT_EXTENSIONS = {"txt", "md", "csv", "json", "log"}
SUPPORTED_EXTENSIONS = TEXT_EXTENSIONS | {"pdf", "docx", "xlsx", "pptx"} | ARCHIVE_EXTENSIONS
# Upper bound on characters kept for plain-text sections and the final preview.
MAX_PREVIEW_CHARS = 3000
# Upper bound on rows kept per table / sheet section.
MAX_ROWS_PER_SHEET = 20

logger = logging.getLogger("review_agent.file_summary.attachment_reader")


@dataclass(frozen=True)
class AttachmentReadResult:
    """Immutable outcome of one attachment read.

    ``status`` is ``"success"`` when parsing worked, and ``"failed"`` or
    ``"unsupported"`` otherwise; in the latter cases ``error_message`` is set
    and ``preview_text`` / ``sections`` keep their empty defaults.
    """

    status: str
    filename: str
    file_type: str
    file_size: int
    preview_text: str = ""
    sections: list[dict[str, object]] = field(default_factory=list)
    error_message: str = ""

    def to_dict(self) -> dict[str, object]:
        """Return the result as a plain dictionary (via ``dataclasses.asdict``)."""
        return asdict(self)


def read_attachment_details(attachment: FileAttachment) -> AttachmentReadResult:
    """Parse an attachment into sections and a truncated text preview.

    The file type is taken from the extension of ``attachment.original_name``.
    Parsing errors are never propagated: they are logged with a traceback and
    reported through a ``"failed"`` result whose message is ``str(exc)``.

    Args:
        attachment: The stored attachment record to read.

    Returns:
        A ``"success"`` result with sections and preview, a ``"failed"``
        result when the file is missing or parsing raised, or an
        ``"unsupported"`` result when the extension is not handled.
    """
    file_path = _attachment_absolute_path(attachment)
    file_type = Path(attachment.original_name).suffix.lower().lstrip(".")
    logger.info(
        "Attachment read started",
        extra={
            "attachment_id": attachment.pk,
            "conversation_id": attachment.conversation_id,
            "original_name": attachment.original_name,
            "file_type": file_type,
            "storage_path": attachment.storage_path,
            "resolved_path": str(file_path),
        },
    )

    if not file_path.exists():
        logger.warning(
            "Attachment read missing file",
            extra={"attachment_id": attachment.pk, "resolved_path": str(file_path)},
        )
        return _failed(attachment, file_type, "附件文件不存在。")

    if file_type not in SUPPORTED_EXTENSIONS:
        logger.warning(
            "Attachment read unsupported type",
            extra={"attachment_id": attachment.pk, "file_type": file_type},
        )
        return _failed(
            attachment,
            file_type,
            f"暂不支持解析 .{file_type or 'unknown'} 文件。",
            "unsupported",
        )

    try:
        if file_type in ARCHIVE_EXTENSIONS:
            sections = _read_archive(file_path)
        else:
            # Single dispatch point shared with archive members, so a new
            # format only has to be registered in _read_supported_file.
            sections = _read_supported_file(file_path, file_type)
    except Exception as exc:
        # Deliberate catch-all boundary: parser libraries raise many
        # unrelated exception types and callers expect a result object.
        logger.exception(
            "Attachment read failed",
            extra={"attachment_id": attachment.pk, "file_type": file_type, "error": str(exc)},
        )
        return _failed(attachment, file_type, str(exc))

    preview = _build_preview(sections)
    logger.info(
        "Attachment read finished",
        extra={
            "attachment_id": attachment.pk,
            "section_count": len(sections),
            # Length before truncation to MAX_PREVIEW_CHARS.
            "preview_length": len(preview),
        },
    )
    return AttachmentReadResult(
        status="success",
        filename=attachment.original_name,
        file_type=file_type,
        file_size=attachment.file_size,
        preview_text=preview[:MAX_PREVIEW_CHARS],
        sections=sections,
    )


def _attachment_absolute_path(attachment: FileAttachment) -> Path:
    """Resolve ``storage_path``; relative paths are joined onto ``MEDIA_ROOT``."""
    path = Path(attachment.storage_path)
    if path.is_absolute():
        return path
    return Path(settings.MEDIA_ROOT) / path


def _failed(
    attachment: FileAttachment,
    file_type: str,
    message: str,
    status: str = "failed",
) -> AttachmentReadResult:
    """Build a non-success result carrying ``message`` as the error text."""
    return AttachmentReadResult(
        status=status,
        filename=attachment.original_name,
        file_type=file_type,
        file_size=attachment.file_size,
        error_message=message,
    )


def _read_text(path: Path) -> list[dict[str, object]]:
    """Return one text section holding the first ``MAX_PREVIEW_CHARS`` characters.

    The file is decoded as UTF-8 with undecodable bytes replaced.
    """
    # Read only what is kept instead of decoding the whole file first.
    with path.open("r", encoding="utf-8", errors="replace") as handle:
        text = handle.read(MAX_PREVIEW_CHARS)
    return [{"type": "text", "name": path.name, "text": text}]


def _read_csv(path: Path) -> list[dict[str, object]]:
    """Return one table section with the total row count and the leading rows."""
    preview_rows: list[list[str]] = []
    row_count = 0
    with path.open("r", encoding="utf-8-sig", errors="replace", newline="") as handle:
        # Stream the file: every row is counted, only the first
        # MAX_ROWS_PER_SHEET are retained in memory.
        for row in csv.reader(handle):
            row_count += 1
            if row_count <= MAX_ROWS_PER_SHEET:
                preview_rows.append([str(cell) for cell in row])
    return [
        {
            "type": "table",
            "name": path.name,
            "row_count": row_count,
            "rows": preview_rows,
        }
    ]


def _read_pdf(path: Path) -> list[dict[str, object]]:
    """Return one page section per PDF page with its extracted text."""
    from pypdf import PdfReader

    reader = PdfReader(str(path))
    return [
        # extract_text() may yield nothing for a page; normalise to "".
        {"type": "page", "name": f"第 {index} 页", "text": page.extract_text() or ""}
        for index, page in enumerate(reader.pages, start=1)
    ]


def _read_docx(path: Path) -> list[dict[str, object]]:
    """Return a body-text section followed by one table section per table."""
    from docx import Document

    document = Document(str(path))
    stripped = (item.text.strip() for item in document.paragraphs)
    paragraphs = [text for text in stripped if text]
    sections: list[dict[str, object]] = [
        {"type": "text", "name": "正文", "text": "\n".join(paragraphs)}
    ]
    for index, table in enumerate(document.tables, start=1):
        rows = [[cell.text.strip() for cell in row.cells] for row in table.rows]
        sections.append(
            {
                "type": "table",
                "name": f"表格 {index}",
                "row_count": len(rows),
                "rows": rows[:MAX_ROWS_PER_SHEET],
            }
        )
    return sections


def _read_xlsx(path: Path) -> list[dict[str, object]]:
    """Return one sheet section per worksheet with its leading rows.

    ``row_count`` / ``column_count`` are the sheet's reported ``max_row`` /
    ``max_column``; ``rows`` holds at most ``MAX_ROWS_PER_SHEET`` rows with
    empty cells rendered as ``""``.
    """
    from openpyxl import load_workbook

    workbook = load_workbook(str(path), read_only=True, data_only=True)
    try:
        sections: list[dict[str, object]] = []
        for sheet in workbook.worksheets:
            rows = [
                ["" if cell is None else str(cell) for cell in row]
                for row in sheet.iter_rows(max_row=MAX_ROWS_PER_SHEET, values_only=True)
            ]
            sections.append(
                {
                    "type": "sheet",
                    "name": sheet.title,
                    "row_count": sheet.max_row,
                    "column_count": sheet.max_column,
                    "rows": rows,
                }
            )
        return sections
    finally:
        # Close even when a sheet fails to parse, so the workbook's file
        # handle is not leaked on the error path.
        workbook.close()


def _read_pptx(path: Path) -> list[dict[str, object]]:
    """Return one slide section per slide, joining the text of its shapes."""
    from pptx import Presentation

    presentation = Presentation(str(path))
    sections: list[dict[str, object]] = []
    for index, slide in enumerate(presentation.slides, start=1):
        texts = [
            shape.text.strip()
            for shape in slide.shapes
            # Not every shape exposes text; skip those and blank ones.
            if hasattr(shape, "text") and shape.text.strip()
        ]
        sections.append({"type": "slide", "name": f"幻灯片 {index}", "text": "\n".join(texts)})
    return sections


def _read_archive(path: Path) -> list[dict[str, object]]:
    """Extract an archive to a temporary directory and read each member.

    Members with an unsupported extension, and nested archives, produce a
    placeholder section instead of being parsed. Section names of parsed
    members are prefixed with the member's file name.
    """
    sections: list[dict[str, object]] = []
    # All reading happens inside the context manager: the extracted files
    # are deleted as soon as it exits.
    with TemporaryDirectory(prefix="attachment-reader-") as temp_dir:
        extracted = extract_archive(path, Path(temp_dir))
        if not extracted:
            return [{"type": "archive", "name": path.name, "text": "压缩包未解出任何可读取文件。"}]

        for item in extracted:
            file_type = item.suffix.lower().lstrip(".")
            if file_type not in SUPPORTED_EXTENSIONS or file_type in ARCHIVE_EXTENSIONS:
                sections.append(
                    {
                        "type": "file",
                        "name": item.name,
                        "text": f"暂不支持预览压缩包内的 .{file_type or 'unknown'} 文件。",
                    }
                )
                continue

            for section in _read_supported_file(item, file_type):
                # Copy before renaming so the reader's own dict is untouched.
                section = dict(section)
                section["name"] = f"{item.name} / {section.get('name', item.name)}"
                sections.append(section)
    return sections


def _read_supported_file(path: Path, file_type: str) -> list[dict[str, object]]:
    """Dispatch a non-archive file to its reader; unknown types read as text."""
    # Built per call so the module-level reader names are resolved at call time.
    readers = {
        "pdf": _read_pdf,
        "docx": _read_docx,
        "xlsx": _read_xlsx,
        "pptx": _read_pptx,
        "csv": _read_csv,
    }
    return readers.get(file_type, _read_text)(path)


def _build_preview(sections: list[dict[str, object]]) -> str:
    """Flatten sections into newline-joined preview text.

    Each section contributes its non-empty ``text`` and up to five of its
    ``rows`` rendered as ``cell | cell | ...``. The result is stripped but
    not truncated; the caller applies the length limit.
    """
    parts: list[str] = []
    for section in sections:
        if "text" in section and section["text"]:
            parts.append(str(section["text"]))
        rows = section.get("rows")
        if rows:
            parts.extend(" | ".join(str(cell) for cell in row) for row in rows[:5])
    return "\n".join(part for part in parts if part).strip()