feat(attachments): 增加附件阅读解析能力

This commit is contained in:
2026-06-06 16:37:54 +08:00
parent fd88ff4652
commit 47b5ad1054
6 changed files with 471 additions and 2 deletions

View File

@@ -0,0 +1,184 @@
from __future__ import annotations
import csv
from dataclasses import asdict, dataclass, field
from pathlib import Path
from django.conf import settings
from review_agent.models import FileAttachment
TEXT_EXTENSIONS = {"txt", "md", "csv", "json", "log"}
SUPPORTED_EXTENSIONS = TEXT_EXTENSIONS | {"pdf", "docx", "xlsx", "pptx"}
MAX_PREVIEW_CHARS = 3000
MAX_ROWS_PER_SHEET = 20
@dataclass(frozen=True)
class AttachmentReadResult:
status: str
filename: str
file_type: str
file_size: int
preview_text: str = ""
sections: list[dict[str, object]] = field(default_factory=list)
error_message: str = ""
def to_dict(self) -> dict[str, object]:
return asdict(self)
def read_attachment_details(attachment: FileAttachment) -> AttachmentReadResult:
file_path = _attachment_absolute_path(attachment)
file_type = Path(attachment.original_name).suffix.lower().lstrip(".")
if not file_path.exists():
return _failed(attachment, file_type, "附件文件不存在。")
if file_type not in SUPPORTED_EXTENSIONS:
return _failed(attachment, file_type, f"暂不支持解析 .{file_type or 'unknown'} 文件。", "unsupported")
try:
if file_type == "pdf":
sections = _read_pdf(file_path)
elif file_type == "docx":
sections = _read_docx(file_path)
elif file_type == "xlsx":
sections = _read_xlsx(file_path)
elif file_type == "pptx":
sections = _read_pptx(file_path)
elif file_type == "csv":
sections = _read_csv(file_path)
else:
sections = _read_text(file_path)
except Exception as exc:
return _failed(attachment, file_type, str(exc))
preview = _build_preview(sections)
return AttachmentReadResult(
status="success",
filename=attachment.original_name,
file_type=file_type,
file_size=attachment.file_size,
preview_text=preview[:MAX_PREVIEW_CHARS],
sections=sections,
)
def _attachment_absolute_path(attachment: FileAttachment) -> Path:
path = Path(attachment.storage_path)
if path.is_absolute():
return path
return Path(settings.MEDIA_ROOT) / path
def _failed(
attachment: FileAttachment,
file_type: str,
message: str,
status: str = "failed",
) -> AttachmentReadResult:
return AttachmentReadResult(
status=status,
filename=attachment.original_name,
file_type=file_type,
file_size=attachment.file_size,
error_message=message,
)
def _read_text(path: Path) -> list[dict[str, object]]:
text = path.read_text(encoding="utf-8", errors="replace")
return [{"type": "text", "name": path.name, "text": text[:MAX_PREVIEW_CHARS]}]
def _read_csv(path: Path) -> list[dict[str, object]]:
with path.open("r", encoding="utf-8-sig", errors="replace", newline="") as handle:
rows = [[str(cell) for cell in row] for row in csv.reader(handle)]
return [
{
"type": "table",
"name": path.name,
"row_count": len(rows),
"rows": rows[:MAX_ROWS_PER_SHEET],
}
]
def _read_pdf(path: Path) -> list[dict[str, object]]:
from pypdf import PdfReader
reader = PdfReader(str(path))
pages = []
for index, page in enumerate(reader.pages, start=1):
text = page.extract_text() or ""
pages.append({"type": "page", "name": f"{index}", "text": text})
return pages
def _read_docx(path: Path) -> list[dict[str, object]]:
from docx import Document
document = Document(str(path))
paragraphs = [item.text.strip() for item in document.paragraphs if item.text.strip()]
sections: list[dict[str, object]] = [
{"type": "text", "name": "正文", "text": "\n".join(paragraphs)}
]
for index, table in enumerate(document.tables, start=1):
rows = [[cell.text.strip() for cell in row.cells] for row in table.rows]
sections.append(
{
"type": "table",
"name": f"表格 {index}",
"row_count": len(rows),
"rows": rows[:MAX_ROWS_PER_SHEET],
}
)
return sections
def _read_xlsx(path: Path) -> list[dict[str, object]]:
from openpyxl import load_workbook
workbook = load_workbook(str(path), read_only=True, data_only=True)
sections = []
for sheet in workbook.worksheets:
rows = []
for row in sheet.iter_rows(max_row=MAX_ROWS_PER_SHEET, values_only=True):
rows.append(["" if cell is None else str(cell) for cell in row])
sections.append(
{
"type": "sheet",
"name": sheet.title,
"row_count": sheet.max_row,
"column_count": sheet.max_column,
"rows": rows,
}
)
workbook.close()
return sections
def _read_pptx(path: Path) -> list[dict[str, object]]:
from pptx import Presentation
presentation = Presentation(str(path))
sections = []
for index, slide in enumerate(presentation.slides, start=1):
texts = []
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text.strip():
texts.append(shape.text.strip())
sections.append({"type": "slide", "name": f"幻灯片 {index}", "text": "\n".join(texts)})
return sections
def _build_preview(sections: list[dict[str, object]]) -> str:
parts: list[str] = []
for section in sections:
if "text" in section and section["text"]:
parts.append(str(section["text"]))
rows = section.get("rows")
if rows:
parts.extend(" | ".join(str(cell) for cell in row) for row in rows[:5])
return "\n".join(part for part in parts if part).strip()

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
from collections.abc import Iterable
from review_agent.models import FileAttachment
from ..services.attachment_reader import read_attachment_details
from .base import BaseSkill, SkillResult, WorkflowContext
class AttachmentReaderSkill(BaseSkill):
name = "attachment_reader"
def run(self, context: WorkflowContext) -> SkillResult:
attachments = FileAttachment.objects.filter(
conversation=context.batch.conversation,
is_active=True,
).exclude(upload_status=FileAttachment.UploadStatus.DELETED)
return self.run_for_attachments(attachments)
def run_for_attachments(self, attachments: Iterable[FileAttachment]) -> SkillResult:
results = [read_attachment_details(attachment).to_dict() for attachment in attachments]
if not results:
return SkillResult(success=False, message="当前对话没有可读取的附件。")
has_success = any(item["status"] == "success" for item in results)
return SkillResult(
success=has_success,
data={"attachments": results},
message="附件解析完成。" if has_success else "附件解析失败。",
)

