feat(file-summary): 生成汇总报告和导出下载

This commit is contained in:
2026-06-06 01:22:49 +08:00
parent 18d045d487
commit 61bd31790b
8 changed files with 286 additions and 5 deletions

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from pathlib import Path
from openpyxl import Workbook
from review_agent.models import ExportedSummaryFile, FileSummaryBatch
def _exports_dir(batch: FileSummaryBatch) -> Path:
    """Return the ``exports`` directory of *batch*, creating it if needed.

    The directory lives under ``batch.work_dir`` when that value is set;
    otherwise it falls back to ``media/file_summary/<batch_no>`` relative to
    the current working directory.
    """
    if batch.work_dir:
        base = Path(batch.work_dir)
    else:
        base = Path("media") / "file_summary" / batch.batch_no
    target = base / "exports"
    # Missing parents are created; an already existing directory is accepted.
    target.mkdir(parents=True, exist_ok=True)
    return target
def _write_text_row(sheet, row_index: int, values) -> None:
    """Write *values* into row *row_index* of *sheet*, keeping strings literal.

    openpyxl stores a string that starts with ``=`` as a formula. The rows
    written here contain file names, paths and error messages read from the
    batch, so every string cell is forced to the plain string data type to
    keep spreadsheet applications from evaluating it.
    """
    for column_index, value in enumerate(values, start=1):
        cell = sheet.cell(row=row_index, column=column_index, value=value)
        if isinstance(value, str):
            cell.data_type = "s"


def generate_excel_export(batch: FileSummaryBatch) -> ExportedSummaryFile:
    """Write the Excel summary of *batch* and record it as an export.

    The workbook has two sheets: a key/value overview of the batch counters
    and one row per batch item ordered by ``file_index``. It is saved as
    ``<batch_no>-summary.xlsx`` inside the batch's exports directory.

    Returns:
        The ``ExportedSummaryFile`` row created for the saved workbook.
    """
    workbook = Workbook()

    summary = workbook.active
    summary.title = "汇总信息"
    summary_rows = [
        ["批次号", batch.batch_no],
        ["产品名称", batch.product_name or "-"],
        ["文件总数", batch.total_files],
        ["统计成功", batch.success_files],
        ["统计失败", batch.failed_files],
        ["不支持", batch.unsupported_files],
        ["不确定", batch.uncertain_files],
        ["总页数", batch.total_pages],
    ]
    for row_index, row in enumerate(summary_rows, start=1):
        _write_text_row(summary, row_index, row)

    detail = workbook.create_sheet("文件明细")
    _write_text_row(
        detail,
        1,
        ["序号", "目录层级", "文件名", "类型", "页数", "路径", "状态", "重试次数", "异常说明"],
    )
    # Row 1 holds the header, so item rows start at row 2.
    for row_index, item in enumerate(batch.items.order_by("file_index"), start=2):
        _write_text_row(
            detail,
            row_index,
            [
                item.file_index,
                item.directory_level,
                item.file_name,
                item.file_type,
                item.page_count,
                item.relative_path,
                item.statistics_status,
                item.retry_count,
                item.error_message,
            ],
        )

    path = _exports_dir(batch) / f"{batch.batch_no}-summary.xlsx"
    workbook.save(path)
    return ExportedSummaryFile.objects.create(
        batch=batch,
        export_type=ExportedSummaryFile.ExportType.EXCEL,
        file_name=path.name,
        storage_path=str(path),
    )

View File

@@ -0,0 +1,65 @@
from __future__ import annotations
from pathlib import Path
from review_agent.models import ExportedSummaryFile, FileSummaryBatch
def _exports_dir(batch: FileSummaryBatch) -> Path:
    """Locate (and create on demand) the folder that holds a batch's exports.

    Uses ``batch.work_dir`` as the root when it is set, and
    ``media/file_summary/<batch_no>`` otherwise; the returned path is the
    ``exports`` sub-folder of that root.
    """
    configured = batch.work_dir
    work_root = Path(configured) if configured else Path("media", "file_summary", batch.batch_no)
    exports = work_root.joinpath("exports")
    exports.mkdir(parents=True, exist_ok=True)
    return exports
def _table_cell(value: object) -> str:
    """Render *value* as text that fits inside a single Markdown table cell.

    A raw ``|`` would start a new column and a line break would end the row,
    so pipes are backslash-escaped and line breaks are collapsed to spaces.
    """
    return " ".join(str(value).replace("|", "\\|").splitlines())


