Files
DEMO-AGENT/review_agent/regulatory_review/services/feishu_notifier.py

40 lines
1.4 KiB
Python

from __future__ import annotations
from django.utils import timezone
from review_agent.models import RegulatoryNotificationRecord, RegulatoryReviewBatch
NOTIFIABLE_SEVERITIES = {"blocking", "high", "medium"}
def create_mock_notifications(batch: RegulatoryReviewBatch) -> list[RegulatoryNotificationRecord]:
records = []
existing_issue_ids = {
item.get("issue_id")
for item in RegulatoryNotificationRecord.objects.filter(batch=batch, channel=RegulatoryNotificationRecord.Channel.MOCK).values_list(
"payload", flat=True
)
if isinstance(item, dict)
}
for issue in batch.issues.order_by("id"):
if issue.severity not in NOTIFIABLE_SEVERITIES or issue.pk in existing_issue_ids:
continue
records.append(
RegulatoryNotificationRecord.objects.create(
batch=batch,
channel=RegulatoryNotificationRecord.Channel.MOCK,
target="法规整改负责人",
status=RegulatoryNotificationRecord.Status.SENT,
sent_at=timezone.now(),
payload={
"issue_id": issue.pk,
"rule_code": issue.rule_code,
"severity": issue.severity,
"title": issue.title,
"suggestion": issue.suggestion,
},
)
)
return records