View File

@@ -6,6 +6,19 @@ from review_agent.models import Conversation, FileAttachment
TRIGGER_KEYWORDS = ("自动汇总", "文件目录", "页数", "目录与页数", "文件清单") TRIGGER_KEYWORDS = ("自动汇总", "文件目录", "页数", "目录与页数", "文件清单")
ATTACHMENT_READER_KEYWORDS = (
"阅读附件",
"读取附件",
"解析附件",
"分析附件",
"查看附件",
"附件详情",
"文件详情",
"总结附件",
"总结文件",
"分析这个文件",
"阅读这个文件",
)
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -28,3 +41,18 @@ def evaluate_file_summary_trigger(conversation: Conversation, content: str) -> T
return TriggerResult(should_start=False, reason="missing_attachment") return TriggerResult(should_start=False, reason="missing_attachment")
return TriggerResult(should_start=True, workflow_type="file_summary") return TriggerResult(should_start=True, workflow_type="file_summary")
def evaluate_attachment_reader_trigger(conversation: Conversation, content: str) -> TriggerResult:
text = (content or "").strip()
if not any(keyword in text for keyword in ATTACHMENT_READER_KEYWORDS):
return TriggerResult(should_start=False, reason="not_matched")
has_attachment = FileAttachment.objects.filter(
conversation=conversation,
is_active=True,
).exclude(upload_status=FileAttachment.UploadStatus.DELETED).exists()
if not has_attachment:
return TriggerResult(should_start=False, reason="missing_attachment")
return TriggerResult(should_start=True, workflow_type="attachment_reader")

View File

