from pathlib import Path
from zipfile import ZipFile

import pytest

from review_agent.file_summary.services import archive as archive_service
from review_agent.file_summary.workflow import create_file_summary_batch, start_file_summary_workflow
from review_agent.models import (
    Conversation,
    FileAttachment,
    FileSummaryBatch,
    FileSummaryBatchAttachment,
    Message,
    WorkflowEvent,
    WorkflowNodeRun,
)
from review_agent.services import stream_message
from review_agent.skill_router import SkillRoute

# Apply the django_db mark to every test in this module.
pytestmark = [pytest.mark.django_db]


def test_create_batch_binds_active_attachments_and_initializes_nodes(settings, tmp_path, django_user_model):
    """Creating a batch binds only the active attachment and seeds the workflow.

    MEDIA_ROOT is redirected to ``tmp_path``, as the archive tests in this
    module do, because the batch is given a ``work_dir``. That directory is
    presumably derived from MEDIA_ROOT (TODO confirm in
    ``create_file_summary_batch``); the override keeps the test from writing
    into the real media directory.
    """
    settings.MEDIA_ROOT = tmp_path
    user = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=user, title="会话")
    message = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="自动汇总")
    active = FileAttachment.objects.create(
        conversation=conversation,
        user=user,
        original_name="a.docx",
        storage_path="x/a.docx",
        file_size=1,
    )
    # Inactive attachment: the assertions below require it NOT to be bound.
    FileAttachment.objects.create(
        conversation=conversation,
        user=user,
        original_name="old.docx",
        is_active=False,
        storage_path="x/old.docx",
        file_size=1,
    )

    batch = create_file_summary_batch(conversation=conversation, user=user, trigger_message=message)

    assert batch.status == FileSummaryBatch.Status.PENDING
    # .get() also fails if more than one attachment was bound to the batch.
    assert FileSummaryBatchAttachment.objects.get(batch=batch).attachment == active
    active.refresh_from_db()
    assert active.upload_status == FileAttachment.UploadStatus.BOUND
    assert batch.work_dir
    assert WorkflowNodeRun.objects.filter(batch=batch).count() >= 6
    assert WorkflowEvent.objects.filter(batch=batch, event_type="workflow_created").exists()


def test_start_file_summary_workflow_runs_synchronously_for_tests(settings, tmp_path, django_user_model):
    """With ``async_run=False`` the workflow runs inline and ends in SUCCESS.

    MEDIA_ROOT is pointed at ``tmp_path``, matching the archive tests below.
    Any working files the workflow creates (presumably under MEDIA_ROOT;
    TODO confirm) then stay out of the real media directory.
    """
    settings.MEDIA_ROOT = tmp_path
    user = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=user, title="会话")
    message = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="自动汇总")
    FileAttachment.objects.create(
        conversation=conversation,
        user=user,
        original_name="a.docx",
        storage_path="x/a.docx",
        file_size=1,
    )
    batch = create_file_summary_batch(conversation=conversation, user=user, trigger_message=message)

    start_file_summary_workflow(batch, async_run=False)

    batch.refresh_from_db()
    assert batch.status == FileSummaryBatch.Status.SUCCESS
    assert WorkflowEvent.objects.filter(batch=batch, event_type="workflow_completed").exists()


def test_workflow_extracts_archive_and_scans_extracted_files(settings, tmp_path, django_user_model):
    """A zip upload is unpacked, and only its inner file becomes a batch item."""
    settings.MEDIA_ROOT = tmp_path
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    convo = Conversation.objects.create(user=owner, title="会话")

    zip_path = tmp_path / "upload.zip"
    with ZipFile(zip_path, "w") as zf:
        zf.writestr("folder/a.pdf", b"%PDF-1.4\n%%EOF")
    FileAttachment.objects.create(
        conversation=convo,
        user=owner,
        original_name="upload.zip",
        storage_path=str(zip_path),
        file_size=zip_path.stat().st_size,
    )

    batch = create_file_summary_batch(conversation=convo, user=owner)
    start_file_summary_workflow(batch, async_run=False)
    batch.refresh_from_db()

    assert batch.total_files == 1
    # The item is named after the inner file, without its folder prefix.
    assert batch.items.get().file_name == "a.pdf"
    # The archive itself must not be recorded as a scanned item.
    assert not batch.items.filter(file_type="zip").exists()


