From 820069f55818d1d857c1fdfaa093d5b71d8768ef Mon Sep 17 00:00:00 2001 From: bruce Date: Sun, 7 Jun 2026 22:07:00 +0800 Subject: [PATCH] feat: add workflow notification dispatcher --- review_agent/notifications/dispatcher.py | 95 +++++++++++ review_agent/notifications/records.py | 114 +++++++++++++ tests/test_feishu_notification_dispatcher.py | 160 +++++++++++++++++++ 3 files changed, 369 insertions(+) create mode 100644 review_agent/notifications/dispatcher.py create mode 100644 review_agent/notifications/records.py create mode 100644 tests/test_feishu_notification_dispatcher.py diff --git a/review_agent/notifications/dispatcher.py b/review_agent/notifications/dispatcher.py new file mode 100644 index 0000000..0bc3480 --- /dev/null +++ b/review_agent/notifications/dispatcher.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import logging + +from django.conf import settings + +from review_agent.models import WorkflowNotificationRecord + +from .context import NotificationContext +from .feishu_message_api import send_personal_message +from .feishu_token import get_tenant_access_token +from .message_builder import build_feishu_post_message, build_message_summary +from .recipient import ResolvedFeishuTarget, resolve_configured_personal_recipient +from .records import ( + create_disabled_record, + create_failed_record, + create_success_record, + existing_success_record, +) + + +logger = logging.getLogger("review_agent.notifications.dispatcher") + + +def dispatch_workflow_notification(context: NotificationContext) -> WorkflowNotificationRecord: + existing = existing_success_record(context) + if existing: + return existing + + try: + target = resolve_configured_personal_recipient() + summary = build_message_summary(context, target) + + if not getattr(settings, "FEISHU_NOTIFY_ENABLED", False): + return create_disabled_record(context, target, summary) + + if not target.ok: + return create_failed_record( + context, + target, + summary, + error_code=target.error_code, + error_message=target.error_message, + ) + + token_result = get_tenant_access_token() + if not token_result.ok: + return create_failed_record( + context, + target, + summary, + error_code=token_result.error_code, + error_message=token_result.error_message, + ) + + payload = build_feishu_post_message(context, target) + send_result = send_personal_message( + tenant_access_token=token_result.tenant_access_token, + receive_id_type=target.identifier_type, + payload=payload, + ) + if send_result.ok: + return create_success_record( + context, + target, + summary, + external_message_id=send_result.external_message_id, + request_duration_ms=send_result.request_duration_ms, + ) + return create_failed_record( + context, + target, + summary, + error_code=send_result.error_code, + error_message=send_result.error_message, + request_duration_ms=send_result.request_duration_ms, + ) + except Exception as exc: + logger.exception("Feishu notification dispatch failed", extra={"dedupe_key": context.dedupe_key}) + fallback_target = ResolvedFeishuTarget( + ok=False, + identifier_type="missing", + identifier_value="", + display_name=getattr(settings, "FEISHU_DEFAULT_TARGET_NAME", "默认飞书接收人"), + masked_identifier="", + error_code="dispatch_exception", + error_message=str(exc), + ) + return create_failed_record( + context, + fallback_target, + "\n".join([context.title, *context.summary_lines]), + error_code="dispatch_exception", + error_message=str(exc), + ) diff --git a/review_agent/notifications/records.py b/review_agent/notifications/records.py new file mode 100644 index 0000000..409f055 --- /dev/null +++ b/review_agent/notifications/records.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +from django.utils import timezone + +from review_agent.models import WorkflowNotificationRecord + +from .context import NotificationContext +from .message_builder import absolute_result_url +from .recipient import ResolvedFeishuTarget + + +def existing_success_record(context: NotificationContext) -> WorkflowNotificationRecord | None: + return ( + WorkflowNotificationRecord.objects.filter( + dedupe_key=context.dedupe_key, + send_status=WorkflowNotificationRecord.SendStatus.SUCCESS, + ) + .order_by("-created_at", "-id") + .first() + ) + + +def create_disabled_record( + context: NotificationContext, + target: ResolvedFeishuTarget, + message_summary: str, +) -> WorkflowNotificationRecord: + return _create_record( + context, + target, + channel=WorkflowNotificationRecord.Channel.DISABLED, + send_status=WorkflowNotificationRecord.SendStatus.DISABLED, + message_summary=message_summary, + error_code="notify_disabled", + error_message="FEISHU_NOTIFY_ENABLED 未启用", + ) + + +def create_failed_record( + context: NotificationContext, + target: ResolvedFeishuTarget, + message_summary: str, + *, + error_code: str, + error_message: str, + request_duration_ms: int | None = None, +) -> WorkflowNotificationRecord: + return _create_record( + context, + target, + channel=WorkflowNotificationRecord.Channel.FEISHU_API, + send_status=WorkflowNotificationRecord.SendStatus.FAILED, + message_summary=message_summary, + error_code=error_code, + error_message=error_message, + request_duration_ms=request_duration_ms, + ) + + +def create_success_record( + context: NotificationContext, + target: ResolvedFeishuTarget, + message_summary: str, + *, + external_message_id: str, + request_duration_ms: int | None = None, +) -> WorkflowNotificationRecord: + return _create_record( + context, + target, + channel=WorkflowNotificationRecord.Channel.FEISHU_API, + send_status=WorkflowNotificationRecord.SendStatus.SUCCESS, + message_summary=message_summary, + external_message_id=external_message_id, + request_duration_ms=request_duration_ms, + sent_at=timezone.now(), + ) + + +def _create_record( + context: NotificationContext, + target: ResolvedFeishuTarget, + *, + channel: str, + send_status: str, + message_summary: str, + error_code: str = "", + error_message: str = "", + external_message_id: str = "", + request_duration_ms: int | None = None, + sent_at=None, +) -> WorkflowNotificationRecord: + return WorkflowNotificationRecord.objects.create( + workflow_type=context.workflow_type, + workflow_batch_id=context.workflow_batch_id, + workflow_batch_no=context.workflow_batch_no, + workflow_status=context.workflow_status, + dedupe_key=context.dedupe_key, + trigger_user_id=context.trigger_user_id, + channel=channel, + target=target.display_name, + at_display_name=target.display_name, + at_identifier_type=target.identifier_type, + at_identifier_masked=target.masked_identifier, + send_status=send_status, + message_title=context.title, + message_summary=message_summary, + result_url=absolute_result_url(context.result_path), + external_message_id=external_message_id, + error_code=error_code, + error_message=error_message[:1000], + request_duration_ms=request_duration_ms, + sent_at=sent_at, + ) diff --git a/tests/test_feishu_notification_dispatcher.py b/tests/test_feishu_notification_dispatcher.py new file mode 100644 index 0000000..39dc940 --- /dev/null +++ b/tests/test_feishu_notification_dispatcher.py @@ -0,0 +1,160 @@ +from dataclasses import dataclass + +import pytest + +from review_agent.models import Conversation, FileSummaryBatch, WorkflowNotificationRecord +from review_agent.notifications.context import NotificationContext +from review_agent.notifications.dispatcher import dispatch_workflow_notification + + +pytestmark = pytest.mark.django_db + + +@dataclass(frozen=True) +class FakeTokenResult: + ok: bool + tenant_access_token: str = "" + error_code: str = "" + error_message: str = "" + + +@dataclass(frozen=True) +class FakeSendResult: + ok: bool + external_message_id: str = "" + error_code: str = "" + error_message: str = "" + request_duration_ms: int | None = None + + +def _context(user, batch): + return NotificationContext( + workflow_type="file_summary", + workflow_name="自动汇总", + workflow_batch_id=batch.pk, + workflow_batch_no=batch.batch_no, + workflow_status=batch.status, + trigger_user_id=user.pk, + trigger_username=user.username, + title="自动汇总完成", + summary_lines=("文件 1 个",), + next_step="查看汇总", + result_path=f"/file-summary/{batch.pk}/", + ) + + +def _batch(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="飞书") + batch = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-DISPATCH", + status=FileSummaryBatch.Status.SUCCESS, + ) + return user, batch + + +def test_dispatch_disabled_writes_record_without_api_call(django_user_model, settings, monkeypatch): + user, batch = _batch(django_user_model) + settings.FEISHU_NOTIFY_ENABLED = False + + def fail_call(*args, **kwargs): + raise AssertionError("should not call external service") + + monkeypatch.setattr("review_agent.notifications.dispatcher.send_personal_message", fail_call) + + record = dispatch_workflow_notification(_context(user, batch)) + + assert record.send_status == WorkflowNotificationRecord.SendStatus.DISABLED + assert record.channel == WorkflowNotificationRecord.Channel.DISABLED + + +def test_dispatch_success_writes_success_record(django_user_model, settings, monkeypatch): + user, batch = _batch(django_user_model) + settings.FEISHU_NOTIFY_ENABLED = True + settings.FEISHU_DEFAULT_USER_OPEN_ID = "ou_xxx" + monkeypatch.setattr( + "review_agent.notifications.dispatcher.get_tenant_access_token", + lambda: FakeTokenResult(ok=True, tenant_access_token="token"), + ) + monkeypatch.setattr( + "review_agent.notifications.dispatcher.send_personal_message", + lambda **kwargs: FakeSendResult(ok=True, external_message_id="om_xxx", request_duration_ms=12), + ) + + record = dispatch_workflow_notification(_context(user, batch)) + + assert record.send_status == WorkflowNotificationRecord.SendStatus.SUCCESS + assert record.external_message_id == "om_xxx" + assert record.sent_at is not None + + +def test_dispatch_existing_success_skips_api(django_user_model, settings, monkeypatch): + user, batch = _batch(django_user_model) + settings.FEISHU_NOTIFY_ENABLED = True + context = _context(user, batch) + existing = WorkflowNotificationRecord.objects.create( + workflow_type=context.workflow_type, + workflow_batch_id=context.workflow_batch_id, + workflow_batch_no=context.workflow_batch_no, + workflow_status=context.workflow_status, + dedupe_key=context.dedupe_key, + trigger_user=user, + channel=WorkflowNotificationRecord.Channel.FEISHU_API, + send_status=WorkflowNotificationRecord.SendStatus.SUCCESS, + message_title=context.title, + ) + + def fail_call(*args, **kwargs): + raise AssertionError("duplicate should not call API") + + monkeypatch.setattr("review_agent.notifications.dispatcher.send_personal_message", fail_call) + + assert dispatch_workflow_notification(context).pk == existing.pk + + +def test_dispatch_existing_failed_allows_retry(django_user_model, settings, monkeypatch): + user, batch = _batch(django_user_model) + settings.FEISHU_NOTIFY_ENABLED = True + settings.FEISHU_DEFAULT_USER_OPEN_ID = "ou_xxx" + context = _context(user, batch) + WorkflowNotificationRecord.objects.create( + workflow_type=context.workflow_type, + workflow_batch_id=context.workflow_batch_id, + workflow_batch_no=context.workflow_batch_no, + workflow_status=context.workflow_status, + dedupe_key=context.dedupe_key, + trigger_user=user, + channel=WorkflowNotificationRecord.Channel.FEISHU_API, + send_status=WorkflowNotificationRecord.SendStatus.FAILED, + message_title=context.title, + ) + monkeypatch.setattr( + "review_agent.notifications.dispatcher.get_tenant_access_token", + lambda: FakeTokenResult(ok=True, tenant_access_token="token"), + ) + monkeypatch.setattr( + "review_agent.notifications.dispatcher.send_personal_message", + lambda **kwargs: FakeSendResult(ok=True, external_message_id="om_retry"), + ) + + record = dispatch_workflow_notification(context) + + assert record.send_status == WorkflowNotificationRecord.SendStatus.SUCCESS + assert WorkflowNotificationRecord.objects.filter(dedupe_key=context.dedupe_key).count() == 2 + + +def test_dispatch_token_failure_writes_failed(django_user_model, settings, monkeypatch): + user, batch = _batch(django_user_model) + settings.FEISHU_NOTIFY_ENABLED = True + settings.FEISHU_DEFAULT_USER_OPEN_ID = "ou_xxx" + monkeypatch.setattr( + "review_agent.notifications.dispatcher.get_tenant_access_token", + lambda: FakeTokenResult(ok=False, error_code="token_error", error_message="bad secret"), + ) + + record = dispatch_workflow_notification(_context(user, batch)) + + assert record.send_status == WorkflowNotificationRecord.SendStatus.FAILED + assert record.error_code == "token_error"