Files
DEMO-AGENT/tests/test_application_form_fill_notification.py

73 lines
2.7 KiB
Python

import pytest
from review_agent.application_form_fill.services.notifier import notify_completion
from review_agent.models import (
ApplicationFormFillBatch,
ApplicationFormFillNotificationRecord,
Conversation,
ExportedSummaryFile,
FileSummaryBatch,
)
# Module-level pytest marker: applies the ``django_db`` mark to every test
# collected from this file (the tests below create rows through the ORM).
pytestmark = pytest.mark.django_db
def test_notify_completion_records_success(django_user_model, monkeypatch):
    """Check the record returned when the patched dispatcher reports success.

    The notifier's ``dispatch_workflow_notification`` is replaced with a stub
    that captures the context it is given and returns an object whose
    ``send_status`` is ``"success"``. ``notify_completion`` is then expected to
    return a record carrying the SUCCESS status, the export ids, the selected
    template codes and a non-null ``sent_at``.
    """
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")
    summary_batch = FileSummaryBatch.objects.create(
        conversation=chat,
        user=owner,
        batch_no="FS-NOTIFY",
    )
    fill_batch = ApplicationFormFillBatch.objects.create(
        conversation=chat,
        user=owner,
        source_summary_batch=summary_batch,
        batch_no="AFF-NOTIFY",
        selected_templates=["registration_certificate"],
    )
    filled_export = ExportedSummaryFile.objects.create(
        batch=summary_batch,
        workflow_type="application_form_fill",
        workflow_batch_id=fill_batch.pk,
        export_category="filled_template",
        export_type=ExportedSummaryFile.ExportType.WORD,
        file_name="filled.docx",
        storage_path="filled.docx",
    )

    # Minimal stand-in for the dispatcher's return value: it exposes only
    # ``send_status``, a ``SendStatus.FAILED`` constant and ``error_message``.
    stub_attrs = {
        "send_status": "success",
        "SendStatus": type("SendStatus", (), {"FAILED": "failed"}),
        "error_message": "",
    }
    stub_record = type("Record", (), stub_attrs)()

    seen_contexts = []

    def fake_dispatch(context):
        # Remember what the notifier passed in, then report success.
        seen_contexts.append(context)
        return stub_record

    monkeypatch.setattr(
        "review_agent.application_form_fill.services.notifier.dispatch_workflow_notification",
        fake_dispatch,
    )

    notification = notify_completion(fill_batch, [filled_export])

    expected_status = ApplicationFormFillNotificationRecord.SendStatus.SUCCESS
    assert notification.send_status == expected_status
    assert notification.export_ids == [filled_export.pk]
    assert notification.template_codes == ["registration_certificate"]
    assert notification.sent_at is not None
    first_context = seen_contexts[0]
    assert first_context.workflow_type == "application_form_fill"
def test_notify_completion_records_failure_without_raising(django_user_model):
    """Check that a failed notification comes back as a record, not an exception.

    ``notify_completion`` is called with an empty export list and
    ``fail=True``; the returned record must have the FAILED status, a
    ``retry_count`` of 1 and an ``error_message`` containing ``"mock"``.
    """
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")
    summary_batch = FileSummaryBatch.objects.create(
        conversation=chat,
        user=owner,
        batch_no="FS-NOTIFY-FAIL",
    )
    fill_batch = ApplicationFormFillBatch.objects.create(
        conversation=chat,
        user=owner,
        source_summary_batch=summary_batch,
        batch_no="AFF-NOTIFY-FAIL",
        selected_templates=["registration_certificate"],
    )

    # NOTE(review): ``fail=True`` presumably forces the notifier's mock send
    # to fail -- confirm against the notifier implementation.
    notification = notify_completion(fill_batch, [], fail=True)

    expected_status = ApplicationFormFillNotificationRecord.SendStatus.FAILED
    assert notification.send_status == expected_status
    assert notification.retry_count == 1
    assert "mock" in notification.error_message