feat: add feishu api notification services

This commit is contained in:
2026-06-07 22:05:20 +08:00
parent da81ce24d0
commit bdc1d58c22
7 changed files with 510 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Unified workflow notification services."""

View File

@@ -0,0 +1,22 @@
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class NotificationContext:
workflow_type: str
workflow_name: str
workflow_batch_id: int
workflow_batch_no: str
workflow_status: str
trigger_user_id: int
trigger_username: str
title: str
summary_lines: tuple[str, ...]
next_step: str
result_path: str
@property
def dedupe_key(self) -> str:
return f"{self.workflow_type}:{self.workflow_batch_id}:{self.workflow_status}"

View File

@@ -0,0 +1,87 @@
from __future__ import annotations
from dataclasses import dataclass
import time
from django.conf import settings
import httpx
from .feishu_token import get_tenant_access_token
@dataclass(frozen=True)
class FeishuMessageResult:
ok: bool
external_message_id: str = ""
error_code: str = ""
error_message: str = ""
request_duration_ms: int | None = None
refreshed_token: bool = False
def send_personal_message(
*,
tenant_access_token: str,
receive_id_type: str,
payload: dict,
retry_on_token_expired: bool = True,
) -> FeishuMessageResult:
start = time.monotonic()
try:
response = httpx.post(
getattr(settings, "FEISHU_MESSAGE_API_URL"),
params={"receive_id_type": receive_id_type},
json=payload,
headers={"Authorization": f"Bearer {tenant_access_token}"},
timeout=10,
)
duration_ms = int((time.monotonic() - start) * 1000)
data = response.json()
except httpx.TimeoutException:
return FeishuMessageResult(ok=False, error_code="timeout", error_message="发送飞书消息超时")
except Exception as exc:
return FeishuMessageResult(ok=False, error_code="request_error", error_message=str(exc))
if response.status_code >= 400:
return FeishuMessageResult(
ok=False,
error_code=str(response.status_code),
error_message=response.text[:500],
request_duration_ms=duration_ms,
)
code = int(data.get("code") or 0)
if code == 0:
message_id = str((data.get("data") or {}).get("message_id") or "")
return FeishuMessageResult(ok=True, external_message_id=message_id, request_duration_ms=duration_ms)
if retry_on_token_expired and code in {99991663, 99991664, 99991668, 99991669}:
token_result = get_tenant_access_token(force_refresh=True)
if token_result.ok:
retry_result = send_personal_message(
tenant_access_token=token_result.tenant_access_token,
receive_id_type=receive_id_type,
payload=payload,
retry_on_token_expired=False,
)
return FeishuMessageResult(
ok=retry_result.ok,
external_message_id=retry_result.external_message_id,
error_code=retry_result.error_code,
error_message=retry_result.error_message,
request_duration_ms=retry_result.request_duration_ms,
refreshed_token=True,
)
return FeishuMessageResult(
ok=False,
error_code=token_result.error_code,
error_message=token_result.error_message,
request_duration_ms=duration_ms,
)
return FeishuMessageResult(
ok=False,
error_code=str(code or "api_error"),
error_message=str(data.get("msg") or "飞书消息 API 失败"),
request_duration_ms=duration_ms,
)

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
from dataclasses import dataclass
import hashlib
from django.conf import settings
from django.utils import timezone
import httpx
from review_agent.models import FeishuAccessTokenCache
@dataclass(frozen=True)
class FeishuTokenResult:
ok: bool
tenant_access_token: str = ""
error_code: str = ""
error_message: str = ""
def app_id_hash(app_id: str) -> str:
return hashlib.sha256(app_id.encode("utf-8")).hexdigest()
def get_tenant_access_token(*, force_refresh: bool = False) -> FeishuTokenResult:
app_id = getattr(settings, "FEISHU_APP_ID", "")
app_secret = getattr(settings, "FEISHU_APP_SECRET", "")
if not app_id or not app_secret:
return FeishuTokenResult(
ok=False,
error_code="config_missing",
error_message="未配置 FEISHU_APP_ID 或 FEISHU_APP_SECRET",
)
hashed_app_id = app_id_hash(app_id)
now = timezone.now()
cache = FeishuAccessTokenCache.objects.filter(app_id_hash=hashed_app_id).first()
if cache and not force_refresh and cache.is_valid(now=now):
return FeishuTokenResult(ok=True, tenant_access_token=cache.tenant_access_token)
try:
response = httpx.post(
getattr(settings, "FEISHU_TOKEN_API_URL"),
json={"app_id": app_id, "app_secret": app_secret},
timeout=10,
)
data = response.json()
except httpx.TimeoutException:
return _save_token_error(hashed_app_id, "timeout", "获取 tenant_access_token 超时")
except Exception as exc:
return _save_token_error(hashed_app_id, "request_error", str(exc))
if response.status_code >= 400:
return _save_token_error(hashed_app_id, str(response.status_code), response.text[:500])
if int(data.get("code") or 0) != 0:
return _save_token_error(hashed_app_id, str(data.get("code") or "api_error"), str(data.get("msg") or "token API 失败"))
token = str(data.get("tenant_access_token") or "")
expire_seconds = int(data.get("expire") or getattr(settings, "FEISHU_TENANT_TOKEN_CACHE_SECONDS", 6600))
if not token:
return _save_token_error(hashed_app_id, "token_missing", "飞书未返回 tenant_access_token")
FeishuAccessTokenCache.objects.update_or_create(
app_id_hash=hashed_app_id,
defaults={
"tenant_access_token": token,
"expires_at": now + timezone.timedelta(seconds=max(expire_seconds - 60, 60)),
"error_message": "",
},
)
return FeishuTokenResult(ok=True, tenant_access_token=token)
def _save_token_error(app_id_hash_value: str, error_code: str, error_message: str) -> FeishuTokenResult:
FeishuAccessTokenCache.objects.update_or_create(
app_id_hash=app_id_hash_value,
defaults={
"tenant_access_token": "",
"expires_at": None,
"error_message": error_message[:1000],
},
)
return FeishuTokenResult(ok=False, error_code=error_code, error_message=error_message[:1000])

View File

@@ -0,0 +1,62 @@
from __future__ import annotations
import json
from django.conf import settings
from .context import NotificationContext
from .recipient import ResolvedFeishuTarget
def absolute_result_url(path: str) -> str:
base_url = getattr(settings, "PUBLIC_BASE_URL", "http://127.0.0.1:8000").rstrip("/")
if not path:
return base_url
if path.startswith("http://") or path.startswith("https://"):
return path
return f"{base_url}/{path.lstrip('/')}"
def build_message_summary(context: NotificationContext, target: ResolvedFeishuTarget) -> str:
lines = [
context.title,
f"批次:{context.workflow_batch_no}",
f"状态:{context.workflow_status}",
f"发起人:{context.trigger_username}",
f"接收人:{target.display_name}",
*context.summary_lines,
f"下一步:{context.next_step}",
]
return "\n".join(line for line in lines if line)
def build_feishu_post_message(context: NotificationContext, target: ResolvedFeishuTarget) -> dict:
result_url = absolute_result_url(context.result_path)
content = [
[{"tag": "text", "text": f"{context.title}\n"}],
[{"tag": "text", "text": f"流程:{context.workflow_name}\n"}],
[{"tag": "text", "text": f"批次:{context.workflow_batch_no}\n"}],
[{"tag": "text", "text": f"状态:{context.workflow_status}\n"}],
[{"tag": "text", "text": f"发起人:{context.trigger_username}\n"}],
]
for line in context.summary_lines:
content.append([{"tag": "text", "text": f"{line}\n"}])
content.extend(
[
[{"tag": "text", "text": f"下一步:{context.next_step}\n"}],
[{"tag": "a", "text": "查看系统结果", "href": result_url}],
]
)
return {
"receive_id": target.identifier_value,
"msg_type": "post",
"content": json.dumps(
{
"zh_cn": {
"title": context.title,
"content": content,
}
},
ensure_ascii=False,
),
}

View File

@@ -0,0 +1,55 @@
from __future__ import annotations
from dataclasses import dataclass
from django.conf import settings
@dataclass(frozen=True)
class ResolvedFeishuTarget:
ok: bool
identifier_type: str
identifier_value: str
display_name: str
masked_identifier: str
error_code: str = ""
error_message: str = ""
def mask_identifier(value: str) -> str:
if not value:
return ""
if len(value) <= 8:
return value[:2] + "***"
return f"{value[:4]}***{value[-4:]}"
def resolve_configured_personal_recipient() -> ResolvedFeishuTarget:
open_id = getattr(settings, "FEISHU_DEFAULT_USER_OPEN_ID", "")
user_id = getattr(settings, "FEISHU_DEFAULT_USER_ID", "")
display_name = getattr(settings, "FEISHU_DEFAULT_TARGET_NAME", "默认飞书接收人")
if open_id:
return ResolvedFeishuTarget(
ok=True,
identifier_type="open_id",
identifier_value=open_id,
display_name=display_name,
masked_identifier=mask_identifier(open_id),
)
if user_id:
return ResolvedFeishuTarget(
ok=True,
identifier_type="user_id",
identifier_value=user_id,
display_name=display_name,
masked_identifier=mask_identifier(user_id),
)
return ResolvedFeishuTarget(
ok=False,
identifier_type="missing",
identifier_value="",
display_name=display_name,
masked_identifier="",
error_code="recipient_missing",
error_message="未配置 FEISHU_DEFAULT_USER_OPEN_ID 或 FEISHU_DEFAULT_USER_ID",
)