feat: add feishu notification test command

This commit is contained in:
2026-06-07 22:14:51 +08:00
parent 1a1b3ee9d4
commit be7fbab0a0
2 changed files with 78 additions and 0 deletions

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand, CommandError
from review_agent.notifications.context import NotificationContext
from review_agent.notifications.dispatcher import dispatch_workflow_notification
class Command(BaseCommand):
help = "Send a manual Feishu test notification through the unified dispatcher."
def add_arguments(self, parser):
parser.add_argument("--username", required=True, help="System username used as trigger user.")
def handle(self, *args, **options):
username = options["username"]
user = get_user_model().objects.filter(username=username).first()
if not user:
raise CommandError(f"用户不存在:{username}")
context = NotificationContext(
workflow_type="manual_test",
workflow_name="飞书测试",
workflow_batch_id=user.pk,
workflow_batch_no=f"MANUAL-{user.pk}",
workflow_status="success",
trigger_user_id=user.pk,
trigger_username=user.get_username(),
title="飞书测试通知",
summary_lines=("这是一条本地手动测试通知。",),
next_step="确认飞书个人账号是否收到消息",
result_path="/",
)
record = dispatch_workflow_notification(context)
self.stdout.write(f"send_status={record.send_status}")
self.stdout.write(f"target={record.target}")
if record.error_message:
self.stdout.write(f"error={record.error_message}")

View File

@@ -0,0 +1,39 @@
from dataclasses import dataclass
from io import StringIO
from django.core.management import call_command
from django.core.management.base import CommandError
import pytest
pytestmark = pytest.mark.django_db
@dataclass(frozen=True)
class FakeRecord:
send_status: str = "success"
target: str = "负责人"
error_message: str = ""
def test_send_test_feishu_notification_calls_dispatcher(monkeypatch, django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
calls = []
monkeypatch.setattr(
"review_agent.management.commands.send_test_feishu_notification.dispatch_workflow_notification",
lambda context: calls.append(context) or FakeRecord(),
)
output = StringIO()
call_command("send_test_feishu_notification", "--username", user.username, stdout=output)
assert calls
assert calls[0].workflow_type == "manual_test"
assert calls[0].trigger_user_id == user.pk
assert "send_status=success" in output.getvalue()
assert "target=负责人" in output.getvalue()
def test_send_test_feishu_notification_missing_user_raises():
with pytest.raises(CommandError):
call_command("send_test_feishu_notification", "--username", "missing")