# Listing metadata: 220 lines, 6.9 KiB, Python.
from __future__ import annotations
|
|
|
|
import csv
|
|
import logging
|
|
from dataclasses import asdict, dataclass, field
|
|
from pathlib import Path
|
|
|
|
from django.conf import settings
|
|
|
|
from review_agent.models import FileAttachment
|
|
|
|
|
|
# Extensions (lowercase, no leading dot) of the text-based formats. "csv" is
# routed to the table reader; the others are read as plain text.
TEXT_EXTENSIONS = {"txt", "md", "csv", "json", "log"}
|
|
# Every extension read_attachment_details will try to parse; any other
# extension is reported with status "unsupported".
SUPPORTED_EXTENSIONS = TEXT_EXTENSIONS | {"pdf", "docx", "xlsx", "pptx"}
|
|
# Upper bound, in characters, on the returned preview text and on the text
# kept by the plain-text reader.
MAX_PREVIEW_CHARS = 3000
|
|
# Maximum number of rows retained per CSV file, DOCX table, or XLSX sheet.
MAX_ROWS_PER_SHEET = 20
|
|
|
|
|
|
# Module-level logger; the name is a hard-coded dotted path rather than
# being derived from __name__.
logger = logging.getLogger("review_agent.file_summary.attachment_reader")
|
|
|
|
|
|
@dataclass(frozen=True)
class AttachmentReadResult:
    """Immutable outcome of one attempt to read an attachment.

    Attributes:
        status: Outcome label. This module produces "success", "failed"
            and "unsupported".
        filename: Original name of the attachment.
        file_type: Lowercase extension without the leading dot; may be
            empty when the name has no suffix.
        file_size: Size as recorded on the attachment record.
        preview_text: Flattened text preview; empty on failure.
        sections: Per-part dictionaries (text, table, page, sheet, slide)
            produced by the format readers; empty on failure.
        error_message: Human-readable failure reason; empty on success.
    """

    # Required fields: always supplied by the caller.
    status: str
    filename: str
    file_type: str
    file_size: int

    # Optional fields: default to "nothing extracted / no error".
    preview_text: str = ""
    sections: list[dict[str, object]] = field(default_factory=list)
    error_message: str = ""

    def to_dict(self) -> dict[str, object]:
        """Return the result as a plain dictionary keyed by field name."""
        payload = asdict(self)
        return payload
|
|
|
|
|
|
def read_attachment_details(attachment: FileAttachment) -> AttachmentReadResult:
    """Parse an attachment from disk and summarise its content.

    The file type is taken from the extension of ``attachment.original_name``
    (lowercased, leading dot removed) and decides which reader is used.

    Args:
        attachment: Record providing ``pk``, ``conversation_id``,
            ``original_name``, ``storage_path`` and ``file_size``.

    Returns:
        An ``AttachmentReadResult`` whose status is "success" when a reader
        completed, "unsupported" for an extension outside
        ``SUPPORTED_EXTENSIONS``, and "failed" when the file is missing or
        the reader raised. Reader exceptions are logged and converted into
        a failed result instead of propagating.
    """
    resolved_path = _attachment_absolute_path(attachment)
    file_type = Path(attachment.original_name).suffix.lower().lstrip(".")

    start_context = {
        "attachment_id": attachment.pk,
        "conversation_id": attachment.conversation_id,
        "original_name": attachment.original_name,
        "file_type": file_type,
        "storage_path": attachment.storage_path,
        "resolved_path": str(resolved_path),
    }
    logger.info("Attachment read started", extra=start_context)

    # Guard: the file must exist on disk before anything else is checked.
    if not resolved_path.exists():
        logger.warning(
            "Attachment read missing file",
            extra={"attachment_id": attachment.pk, "resolved_path": str(resolved_path)},
        )
        return _failed(attachment, file_type, "附件文件不存在。")

    # Guard: only known extensions are parsed.
    if file_type not in SUPPORTED_EXTENSIONS:
        logger.warning(
            "Attachment read unsupported type",
            extra={"attachment_id": attachment.pk, "file_type": file_type},
        )
        return _failed(attachment, file_type, f"暂不支持解析 .{file_type or 'unknown'} 文件。", "unsupported")

    try:
        # Formats with a dedicated reader; every other supported extension
        # is handled as plain text.
        dedicated_readers = {
            "pdf": _read_pdf,
            "docx": _read_docx,
            "xlsx": _read_xlsx,
            "pptx": _read_pptx,
            "csv": _read_csv,
        }
        reader = dedicated_readers.get(file_type, _read_text)
        sections = reader(resolved_path)
    except Exception as exc:
        logger.exception(
            "Attachment read failed",
            extra={"attachment_id": attachment.pk, "file_type": file_type, "error": str(exc)},
        )
        return _failed(attachment, file_type, str(exc))

    preview = _build_preview(sections)
    finish_context = {
        "attachment_id": attachment.pk,
        "section_count": len(sections),
        "preview_length": len(preview),
    }
    logger.info("Attachment read finished", extra=finish_context)

    return AttachmentReadResult(
        status="success",
        filename=attachment.original_name,
        file_type=file_type,
        file_size=attachment.file_size,
        preview_text=preview[:MAX_PREVIEW_CHARS],
        sections=sections,
    )