def build_summary_table(batch: FileSummaryBatch) -> str:
    """Build the Markdown table listing every item of *batch*.

    Items are read with ``batch.items.order_by("file_index")``. Empty
    directory levels and error messages, and a ``None`` page count, are shown
    as ``-``. Cell text is escaped so file names or error messages that
    contain ``|`` or line breaks cannot break the table layout.

    Returns:
        The header row, the separator row and one row per item, joined by
        newlines.
    """
    lines = [
        "| 序号 | 目录层级 | 文件名 | 类型 | 页数 | 状态 | 异常说明 |",
        "| --- | --- | --- | --- | --- | --- | --- |",
    ]
    for item in batch.items.order_by("file_index"):
        cells = (
            item.file_index,
            item.directory_level or "-",
            item.file_name,
            item.file_type,
            item.page_count if item.page_count is not None else "-",
            item.statistics_status,
            item.error_message or "-",
        )
        lines.append("| " + " | ".join(_table_cell(cell) for cell in cells) + " |")
    return "\n".join(lines)
def build_markdown_report(batch: FileSummaryBatch) -> str:
    """Compose the full Markdown report text for *batch*.

    The report has four sections separated by blank lines: a title with the
    batch number, the batch counters, the per-file table produced by
    ``build_summary_table`` and a fixed processing note.
    """
    heading = f"# 文件目录与页数汇总报告\n\n批次号:{batch.batch_no}"

    overview_lines = [
        "## 汇总信息",
        "",
        f"- 产品名称:{batch.product_name or '-'}",
        f"- 文件总数:{batch.total_files}",
        f"- 统计成功:{batch.success_files}",
        f"- 统计失败:{batch.failed_files}",
        f"- 不支持:{batch.unsupported_files}",
        f"- 不确定:{batch.uncertain_files}",
        f"- 总页数:{batch.total_pages}",
    ]
    overview = "\n".join(overview_lines)

    details = "## 文件明细\n\n" + build_summary_table(batch)
    notes = "## 处理说明\n\n单文件失败不会阻断批次,失败与不确定文件已在明细中标注。"

    return "\n\n".join((heading, overview, details, notes))
def generate_markdown_report(batch: FileSummaryBatch) -> tuple[ExportedSummaryFile, str]:
    """Write the Markdown report of *batch* to disk and record the export.

    The report is saved as UTF-8 in ``<batch_no>-summary.md`` inside the
    batch's exports directory.

    Returns:
        A pair of the created ``ExportedSummaryFile`` row and the per-file
        Markdown table (the table only, not the whole report).
    """
    report_text = build_markdown_report(batch)
    target = _exports_dir(batch) / f"{batch.batch_no}-summary.md"
    target.write_text(report_text, encoding="utf-8")

    record_fields = {
        "batch": batch,
        "export_type": ExportedSummaryFile.ExportType.MARKDOWN,
        "file_name": target.name,
        "storage_path": str(target),
    }
    record = ExportedSummaryFile.objects.create(**record_fields)

    table = build_summary_table(batch)
    return record, table

View File

@@ -0,0 +1,33 @@
from __future__ import annotations
from django.urls import reverse
from review_agent.models import Message
from ..services.export_excel import generate_excel_export
from ..services.report import generate_markdown_report
from .base import BaseSkill, SkillResult, WorkflowContext
class SummaryReportSkill(BaseSkill):
    """Workflow skill that produces the summary exports and announces them."""

    name = "summary_report"

    def run(self, context: WorkflowContext) -> SkillResult:
        """Generate both exports and post an assistant message linking to them.

        The Markdown report is generated first, then the Excel workbook. The
        message posted to the batch's conversation contains the per-file
        table followed by download links for the two exports.

        Returns:
            A successful ``SkillResult`` carrying the primary keys of the
            Markdown and Excel export records.
        """
        batch = context.batch
        markdown_export, summary_table = generate_markdown_report(batch)
        excel_export = generate_excel_export(batch)

        route = "file_summary_export_download"
        markdown_url = reverse(route, args=[markdown_export.pk])
        excel_url = reverse(route, args=[excel_export.pk])

        links = f"[下载 Markdown 报告]({markdown_url}) | [下载 Excel 明细]({excel_url})"
        body = "\n\n".join(["文件目录与页数汇总已完成。", summary_table, links])

        Message.objects.create(
            conversation=batch.conversation,
            role=Message.Role.ASSISTANT,
            content=body,
        )

        export_ids = {
            "markdown_export_id": markdown_export.pk,
            "excel_export_id": excel_export.pk,
        }
        return SkillResult(success=True, data=export_ids)

View File