@@ -6,10 +6,14 @@ from django.db.models import Q, QuerySet
from django.conf import settings from django.conf import settings
from django.utils import timezone from django.utils import timezone
from .file_summary.skills.attachment_reader import AttachmentReaderSkill
from .file_summary.workflow import create_file_summary_batch, start_file_summary_workflow from .file_summary.workflow import create_file_summary_batch, start_file_summary_workflow
from .file_summary.workflow_trigger import evaluate_file_summary_trigger from .file_summary.workflow_trigger import (
evaluate_attachment_reader_trigger,
evaluate_file_summary_trigger,
)
from .llm import LLMConfigurationError, LLMRequestError, generate_reply, stream_reply from .llm import LLMConfigurationError, LLMRequestError, generate_reply, stream_reply
from .models import Conversation, Message from .models import Conversation, FileAttachment, Message
def list_conversations(user, search: str = "") -> QuerySet[Conversation]: def list_conversations(user, search: str = "") -> QuerySet[Conversation]:
@@ -92,6 +96,7 @@ def stream_message(conversation: Conversation, content: str):
user_message = append_user_message(conversation, content) user_message = append_user_message(conversation, content)
assistant_parts: list[str] = [] assistant_parts: list[str] = []
trigger = evaluate_file_summary_trigger(conversation, content) trigger = evaluate_file_summary_trigger(conversation, content)
attachment_reader_trigger = evaluate_attachment_reader_trigger(conversation, content)
yield sse_event( yield sse_event(
"meta", "meta",
@@ -117,6 +122,36 @@ def stream_message(conversation: Conversation, content: str):
) )
return return
if attachment_reader_trigger.reason == "missing_attachment":
reply_content = "请先在当前对话右侧上传需要阅读的附件,然后再发送解析或阅读附件指令。"
assistant_message = append_assistant_message(conversation, reply_content)
yield sse_event("chunk", {"delta": reply_content})
yield sse_event(
"done",
{
"assistant_message_id": assistant_message.pk,
"conversation_id": conversation.pk,
"title": conversation.title,
},
)
return
if attachment_reader_trigger.should_start:
attachments = _select_attachments_for_reader(conversation, content)
result = AttachmentReaderSkill().run_for_attachments(attachments)
reply_content = _format_attachment_reader_reply(result.data.get("attachments", []), result.message)
assistant_message = append_assistant_message(conversation, reply_content)
yield sse_event("chunk", {"delta": reply_content})
yield sse_event(
"done",
{
"assistant_message_id": assistant_message.pk,
"conversation_id": conversation.pk,
"title": conversation.title,
},
)
return
if trigger.should_start: if trigger.should_start:
batch = create_file_summary_batch( batch = create_file_summary_batch(
conversation=conversation, conversation=conversation,
@@ -182,6 +217,62 @@ def build_conversation_title(content: str) -> str:
return normalized[:24] return normalized[:24]
def _select_attachments_for_reader(conversation: Conversation, content: str):
attachments = list(
FileAttachment.objects.filter(
conversation=conversation,
is_active=True,
)
.exclude(upload_status=FileAttachment.UploadStatus.DELETED)
.order_by("original_name", "-version_no")
)
matched = [attachment for attachment in attachments if attachment.original_name in content]
return matched or attachments
def _format_attachment_reader_reply(attachments: list[dict[str, object]], message: str) -> str:
if not attachments:
return message or "当前对话没有可读取的附件。"
lines = ["## 附件解析结果"]
for item in attachments:
status = item.get("status", "")
filename = item.get("filename", "")
file_type = item.get("file_type", "")
lines.extend(
[
"",
f"### {filename}",
f"- 类型:{file_type or '未知'}",
f"- 状态:{status}",
]
)
if item.get("error_message"):
lines.append(f"- 错误:{item['error_message']}")
continue
preview = str(item.get("preview_text") or "").strip()
if preview:
lines.extend(["", "摘要预览:", "```text", preview, "```"])
sections = item.get("sections") or []
if sections:
lines.append("")
lines.append("结构详情:")
for section in sections[:8]:
if not isinstance(section, dict):
continue
section_type = section.get("type", "section")
name = section.get("name", "")
extra = ""
if "row_count" in section:
extra = f"{section['row_count']}"
if "column_count" in section:
extra += f"{section['column_count']}"
lines.append(f"- {name}{section_type}{extra}")
return "\n".join(lines).strip()
def sse_event(event_name: str, payload: dict[str, object]) -> str: def sse_event(event_name: str, payload: dict[str, object]) -> str:
"""Formats one server-sent event frame.""" """Formats one server-sent event frame."""

View File

@@ -0,0 +1,111 @@
from pathlib import Path
import pytest
from django.conf import settings
from review_agent.models import Conversation, FileAttachment
pytestmark = pytest.mark.django_db
def test_read_attachment_extracts_text_file_details(settings, tmp_path, django_user_model):
from review_agent.file_summary.services.attachment_reader import read_attachment_details
settings.MEDIA_ROOT = tmp_path
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
relative_path = Path("uploads") / "note.txt"
absolute_path = tmp_path / relative_path
absolute_path.parent.mkdir(parents=True)
absolute_path.write_text("产品名称:智能审核\n关键结论:可以解析附件详情", encoding="utf-8")
attachment = FileAttachment.objects.create(
conversation=conversation,
user=user,
original_name="note.txt",
storage_path=relative_path.as_posix(),
file_size=absolute_path.stat().st_size,
content_type="text/plain",
)
result = read_attachment_details(attachment)
assert result.status == "success"
assert result.filename == "note.txt"
assert result.file_type == "txt"
assert "智能审核" in result.preview_text
assert result.sections[0]["type"] == "text"
def test_read_attachment_extracts_docx_and_xlsx_details(settings, tmp_path, django_user_model):
from docx import Document
from openpyxl import Workbook
from review_agent.file_summary.services.attachment_reader import read_attachment_details
settings.MEDIA_ROOT = tmp_path
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
docx_path = tmp_path / "uploads" / "summary.docx"
docx_path.parent.mkdir(parents=True)
doc = Document()
doc.add_heading("项目摘要", level=1)
doc.add_paragraph("这是 Word 附件里的正文。")
doc.save(docx_path)
docx_attachment = FileAttachment.objects.create(
conversation=conversation,
user=user,
original_name="summary.docx",
storage_path="uploads/summary.docx",
file_size=docx_path.stat().st_size,
)
workbook_path = tmp_path / "uploads" / "inventory.xlsx"
workbook = Workbook()
sheet = workbook.active
sheet.title = "清单"
sheet.append(["文件名", "页数"])
sheet.append(["a.pdf", 3])
workbook.save(workbook_path)
xlsx_attachment = FileAttachment.objects.create(
conversation=conversation,
user=user,
original_name="inventory.xlsx",
storage_path="uploads/inventory.xlsx",
file_size=workbook_path.stat().st_size,
)
docx_result = read_attachment_details(docx_attachment)
xlsx_result = read_attachment_details(xlsx_attachment)
assert docx_result.status == "success"
assert "项目摘要" in docx_result.preview_text
assert "Word 附件里的正文" in docx_result.preview_text
assert xlsx_result.status == "success"
assert xlsx_result.sections[0]["name"] == "清单"
assert xlsx_result.sections[0]["rows"][1] == ["a.pdf", "3"]
def test_attachment_reader_skill_returns_structured_details(settings, tmp_path, django_user_model):
from review_agent.file_summary.skills.attachment_reader import AttachmentReaderSkill
settings.MEDIA_ROOT = tmp_path
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
file_path = tmp_path / "uploads" / "readme.txt"
file_path.parent.mkdir(parents=True)
file_path.write_text("请读取这个附件。", encoding="utf-8")
attachment = FileAttachment.objects.create(
conversation=conversation,
user=user,
original_name="readme.txt",
storage_path="uploads/readme.txt",
file_size=file_path.stat().st_size,
)
result = AttachmentReaderSkill().run_for_attachments([attachment])
assert result.success is True
assert result.data["attachments"][0]["filename"] == "readme.txt"
assert "请读取这个附件" in result.data["attachments"][0]["preview_text"]

View File

@@ -100,3 +100,27 @@ def test_stream_message_uses_normal_llm_path_when_not_triggered(monkeypatch, dja
joined = "".join(frames) joined = "".join(frames)
assert "普通回复" in joined assert "普通回复" in joined
assert "workflow_started" not in joined assert "workflow_started" not in joined
def test_stream_message_reads_active_attachment_when_requested(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
attachment_path = tmp_path / "uploads" / "detail.txt"
attachment_path.parent.mkdir(parents=True)
attachment_path.write_text("合同编号RA-2026\n结论:附件阅读成功", encoding="utf-8")
FileAttachment.objects.create(
conversation=conversation,
user=user,
original_name="detail.txt",
storage_path="uploads/detail.txt",
file_size=attachment_path.stat().st_size,
)
frames = list(stream_message(conversation, "请阅读附件并给出详情"))
joined = "".join(frames)
assert "附件解析结果" in joined
assert "detail.txt" in joined
assert "RA-2026" in joined
assert "workflow_started" not in joined