from dataclasses import dataclass import pytest from review_agent.models import Conversation, FileSummaryBatch, WorkflowNotificationRecord from review_agent.notifications.context import NotificationContext from review_agent.notifications.dispatcher import dispatch_workflow_notification pytestmark = pytest.mark.django_db @dataclass(frozen=True) class FakeTokenResult: ok: bool tenant_access_token: str = "" error_code: str = "" error_message: str = "" @dataclass(frozen=True) class FakeSendResult: ok: bool external_message_id: str = "" error_code: str = "" error_message: str = "" request_duration_ms: int | None = None def _context(user, batch): return NotificationContext( workflow_type="file_summary", workflow_name="自动汇总", workflow_batch_id=batch.pk, workflow_batch_no=batch.batch_no, workflow_status=batch.status, trigger_user_id=user.pk, trigger_username=user.username, title="自动汇总完成", summary_lines=("文件 1 个",), next_step="查看汇总", result_path=f"/file-summary/{batch.pk}/", ) def _batch(django_user_model): user = django_user_model.objects.create_user(username="owner", password="pass") conversation = Conversation.objects.create(user=user, title="飞书") batch = FileSummaryBatch.objects.create( conversation=conversation, user=user, batch_no="FS-DISPATCH", status=FileSummaryBatch.Status.SUCCESS, ) return user, batch def test_dispatch_disabled_writes_record_without_api_call(django_user_model, settings, monkeypatch): user, batch = _batch(django_user_model) settings.FEISHU_NOTIFY_ENABLED = False def fail_call(*args, **kwargs): raise AssertionError("should not call external service") monkeypatch.setattr("review_agent.notifications.dispatcher.send_personal_message", fail_call) record = dispatch_workflow_notification(_context(user, batch)) assert record.send_status == WorkflowNotificationRecord.SendStatus.DISABLED assert record.channel == WorkflowNotificationRecord.Channel.DISABLED def test_dispatch_success_writes_success_record(django_user_model, settings, monkeypatch): user, batch = _batch(django_user_model) settings.FEISHU_NOTIFY_ENABLED = True settings.FEISHU_DEFAULT_USER_OPEN_ID = "ou_xxx" monkeypatch.setattr( "review_agent.notifications.dispatcher.get_tenant_access_token", lambda: FakeTokenResult(ok=True, tenant_access_token="token"), ) monkeypatch.setattr( "review_agent.notifications.dispatcher.send_personal_message", lambda **kwargs: FakeSendResult(ok=True, external_message_id="om_xxx", request_duration_ms=12), ) record = dispatch_workflow_notification(_context(user, batch)) assert record.send_status == WorkflowNotificationRecord.SendStatus.SUCCESS assert record.external_message_id == "om_xxx" assert record.sent_at is not None def test_dispatch_existing_success_skips_api(django_user_model, settings, monkeypatch): user, batch = _batch(django_user_model) settings.FEISHU_NOTIFY_ENABLED = True context = _context(user, batch) existing = WorkflowNotificationRecord.objects.create( workflow_type=context.workflow_type, workflow_batch_id=context.workflow_batch_id, workflow_batch_no=context.workflow_batch_no, workflow_status=context.workflow_status, dedupe_key=context.dedupe_key, trigger_user=user, channel=WorkflowNotificationRecord.Channel.FEISHU_API, send_status=WorkflowNotificationRecord.SendStatus.SUCCESS, message_title=context.title, ) def fail_call(*args, **kwargs): raise AssertionError("duplicate should not call API") monkeypatch.setattr("review_agent.notifications.dispatcher.send_personal_message", fail_call) assert dispatch_workflow_notification(context).pk == existing.pk def test_dispatch_existing_failed_allows_retry(django_user_model, settings, monkeypatch): user, batch = _batch(django_user_model) settings.FEISHU_NOTIFY_ENABLED = True settings.FEISHU_DEFAULT_USER_OPEN_ID = "ou_xxx" context = _context(user, batch) WorkflowNotificationRecord.objects.create( workflow_type=context.workflow_type, workflow_batch_id=context.workflow_batch_id, workflow_batch_no=context.workflow_batch_no, workflow_status=context.workflow_status, dedupe_key=context.dedupe_key, trigger_user=user, channel=WorkflowNotificationRecord.Channel.FEISHU_API, send_status=WorkflowNotificationRecord.SendStatus.FAILED, message_title=context.title, ) monkeypatch.setattr( "review_agent.notifications.dispatcher.get_tenant_access_token", lambda: FakeTokenResult(ok=True, tenant_access_token="token"), ) monkeypatch.setattr( "review_agent.notifications.dispatcher.send_personal_message", lambda **kwargs: FakeSendResult(ok=True, external_message_id="om_retry"), ) record = dispatch_workflow_notification(context) assert record.send_status == WorkflowNotificationRecord.SendStatus.SUCCESS assert WorkflowNotificationRecord.objects.filter(dedupe_key=context.dedupe_key).count() == 2 def test_dispatch_token_failure_writes_failed(django_user_model, settings, monkeypatch): user, batch = _batch(django_user_model) settings.FEISHU_NOTIFY_ENABLED = True settings.FEISHU_DEFAULT_USER_OPEN_ID = "ou_xxx" monkeypatch.setattr( "review_agent.notifications.dispatcher.get_tenant_access_token", lambda: FakeTokenResult(ok=False, error_code="token_error", error_message="bad secret"), ) record = dispatch_workflow_notification(_context(user, batch)) assert record.send_status == WorkflowNotificationRecord.SendStatus.FAILED assert record.error_code == "token_error"