@@ -1,8 +1,10 @@
from django.contrib.auth.decorators import login_required from django.contrib.auth.decorators import login_required
from django.http import Http404, JsonResponse from pathlib import Path
from django.http import FileResponse, Http404, JsonResponse
from django.views.decorators.http import require_http_methods from django.views.decorators.http import require_http_methods
from review_agent.models import Conversation, FileAttachment from review_agent.models import Conversation, ExportedSummaryFile, FileAttachment
from review_agent.models import FileSummaryBatch, WorkflowEvent from review_agent.models import FileSummaryBatch, WorkflowEvent
from .events import serialize_event from .events import serialize_event
@@ -105,3 +107,18 @@ def batch_events(request, batch_id: int):
after_id = 0 after_id = 0
events = WorkflowEvent.objects.filter(batch=batch, pk__gt=after_id).order_by("id") events = WorkflowEvent.objects.filter(batch=batch, pk__gt=after_id).order_by("id")
return JsonResponse({"events": [serialize_event(event) for event in events]}) return JsonResponse({"events": [serialize_event(event) for event in events]})
@require_http_methods(["GET"])
@login_required
def export_download(request, export_id: int):
    """Stream an exported summary file to the user who owns its batch.

    Args:
        request: The authenticated GET request.
        export_id: Primary key of the ``ExportedSummaryFile`` to download.

    Returns:
        A ``FileResponse`` attachment named after the export's ``file_name``,
        or a 404 JSON response when the stored file cannot be opened.

    Raises:
        Http404: If no export with this id belongs to a batch of the
            requesting user.
    """
    exported = ExportedSummaryFile.objects.filter(
        pk=export_id,
        batch__user=request.user,
    ).first()
    if exported is None:
        raise Http404("导出文件不存在。")

    # Open directly instead of checking exists() first: the file may vanish
    # between the check and the open, and exists() is also true for a
    # directory (e.g. an empty storage_path), which cannot be streamed.
    try:
        handle = Path(exported.storage_path).open("rb")
    except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
        return JsonResponse({"error": "文件不存在。"}, status=404)
    return FileResponse(handle, as_attachment=True, filename=exported.file_name)

View File

@@ -22,6 +22,7 @@ from .skills.document_page_count import DocumentPageCountSkill
from .skills.file_inventory import FileInventorySkill from .skills.file_inventory import FileInventorySkill
from .skills.product_detect import ProductDetectSkill from .skills.product_detect import ProductDetectSkill
from .skills.registry import SkillRegistry from .skills.registry import SkillRegistry
from .skills.summary_report import SummaryReportSkill
NODE_DEFINITIONS = [ NODE_DEFINITIONS = [
@@ -30,7 +31,7 @@ NODE_DEFINITIONS = [
("inventory", "文件扫描", "file_inventory"), ("inventory", "文件扫描", "file_inventory"),
("page_count", "页数统计", "document_page_count"), ("page_count", "页数统计", "document_page_count"),
("product_detect", "产品识别", "product_detect"), ("product_detect", "产品识别", "product_detect"),
("report", "报告输出", ""), ("report", "报告输出", "summary_report"),
("complete", "完成", ""), ("complete", "完成", ""),
] ]
@@ -41,6 +42,7 @@ def default_skill_registry() -> SkillRegistry:
registry.register(FileInventorySkill()) registry.register(FileInventorySkill())
registry.register(DocumentPageCountSkill()) registry.register(DocumentPageCountSkill())
registry.register(ProductDetectSkill()) registry.register(ProductDetectSkill())
registry.register(SummaryReportSkill())
return registry return registry

View File

@@ -1,6 +1,6 @@
from django.urls import path from django.urls import path
from .file_summary.views import attachment_detail, attachments, batch_events, batch_status from .file_summary.views import attachment_detail, attachments, batch_events, batch_status, export_download
urlpatterns = [ urlpatterns = [
@@ -29,4 +29,9 @@ urlpatterns = [
batch_events, batch_events,
name="file_summary_batch_events", name="file_summary_batch_events",
), ),
path(
"api/review-agent/file-summary/exports/<int:export_id>/download/",
export_download,
name="file_summary_export_download",
),
] ]

View File

