# Source file: DEMO-AGENT/review_agent/file_summary/services/attachment_reader.py
# (Python, 220 lines, 6.9 KiB as listed by the repository file viewer)

from __future__ import annotations
import csv
import logging
from dataclasses import asdict, dataclass, field
from pathlib import Path
from django.conf import settings
from review_agent.models import FileAttachment
# Extensions handled as text-like files. "csv" is parsed as a table by
# _read_csv; the others fall through to the raw-text reader.
TEXT_EXTENSIONS = {"txt", "md", "csv", "json", "log"}
# Every extension read_attachment_details will try to parse; anything else is
# reported with status "unsupported".
SUPPORTED_EXTENSIONS = TEXT_EXTENSIONS | {"pdf", "docx", "xlsx", "pptx"}
# Upper bound, in characters, applied to preview_text and to raw-text sections.
MAX_PREVIEW_CHARS = 3000
# Maximum number of rows kept per CSV file, DOCX table, or XLSX sheet.
MAX_ROWS_PER_SHEET = 20
# Module logger; the name is a literal string rather than __name__.
logger = logging.getLogger("review_agent.file_summary.attachment_reader")
@dataclass(frozen=True)
class AttachmentReadResult:
    """Immutable outcome of one attempt to read an attachment.

    Instances are frozen, so fields cannot be reassigned after construction.
    Only the first four fields are required; the rest default to "empty".
    """

    # Outcome label, e.g. "success", "failed" or "unsupported".
    status: str
    # Name of the attachment as reported to the caller.
    filename: str
    # Lower-case extension without the leading dot.
    file_type: str
    # Size value carried over from the attachment record.
    file_size: int
    # Flattened text preview of the parsed content.
    preview_text: str = ""
    # Parsed sections (pages, sheets, tables, ...); a fresh list per instance.
    sections: list[dict[str, object]] = field(default_factory=list)
    # Human-readable failure reason; empty when nothing went wrong.
    error_message: str = ""

    def to_dict(self) -> dict[str, object]:
        """Return the result as a plain dictionary.

        ``dataclasses.asdict`` copies nested lists and dicts recursively, so
        the returned structure does not alias ``self.sections``.
        """
        as_mapping = asdict(self)
        return as_mapping
def read_attachment_details(attachment: FileAttachment) -> AttachmentReadResult:
    """Read an attachment from disk and return a structured preview of it.

    The file type is derived from the extension of ``attachment.original_name``
    (lower-cased, leading dot removed) and selects the reader to use.

    Args:
        attachment: Record providing ``pk``, ``conversation_id``,
            ``original_name``, ``storage_path`` and ``file_size``.

    Returns:
        An ``AttachmentReadResult`` whose status is:
        * "success" - sections were parsed; ``preview_text`` holds at most
          ``MAX_PREVIEW_CHARS`` characters.
        * "unsupported" - the extension is not in ``SUPPORTED_EXTENSIONS``.
        * "failed" - the file is missing (or is not a regular file), or the
          reader raised; ``error_message`` carries the reason.
    """
    file_path = _attachment_absolute_path(attachment)
    file_type = Path(attachment.original_name).suffix.lower().lstrip(".")
    logger.info(
        "Attachment read started",
        extra={
            "attachment_id": attachment.pk,
            "conversation_id": attachment.conversation_id,
            "original_name": attachment.original_name,
            "file_type": file_type,
            "storage_path": attachment.storage_path,
            "resolved_path": str(file_path),
        },
    )

    # is_file() instead of exists(): a directory (for instance when
    # storage_path is empty and the path resolves to the media root itself)
    # is not a readable attachment and must be reported as missing rather than
    # failing later inside a reader with a raw OS error.
    if not file_path.is_file():
        logger.warning(
            "Attachment read missing file",
            extra={"attachment_id": attachment.pk, "resolved_path": str(file_path)},
        )
        return _failed(attachment, file_type, "附件文件不存在。")

    if file_type not in SUPPORTED_EXTENSIONS:
        logger.warning(
            "Attachment read unsupported type",
            extra={"attachment_id": attachment.pk, "file_type": file_type},
        )
        return _failed(attachment, file_type, f"暂不支持解析 .{file_type or 'unknown'} 文件。", "unsupported")

    # Extension -> reader. Supported extensions without an entry here
    # (txt, md, json, log) are read as raw text.
    readers = {
        "pdf": _read_pdf,
        "docx": _read_docx,
        "xlsx": _read_xlsx,
        "pptx": _read_pptx,
        "csv": _read_csv,
    }
    reader = readers.get(file_type, _read_text)

    try:
        sections = reader(file_path)
    except Exception as exc:
        # Boundary handler: any parser or import failure becomes a "failed"
        # result instead of propagating; the traceback is kept in the log.
        logger.exception(
            "Attachment read failed",
            extra={"attachment_id": attachment.pk, "file_type": file_type, "error": str(exc)},
        )
        return _failed(attachment, file_type, str(exc))

    preview = _build_preview(sections)
    logger.info(
        "Attachment read finished",
        extra={
            "attachment_id": attachment.pk,
            "section_count": len(sections),
            # Length before truncation to MAX_PREVIEW_CHARS.
            "preview_length": len(preview),
        },
    )
    return AttachmentReadResult(
        status="success",
        filename=attachment.original_name,
        file_type=file_type,
        file_size=attachment.file_size,
        preview_text=preview[:MAX_PREVIEW_CHARS],
        sections=sections,
    )