def test_workflow_marks_archive_extract_failure_visible(settings, tmp_path, django_user_model):
    """An archive that yields nothing scannable fails the batch visibly.

    The failure text must appear in three places: the batch error message,
    the ``extract`` node run, and the payload of the most recent FAILED
    ``node_progress`` event.
    """
    expected = "未解出任何可扫描文件"
    settings.MEDIA_ROOT = tmp_path
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    convo = Conversation.objects.create(user=owner, title="会话")

    empty_zip = tmp_path / "empty.zip"
    ZipFile(empty_zip, "w").close()  # a valid zip archive with no members
    FileAttachment.objects.create(
        conversation=convo,
        user=owner,
        original_name="empty.zip",
        storage_path=str(empty_zip),
        file_size=empty_zip.stat().st_size,
    )
    batch = create_file_summary_batch(conversation=convo, user=owner)

    start_file_summary_workflow(batch, async_run=False)

    batch.refresh_from_db()
    extract_run = batch.node_runs.get(node_code="extract")
    assert batch.status == FileSummaryBatch.Status.FAILED
    assert expected in batch.error_message
    assert extract_run.status == WorkflowNodeRun.Status.FAILED
    assert expected in extract_run.message

    latest_failure = WorkflowEvent.objects.filter(
        batch=batch,
        event_type="node_progress",
        payload__status=WorkflowNodeRun.Status.FAILED,
    ).latest("id")
    assert expected in latest_failure.payload["message"]


def test_rar_extract_uses_python_libarchive_before_7z(monkeypatch, tmp_path):
    """``.rar`` extraction tries libarchive first and never reaches 7z when that succeeds."""
    rar_path = tmp_path / "sample.rar"
    rar_path.write_bytes(b"rar")
    out_dir = tmp_path / "out"
    recorded = []

    def fake_libarchive_extract(path: Path, target: Path):
        # Simulates a successful extraction that produces one document.
        recorded.append(("libarchive", path, target))
        produced = target / "a.docx"
        produced.parent.mkdir(parents=True, exist_ok=True)
        produced.write_bytes(b"doc")
        return [produced]

    def fake_7z_extract(path: Path, target: Path):
        # Records the call so the final assertion fails if the fallback runs.
        recorded.append(("7z", path, target))
        return []

    for attr_name, replacement in (
        ("_extract_rar_with_libarchive", fake_libarchive_extract),
        ("_extract_rar_with_7z", fake_7z_extract),
    ):
        monkeypatch.setattr(archive_service, attr_name, replacement)

    result = archive_service.extract_archive(rar_path, out_dir)

    assert [item.name for item in result] == ["a.docx"]
    assert recorded == [("libarchive", rar_path, out_dir)]


def test_stream_message_returns_workflow_meta_when_triggered(settings, tmp_path, django_user_model):
    """A summary prompt starts the file_summary workflow and announces it in the stream.

    The workflow runs synchronously (``FILE_SUMMARY_ASYNC = False``).
    MEDIA_ROOT is redirected to ``tmp_path``, as the archive tests do, so
    batch files (presumably under MEDIA_ROOT; TODO confirm) stay out of the
    real media directory.
    """
    settings.FILE_SUMMARY_ASYNC = False
    settings.MEDIA_ROOT = tmp_path
    user = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=user, title="会话")
    FileAttachment.objects.create(
        conversation=conversation,
        user=user,
        original_name="a.docx",
        storage_path="x/a.docx",
        file_size=1,
    )

    frames = list(stream_message(conversation, "请自动汇总文件目录与页数"))

    joined = "".join(frames)
    assert "workflow_started" in joined
    assert '"workflow_type": "file_summary"' in joined
    assert FileSummaryBatch.objects.filter(conversation=conversation).exists()


def test_stream_message_uses_normal_llm_path_when_not_triggered(monkeypatch, django_user_model):
    """A plain greeting is answered via the patched ``stream_reply`` and starts no workflow."""
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    convo = Conversation.objects.create(user=owner, title="会话")

    def fake_stream_reply(conversation, content):
        yield "普通回复"

    monkeypatch.setattr("review_agent.services.stream_reply", fake_stream_reply)

    output = "".join(list(stream_message(convo, "你好")))

    assert "普通回复" in output
    assert "workflow_started" not in output


def test_stream_message_meta_uses_first_prompt_title_for_new_conversation(monkeypatch, django_user_model):
    """A conversation with the placeholder "新对话 …" title is titled from its first prompt.

    The new title must appear in both the first and the last stream frame.
    """
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    convo = Conversation.objects.create(user=owner, title="新对话 01-01 10:00")

    def fake_stream_reply(conversation, content):
        yield "普通回复"

    monkeypatch.setattr("review_agent.services.stream_reply", fake_stream_reply)

    frames = list(stream_message(convo, "这是第一条新对话消息"))

    title_fragment = '"title": "这是第一条新对话消息"'
    opening_frame = frames[0]
    closing_frame = frames[-1]
    assert title_fragment in opening_frame
    assert title_fragment in closing_frame


