from __future__ import annotations from django.contrib.auth import get_user_model from django.core.management.base import BaseCommand, CommandError from review_agent.notifications.context import NotificationContext from review_agent.notifications.dispatcher import dispatch_workflow_notification class Command(BaseCommand): help = "Send a manual Feishu test notification through the unified dispatcher." def add_arguments(self, parser): parser.add_argument("--username", required=True, help="System username used as trigger user.") def handle(self, *args, **options): username = options["username"] user = get_user_model().objects.filter(username=username).first() if not user: raise CommandError(f"用户不存在:{username}") context = NotificationContext( workflow_type="manual_test", workflow_name="飞书测试", workflow_batch_id=user.pk, workflow_batch_no=f"MANUAL-{user.pk}", workflow_status="success", trigger_user_id=user.pk, trigger_username=user.get_username(), title="飞书测试通知", summary_lines=("这是一条本地手动测试通知。",), next_step="确认飞书个人账号是否收到消息", result_path="/", ) record = dispatch_workflow_notification(context) self.stdout.write(f"send_status={record.send_status}") self.stdout.write(f"target={record.target}") if record.error_message: self.stdout.write(f"error={record.error_message}")