def _attachment_absolute_path(attachment: FileAttachment) -> Path:
    """Resolve ``attachment.storage_path`` to the path used for reading.

    An absolute storage path is returned as is; a relative one is joined onto
    ``settings.MEDIA_ROOT``.
    """
    stored = Path(attachment.storage_path)
    # MEDIA_ROOT is only consulted for relative paths.
    return stored if stored.is_absolute() else Path(settings.MEDIA_ROOT) / stored
def _failed(
    attachment: FileAttachment,
    file_type: str,
    message: str,
    status: str = "failed",
) -> AttachmentReadResult:
    """Build a non-success result for ``attachment``.

    Args:
        attachment: Source of ``original_name`` and ``file_size``.
        file_type: Extension that was detected for the attachment.
        message: Text stored in ``error_message``.
        status: Outcome label; defaults to "failed".

    Returns:
        An ``AttachmentReadResult`` with no preview text and no sections.
    """
    outcome = AttachmentReadResult(
        status=status,
        filename=attachment.original_name,
        file_type=file_type,
        file_size=attachment.file_size,
        error_message=message,
    )
    return outcome
def _read_text(path: Path, max_chars: int | None = None) -> list[dict[str, object]]:
    """Read the beginning of a UTF-8 text file as a single "text" section.

    Only the required prefix is read from disk, so a very large file is never
    loaded into memory in full. Undecodable bytes are replaced rather than
    raising, and newlines are normalised by text-mode reading.

    Args:
        path: File to read.
        max_chars: Non-negative maximum number of characters to keep;
            defaults to ``MAX_PREVIEW_CHARS`` when omitted.

    Returns:
        A one-element list: ``{"type": "text", "name": <file name>, "text": ...}``.
    """
    limit = MAX_PREVIEW_CHARS if max_chars is None else max_chars
    with path.open("r", encoding="utf-8", errors="replace") as handle:
        # read(n) returns at most n decoded characters.
        text = handle.read(limit)
    return [{"type": "text", "name": path.name, "text": text}]
def _read_csv(path: Path, max_rows: int | None = None) -> list[dict[str, object]]:
    """Parse a CSV file into a single "table" section.

    Rows are streamed: only the first ``max_rows`` rows are kept while the
    rest are merely counted, so memory use does not grow with file size.
    The file is decoded as UTF-8 with an optional BOM; undecodable bytes are
    replaced.

    Args:
        path: CSV file to read.
        max_rows: Non-negative number of leading rows to keep; defaults to
            ``MAX_ROWS_PER_SHEET`` when omitted.

    Returns:
        A one-element list with keys "type", "name", "row_count" (total rows
        in the file) and "rows" (the kept rows, each a list of strings).
    """
    limit = MAX_ROWS_PER_SHEET if max_rows is None else max_rows
    kept_rows: list[list[str]] = []
    total_rows = 0
    # newline="" lets the csv module handle embedded line breaks itself.
    with path.open("r", encoding="utf-8-sig", errors="replace", newline="") as handle:
        for row in csv.reader(handle):
            # csv.reader already yields lists of str, so no conversion is needed.
            if total_rows < limit:
                kept_rows.append(row)
            total_rows += 1
    return [
        {
            "type": "table",
            "name": path.name,
            "row_count": total_rows,
            "rows": kept_rows,
        }
    ]