def test_stream_message_reads_active_attachment_when_requested(settings, tmp_path, django_user_model):
    """Asking to read the attachment returns its parsed text and starts no workflow."""
    settings.MEDIA_ROOT = tmp_path
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    convo = Conversation.objects.create(user=owner, title="会话")

    uploads_dir = tmp_path / "uploads"
    uploads_dir.mkdir(parents=True)
    detail_file = uploads_dir / "detail.txt"
    detail_file.write_text("合同编号:RA-2026\n结论:附件阅读成功", encoding="utf-8")
    FileAttachment.objects.create(
        conversation=convo,
        user=owner,
        original_name="detail.txt",
        # Presumably resolved against MEDIA_ROOT (= tmp_path), i.e. the file written above.
        storage_path="uploads/detail.txt",
        file_size=detail_file.stat().st_size,
    )

    joined = "".join(list(stream_message(convo, "请阅读附件并给出详情")))

    for fragment in ("附件解析结果", "detail.txt", "RA-2026"):
        assert fragment in joined
    assert "workflow_started" not in joined


def test_stream_message_falls_back_to_non_stream_reply_when_stream_breaks(monkeypatch, django_user_model):
    """When streaming fails midway, the non-stream reply replaces the partial text.

    The partial chunk is still emitted. The stream also contains ``replace``
    and ``done`` markers and the full ``generate_reply`` output. The single
    persisted assistant message holds only the full reply.
    """
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    convo = Conversation.objects.create(user=owner, title="会话")

    def broken_stream_reply(conversation, content):
        yield "已生成部分内容"
        raise RuntimeError("provider connection reset")

    def fake_generate_reply(conversation, content):
        return "非流式完整回复"

    monkeypatch.setattr("review_agent.services.stream_reply", broken_stream_reply)
    monkeypatch.setattr("review_agent.services.generate_reply", fake_generate_reply)

    joined = "".join(list(stream_message(convo, "普通问题")))

    for fragment in ("已生成部分内容", "replace", "非流式完整回复", "done"):
        assert fragment in joined
    saved_reply = Message.objects.get(conversation=convo, role=Message.Role.ASSISTANT)
    assert saved_reply.content == "非流式完整回复"


def test_stream_message_uses_llm_router_for_attachment_reader(
    monkeypatch,
    settings,
    tmp_path,
    django_user_model,
):
    """When the (patched) LLM router picks ``attachment_reader``, the upload is parsed and returned."""
    settings.MEDIA_ROOT = tmp_path
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    convo = Conversation.objects.create(user=owner, title="会话")

    resume_file = tmp_path / "uploads" / "resume.txt"
    resume_file.parent.mkdir(parents=True)
    resume_file.write_text("项目经历:负责审核智能体附件解析模块。", encoding="utf-8")
    FileAttachment.objects.create(
        conversation=convo,
        user=owner,
        original_name="resume.txt",
        storage_path="uploads/resume.txt",
        file_size=resume_file.stat().st_size,
    )

    def fake_route(conversation, content):
        # A fresh SkillRoute is built on every call, mirroring a real router.
        return SkillRoute(
            action="attachment_reader",
            skill_name="attachment_reader",
            confidence=0.91,
            reason="需要读取上传简历。",
            source="llm",
        )

    monkeypatch.setattr("review_agent.services.route_message_intent", fake_route)

    joined = "".join(list(stream_message(convo, "帮我整理其中的项目经历")))

    assert "附件解析结果" in joined
    assert "审核智能体附件解析模块" in joined
    assert "模型调用失败" not in joined


def test_stream_message_uses_llm_router_for_file_summary(monkeypatch, settings, tmp_path, django_user_model):
    """When the (patched) LLM router picks ``file_summary``, the workflow starts.

    The workflow runs synchronously (``FILE_SUMMARY_ASYNC = False``).
    MEDIA_ROOT is redirected to ``tmp_path``, as the archive tests do, so
    batch files (presumably under MEDIA_ROOT; TODO confirm) stay out of the
    real media directory.
    """
    settings.FILE_SUMMARY_ASYNC = False
    settings.MEDIA_ROOT = tmp_path
    user = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=user, title="会话")
    FileAttachment.objects.create(
        conversation=conversation,
        user=user,
        original_name="a.docx",
        storage_path="x/a.docx",
        file_size=1,
    )
    monkeypatch.setattr(
        "review_agent.services.route_message_intent",
        lambda conversation, content: SkillRoute(
            action="file_summary",
            workflow_type="file_summary",
            confidence=0.93,
            reason="需要执行文件目录与页数汇总。",
            source="llm",
        ),
    )

    # The prompt has no obvious summary wording; routing comes from the patched router.
    frames = list(stream_message(conversation, "处理一下这批资料"))

    joined = "".join(frames)
    assert "workflow_started" in joined
    assert '"workflow_type": "file_summary"' in joined
    assert FileSummaryBatch.objects.filter(conversation=conversation).exists()