@@ -0,0 +1,82 @@
from pathlib import Path
import pytest
from openpyxl import load_workbook
from review_agent.file_summary.services.export_excel import generate_excel_export
from review_agent.file_summary.services.report import generate_markdown_report
from review_agent.models import Conversation, FileSummaryBatch, FileSummaryItem, Message
pytestmark = pytest.mark.django_db
def make_batch(tmp_path, django_user_model):
    """Create a one-item batch rooted at *tmp_path* for the export tests.

    The batch belongs to a fresh ``owner`` user and holds a single successful
    two-page ``a.xlsx`` item.
    """
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")

    batch_fields = {
        "conversation": chat,
        "user": owner,
        "batch_no": "FS-R",
        "work_dir": str(tmp_path),
        "total_files": 1,
        "success_files": 1,
        "total_pages": 2,
    }
    batch = FileSummaryBatch.objects.create(**batch_fields)

    item_fields = {
        "file_index": 1,
        "file_name": "a.xlsx",
        "file_type": "xlsx",
        "relative_path": "a.xlsx",
        "storage_path": str(tmp_path / "a.xlsx"),
        "page_count": 2,
        "statistics_status": FileSummaryItem.StatisticsStatus.SUCCESS,
    }
    FileSummaryItem.objects.create(batch=batch, **item_fields)
    return batch
def test_generate_markdown_report_creates_export_and_summary(tmp_path, django_user_model):
    """The Markdown export is written to disk and the table is returned."""
    record, table = generate_markdown_report(make_batch(tmp_path, django_user_model))
    report_file = Path(record.storage_path)

    assert record.export_type == "markdown"
    assert report_file.exists()
    assert "| 序号 | 目录层级 | 文件名 | 类型 | 页数 | 状态 | 异常说明 |" in table
    assert "a.xlsx" in report_file.read_text(encoding="utf-8")
def test_generate_excel_export_contains_summary_and_items(tmp_path, django_user_model):
    """The saved workbook has both sheets and lists the item's file name."""
    record = generate_excel_export(make_batch(tmp_path, django_user_model))
    saved = load_workbook(record.storage_path)

    assert saved.sheetnames == ["汇总信息", "文件明细"]
    detail_sheet = saved["文件明细"]
    # Column C of the first data row holds the file name.
    assert detail_sheet["C2"].value == "a.xlsx"
def test_workflow_report_node_writes_assistant_message(tmp_path, settings, django_user_model):
    """Running the workflow synchronously leaves an assistant message."""
    from review_agent.file_summary.workflow import create_file_summary_batch, start_file_summary_workflow
    from review_agent.models import FileAttachment

    settings.MEDIA_ROOT = tmp_path
    settings.FILE_SUMMARY_ASYNC = False

    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")

    # The attachment content is deliberately not a valid workbook.
    upload = tmp_path / "a.xlsx"
    upload.write_bytes(b"not a real workbook")
    FileAttachment.objects.create(
        conversation=chat,
        user=owner,
        original_name="a.txt",
        storage_path=str(upload),
        file_size=upload.stat().st_size,
    )

    batch = create_file_summary_batch(conversation=chat, user=owner)
    batch.work_dir = str(tmp_path / "batch")
    batch.save(update_fields=["work_dir"])

    start_file_summary_workflow(batch, async_run=False)

    assistant_messages = Message.objects.filter(conversation=chat, role=Message.Role.ASSISTANT)
    assert assistant_messages.exists()

View File

@@ -2,7 +2,7 @@ from django.core.files.uploadedfile import SimpleUploadedFile
from django.urls import reverse from django.urls import reverse
import pytest import pytest
from review_agent.models import Conversation, FileAttachment from review_agent.models import Conversation, ExportedSummaryFile, FileAttachment, FileSummaryBatch
pytestmark = pytest.mark.django_db pytestmark = pytest.mark.django_db
@@ -73,3 +73,26 @@ def test_delete_attachment_is_logical_and_scoped(client, settings, tmp_path, dja
assert response.status_code == 200 assert response.status_code == 200
assert attachment.upload_status == FileAttachment.UploadStatus.DELETED assert attachment.upload_status == FileAttachment.UploadStatus.DELETED
assert attachment.is_active is False assert attachment.is_active is False
def test_export_download_requires_batch_owner(client, tmp_path, django_user_model):
    """Only the user owning the batch may download its export."""
    create_user = django_user_model.objects.create_user
    owner = create_user(username="owner", password="pass")
    other = create_user(username="other", password="pass")

    chat = Conversation.objects.create(user=owner, title="会话")
    batch = FileSummaryBatch.objects.create(conversation=chat, user=owner, batch_no="FS-DL")

    report_file = tmp_path / "summary.md"
    report_file.write_text("ok", encoding="utf-8")
    record = ExportedSummaryFile.objects.create(
        batch=batch,
        export_type=ExportedSummaryFile.ExportType.MARKDOWN,
        file_name="summary.md",
        storage_path=str(report_file),
    )
    download_url = reverse("file_summary_export_download", args=[record.pk])

    # A different logged-in user must not learn that the export exists.
    client.force_login(other)
    denied = client.get(download_url)
    assert denied.status_code == 404

    client.force_login(owner)
    allowed = client.get(download_url)
    assert allowed.status_code == 200