|
|
|
|
|
|
def _attachment_absolute_path(attachment: FileAttachment) -> Path:
    """Resolve the attachment's stored path to a filesystem path.

    An absolute ``storage_path`` is returned unchanged; a relative one is
    joined onto ``settings.MEDIA_ROOT``.
    """
    stored = Path(attachment.storage_path)
    return stored if stored.is_absolute() else Path(settings.MEDIA_ROOT) / stored
|
|
|
|
|
|
def _failed(
    attachment: FileAttachment,
    file_type: str,
    message: str,
    status: str = "failed",
) -> AttachmentReadResult:
    """Build a result describing an attachment that could not be read.

    Args:
        attachment: Record supplying ``original_name`` and ``file_size``.
        file_type: Extension detected for the attachment.
        message: Text stored in ``error_message``.
        status: Outcome label; defaults to "failed".

    Returns:
        An ``AttachmentReadResult`` with no preview text and no sections.
    """
    details = {
        "status": status,
        "filename": attachment.original_name,
        "file_type": file_type,
        "file_size": attachment.file_size,
        "error_message": message,
    }
    return AttachmentReadResult(**details)
|
|
|
|
|
|
def _read_text(path: Path) -> list[dict[str, object]]:
    """Read the leading characters of a plain-text file.

    Only the first ``MAX_PREVIEW_CHARS`` characters are read, so a very
    large file is never loaded into memory in full.

    Args:
        path: Location of the text file.

    Returns:
        A single-element list holding one "text" section named after the
        file.
    """
    # "utf-8-sig" decodes ordinary UTF-8 unchanged but drops a leading BOM,
    # matching the CSV reader; undecodable bytes become U+FFFD.
    with path.open("r", encoding="utf-8-sig", errors="replace") as handle:
        text = handle.read(MAX_PREVIEW_CHARS)
    return [{"type": "text", "name": path.name, "text": text}]
|
|
|
|
|
|
def _read_csv(path: Path) -> list[dict[str, object]]:
    """Read a CSV file as a single table section.

    Rows are streamed: every row is counted, but only the first
    ``MAX_ROWS_PER_SHEET`` are kept, so memory use does not grow with the
    size of the file.

    Args:
        path: Location of the CSV file.

    Returns:
        A single-element list holding one "table" section with the total
        ``row_count`` and the retained leading ``rows``.
    """
    head_rows: list[list[str]] = []
    total_rows = 0
    # newline="" lets the csv module handle line endings and quoted newlines
    # itself; "utf-8-sig" drops a leading BOM if one is present.
    with path.open("r", encoding="utf-8-sig", errors="replace", newline="") as handle:
        for row in csv.reader(handle):
            if total_rows < MAX_ROWS_PER_SHEET:
                # csv.reader already yields lists of strings.
                head_rows.append(list(row))
            total_rows += 1
    return [
        {
            "type": "table",
            "name": path.name,
            "row_count": total_rows,
            "rows": head_rows,
        }
    ]
|
|
|
|
|
|
def _read_pdf(path: Path) -> list[dict[str, object]]:
    """Extract the text of every page of a PDF.

    Args:
        path: Location of the PDF file.

    Returns:
        One "page" section per page, numbered from 1. A page for which
        text extraction yields nothing gets an empty string.
    """
    # Imported lazily so pypdf is only needed when a PDF is actually read.
    from pypdf import PdfReader

    document = PdfReader(str(path))
    return [
        {"type": "page", "name": f"第 {index} 页", "text": page.extract_text() or ""}
        for index, page in enumerate(document.pages, start=1)
    ]
