Files

43 lines
1.8 KiB
Python

from __future__ import annotations
from review_agent.models import WorkflowNotificationRecord
def get_notification_records(workflow_type: str, batch_id: int):
    """Look up the notification records belonging to one workflow batch.

    Args:
        workflow_type: Value matched against the ``workflow_type`` field.
        batch_id: Value matched against the ``workflow_batch_id`` field.

    Returns:
        Whatever ``WorkflowNotificationRecord.objects.filter(...)`` yields
        after ``order_by("-created_at", "-id")`` is applied (presumably a
        Django QuerySet sorted newest first, with ``id`` as the
        tie-breaker -- confirm against the model's manager).
    """
    lookup = {
        "workflow_type": workflow_type,
        "workflow_batch_id": batch_id,
    }
    matching = WorkflowNotificationRecord.objects.filter(**lookup)
    return matching.order_by("-created_at", "-id")
def serialize_notification_record(record: WorkflowNotificationRecord) -> dict[str, object]:
    """Convert one notification record into a plain dictionary.

    Args:
        record: The notification record to serialize.

    Returns:
        A dict with the keys ``id``, ``channel``, ``target``, ``receiver``,
        ``identifier_type``, ``identifier_masked``, ``send_status``,
        ``status_label``, ``sent_at``, ``created_at``, ``error_code`` and
        ``error_message`` (inserted in that order).
    """
    payload: dict[str, object] = {}

    # Identity and addressing.
    payload["id"] = record.pk
    payload["channel"] = record.channel
    payload["target"] = record.target
    # Fall back to the raw target when no display name is set.
    payload["receiver"] = record.at_display_name or record.target
    payload["identifier_type"] = record.at_identifier_type
    payload["identifier_masked"] = record.at_identifier_masked

    # Delivery state, raw and as a display label.
    payload["send_status"] = record.send_status
    payload["status_label"] = notification_status_label(record)

    # Timestamps as ISO-8601 strings; a falsy sent_at becomes "".
    if record.sent_at:
        payload["sent_at"] = record.sent_at.isoformat()
    else:
        payload["sent_at"] = ""
    payload["created_at"] = record.created_at.isoformat()

    # Failure details.
    payload["error_code"] = record.error_code
    payload["error_message"] = record.error_message
    return payload
def serialize_notification_records(workflow_type: str, batch_id: int) -> list[dict[str, object]]:
    """Serialize every notification record of one workflow batch.

    Args:
        workflow_type: Passed through to ``get_notification_records``.
        batch_id: Passed through to ``get_notification_records``.

    Returns:
        A list holding the ``serialize_notification_record`` result for each
        record, in the order ``get_notification_records`` returns them.
    """
    records = get_notification_records(workflow_type, batch_id)
    return list(map(serialize_notification_record, records))
def notification_status_label(record: WorkflowNotificationRecord) -> str:
    """Map a record's send status to its display label.

    Args:
        record: The notification record whose ``send_status`` is looked up.

    Returns:
        The Chinese-language label registered for the status, or the raw
        ``send_status`` value itself when no label is registered for it.
    """
    send_status = WorkflowNotificationRecord.SendStatus
    label_by_status = {
        send_status.SUCCESS: "飞书通知已发送",
        send_status.FAILED: "飞书通知失败",
        send_status.DISABLED: "飞书通知未启用",
        send_status.SKIPPED_DUPLICATE: "飞书通知已跳过重复发送",
        send_status.PENDING: "飞书通知待发送",
    }
    current = record.send_status
    # Unknown statuses are passed through unchanged rather than raising.
    return label_by_status.get(current, current)