def _read_pdf(path: Path) -> list[dict[str, object]]:
    """Extract the text of every PDF page as one "page" section each.

    Sections are named by their 1-based page number; a page for which no text
    is extracted yields an empty string.
    """
    # Imported lazily so the dependency is only needed when a PDF is read.
    from pypdf import PdfReader

    document = PdfReader(str(path))
    return [
        {"type": "page", "name": str(page_number), "text": page.extract_text() or ""}
        for page_number, page in enumerate(document.pages, start=1)
    ]
def _read_docx(path: Path) -> list[dict[str, object]]:
    """Read a DOCX file into a body-text section followed by table sections.

    The first section always exists and joins all non-blank paragraphs with
    newlines. Each table then adds a "table" section holding its total row
    count and at most ``MAX_ROWS_PER_SHEET`` rows of stripped cell text.
    """
    # Imported lazily so the dependency is only needed when a DOCX is read.
    from docx import Document

    document = Document(str(path))

    body_lines = []
    for paragraph in document.paragraphs:
        stripped = paragraph.text.strip()
        if stripped:
            body_lines.append(stripped)

    sections: list[dict[str, object]] = [
        {"type": "text", "name": "正文", "text": "\n".join(body_lines)}
    ]

    for table_number, table in enumerate(document.tables, start=1):
        grid = []
        for table_row in table.rows:
            grid.append([cell.text.strip() for cell in table_row.cells])
        table_section = {
            "type": "table",
            "name": f"表格 {table_number}",
            "row_count": len(grid),
            "rows": grid[:MAX_ROWS_PER_SHEET],
        }
        sections.append(table_section)

    return sections
def _read_xlsx(path: Path) -> list[dict[str, object]]:
    """Read the leading rows of every worksheet in an XLSX workbook.

    The workbook is opened with ``read_only=True`` and ``data_only=True``.
    Each worksheet becomes a "sheet" section with its title, the row and
    column counts reported by the sheet, and at most ``MAX_ROWS_PER_SHEET``
    rows whose cells are stringified (``None`` becomes an empty string).

    The workbook is always closed, even if reading a sheet raises, so the
    underlying file handle is not leaked.
    """
    # Imported lazily so the dependency is only needed when an XLSX is read.
    from openpyxl import load_workbook

    workbook = load_workbook(str(path), read_only=True, data_only=True)
    try:
        sections = []
        for sheet in workbook.worksheets:
            rows = [
                ["" if cell is None else str(cell) for cell in row]
                for row in sheet.iter_rows(max_row=MAX_ROWS_PER_SHEET, values_only=True)
            ]
            sections.append(
                {
                    "type": "sheet",
                    "name": sheet.title,
                    # NOTE(review): in read-only mode these come from the
                    # dimensions stored in the file and may be missing or
                    # inaccurate for some producers - confirm if relied upon.
                    "row_count": sheet.max_row,
                    "column_count": sheet.max_column,
                    "rows": rows,
                }
            )
        return sections
    finally:
        # A read-only workbook must be closed explicitly.
        workbook.close()
def _read_pptx(path: Path) -> list[dict[str, object]]:
    """Collect the text of each slide in a PPTX file as a "slide" section.

    For every slide, the stripped text of each shape that exposes a non-blank
    ``text`` attribute is joined with newlines. Slides without any such text
    still produce a section, with an empty string.
    """
    # Imported lazily so the dependency is only needed when a PPTX is read.
    from pptx import Presentation

    deck = Presentation(str(path))
    sections = []
    for slide_number, slide in enumerate(deck.slides, start=1):
        snippets = [
            shape.text.strip()
            for shape in slide.shapes
            if hasattr(shape, "text") and shape.text.strip()
        ]
        slide_text = "\n".join(snippets)
        sections.append({"type": "slide", "name": f"幻灯片 {slide_number}", "text": slide_text})
    return sections
def _build_preview(sections: list[dict[str, object]]) -> str:
    """Flatten parsed sections into one newline-separated preview string.

    For each section, its non-empty "text" comes first, followed by up to
    five of its "rows" rendered as ``cell | cell | ...``. Empty fragments are
    dropped and the final string is stripped of surrounding whitespace.
    """
    fragments: list[str] = []
    for section in sections:
        text = section.get("text")
        if text:
            fragments.append(str(text))

        table_rows = section.get("rows")
        if not table_rows:
            continue
        for row in table_rows[:5]:
            fragments.append(" | ".join(map(str, row)))

    # filter(None, ...) removes fragments that rendered to an empty string.
    return "\n".join(filter(None, fragments)).strip()