|
|
|
|
|
|
def _read_docx(path: Path) -> list[dict[str, object]]:
    """Extract body text and tables from a Word document.

    Args:
        path: Location of the .docx file.

    Returns:
        A "text" section containing the non-blank paragraphs joined by
        newlines, followed by one "table" section per table (numbered from
        1) holding its total row count and at most ``MAX_ROWS_PER_SHEET``
        rows of stripped cell text.
    """
    # Imported lazily so python-docx is only needed for .docx files.
    from docx import Document

    document = Document(str(path))

    # Keep only paragraphs that still have content after stripping.
    body_lines = []
    for paragraph in document.paragraphs:
        stripped = paragraph.text.strip()
        if stripped:
            body_lines.append(stripped)

    sections: list[dict[str, object]] = [
        {"type": "text", "name": "正文", "text": "\n".join(body_lines)}
    ]

    for index, table in enumerate(document.tables, start=1):
        grid = []
        for table_row in table.rows:
            grid.append([cell.text.strip() for cell in table_row.cells])
        table_section = {
            "type": "table",
            "name": f"表格 {index}",
            "row_count": len(grid),
            "rows": grid[:MAX_ROWS_PER_SHEET],
        }
        sections.append(table_section)

    return sections
|
|
|
|
|
|
def _read_xlsx(path: Path) -> list[dict[str, object]]:
    """Extract the leading rows of every worksheet in an Excel workbook.

    Args:
        path: Location of the .xlsx file.

    Returns:
        One "sheet" section per worksheet with its title, the reported
        ``max_row`` / ``max_column`` values, and at most
        ``MAX_ROWS_PER_SHEET`` rows. Empty cells become "" and every other
        value is converted with ``str``.
    """
    # Imported lazily so openpyxl is only needed for .xlsx files.
    from openpyxl import load_workbook

    workbook = load_workbook(str(path), read_only=True, data_only=True)
    try:
        sections: list[dict[str, object]] = []
        for sheet in workbook.worksheets:
            rows = [
                ["" if cell is None else str(cell) for cell in row]
                for row in sheet.iter_rows(max_row=MAX_ROWS_PER_SHEET, values_only=True)
            ]
            sections.append(
                {
                    "type": "sheet",
                    "name": sheet.title,
                    "row_count": sheet.max_row,
                    "column_count": sheet.max_column,
                    "rows": rows,
                }
            )
        return sections
    finally:
        # openpyxl documents that a read-only workbook must be closed
        # explicitly; do so even when reading a sheet raises, so the file
        # handle is not leaked.
        workbook.close()
|
|
|
|
|
|
def _read_pptx(path: Path) -> list[dict[str, object]]:
    """Extract the text of every slide in a PowerPoint presentation.

    Args:
        path: Location of the .pptx file.

    Returns:
        One "slide" section per slide, numbered from 1, whose text is the
        stripped text of each shape that exposes non-blank ``text``, joined
        by newlines.
    """
    # Imported lazily so python-pptx is only needed for .pptx files.
    from pptx import Presentation

    presentation = Presentation(str(path))
    sections = []
    for index, slide in enumerate(presentation.slides, start=1):
        # Shapes without a ``text`` attribute are skipped.
        texts = [
            shape.text.strip()
            for shape in slide.shapes
            if hasattr(shape, "text") and shape.text.strip()
        ]
        slide_section = {"type": "slide", "name": f"幻灯片 {index}", "text": "\n".join(texts)}
        sections.append(slide_section)
    return sections
|
|
|
|
|
|
def _build_preview(sections: list[dict[str, object]]) -> str:
    """Flatten parsed sections into one newline-separated preview string.

    For each section, a non-empty "text" value is included as-is, and up to
    the first five entries of "rows" are rendered with cells joined by
    " | ". Empty fragments are dropped and the final string is stripped.

    Args:
        sections: Section dictionaries produced by the format readers.

    Returns:
        The preview text; empty when no section contributes anything.
    """
    fragments: list[str] = []
    for entry in sections:
        body = entry.get("text")
        if body:
            fragments.append(str(body))

        table_rows = entry.get("rows")
        if table_rows:
            for record in table_rows[:5]:
                fragments.append(" | ".join(map(str, record)))

    # A row with no cells renders as "", so filter before joining.
    non_empty = [fragment for fragment in fragments if fragment]
    return "\n".join(non_empty).strip()
|