From bdc1d58c22cbc17180af5798fdf3d270e5797043 Mon Sep 17 00:00:00 2001 From: bruce Date: Sun, 7 Jun 2026 22:05:20 +0800 Subject: [PATCH] feat: add feishu api notification services --- review_agent/notifications/__init__.py | 1 + review_agent/notifications/context.py | 22 ++ .../notifications/feishu_message_api.py | 87 ++++++++ review_agent/notifications/feishu_token.py | 83 ++++++++ review_agent/notifications/message_builder.py | 62 ++++++ review_agent/notifications/recipient.py | 55 +++++ tests/test_feishu_api_services.py | 200 ++++++++++++++++++ 7 files changed, 510 insertions(+) create mode 100644 review_agent/notifications/__init__.py create mode 100644 review_agent/notifications/context.py create mode 100644 review_agent/notifications/feishu_message_api.py create mode 100644 review_agent/notifications/feishu_token.py create mode 100644 review_agent/notifications/message_builder.py create mode 100644 review_agent/notifications/recipient.py create mode 100644 tests/test_feishu_api_services.py diff --git a/review_agent/notifications/__init__.py b/review_agent/notifications/__init__.py new file mode 100644 index 0000000..7e468bc --- /dev/null +++ b/review_agent/notifications/__init__.py @@ -0,0 +1 @@ +"""Unified workflow notification services.""" diff --git a/review_agent/notifications/context.py b/review_agent/notifications/context.py new file mode 100644 index 0000000..fe10432 --- /dev/null +++ b/review_agent/notifications/context.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class NotificationContext: + workflow_type: str + workflow_name: str + workflow_batch_id: int + workflow_batch_no: str + workflow_status: str + trigger_user_id: int + trigger_username: str + title: str + summary_lines: tuple[str, ...] + next_step: str + result_path: str + + @property + def dedupe_key(self) -> str: + return f"{self.workflow_type}:{self.workflow_batch_id}:{self.workflow_status}" diff --git a/review_agent/notifications/feishu_message_api.py b/review_agent/notifications/feishu_message_api.py new file mode 100644 index 0000000..bfa002c --- /dev/null +++ b/review_agent/notifications/feishu_message_api.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +from dataclasses import dataclass +import time + +from django.conf import settings +import httpx + +from .feishu_token import get_tenant_access_token + + +@dataclass(frozen=True) +class FeishuMessageResult: + ok: bool + external_message_id: str = "" + error_code: str = "" + error_message: str = "" + request_duration_ms: int | None = None + refreshed_token: bool = False + + +def send_personal_message( + *, + tenant_access_token: str, + receive_id_type: str, + payload: dict, + retry_on_token_expired: bool = True, +) -> FeishuMessageResult: + start = time.monotonic() + try: + response = httpx.post( + getattr(settings, "FEISHU_MESSAGE_API_URL"), + params={"receive_id_type": receive_id_type}, + json=payload, + headers={"Authorization": f"Bearer {tenant_access_token}"}, + timeout=10, + ) + duration_ms = int((time.monotonic() - start) * 1000) + data = response.json() + except httpx.TimeoutException: + return FeishuMessageResult(ok=False, error_code="timeout", error_message="发送飞书消息超时") + except Exception as exc: + return FeishuMessageResult(ok=False, error_code="request_error", error_message=str(exc)) + + if response.status_code >= 400: + return FeishuMessageResult( + ok=False, + error_code=str(response.status_code), + error_message=response.text[:500], + request_duration_ms=duration_ms, + ) + + code = int(data.get("code") or 0) + if code == 0: + message_id = str((data.get("data") or {}).get("message_id") or "") + return FeishuMessageResult(ok=True, external_message_id=message_id, request_duration_ms=duration_ms) + + if retry_on_token_expired and code in {99991663, 99991664, 99991668, 99991669}: + token_result = get_tenant_access_token(force_refresh=True) + if token_result.ok: + retry_result = send_personal_message( + tenant_access_token=token_result.tenant_access_token, + receive_id_type=receive_id_type, + payload=payload, + retry_on_token_expired=False, + ) + return FeishuMessageResult( + ok=retry_result.ok, + external_message_id=retry_result.external_message_id, + error_code=retry_result.error_code, + error_message=retry_result.error_message, + request_duration_ms=retry_result.request_duration_ms, + refreshed_token=True, + ) + return FeishuMessageResult( + ok=False, + error_code=token_result.error_code, + error_message=token_result.error_message, + request_duration_ms=duration_ms, + ) + + return FeishuMessageResult( + ok=False, + error_code=str(code or "api_error"), + error_message=str(data.get("msg") or "飞书消息 API 失败"), + request_duration_ms=duration_ms, + ) diff --git a/review_agent/notifications/feishu_token.py b/review_agent/notifications/feishu_token.py new file mode 100644 index 0000000..97d4af4 --- /dev/null +++ b/review_agent/notifications/feishu_token.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +from dataclasses import dataclass +import hashlib + +from django.conf import settings +from django.utils import timezone +import httpx + +from review_agent.models import FeishuAccessTokenCache + + +@dataclass(frozen=True) +class FeishuTokenResult: + ok: bool + tenant_access_token: str = "" + error_code: str = "" + error_message: str = "" + + +def app_id_hash(app_id: str) -> str: + return hashlib.sha256(app_id.encode("utf-8")).hexdigest() + + +def get_tenant_access_token(*, force_refresh: bool = False) -> FeishuTokenResult: + app_id = getattr(settings, "FEISHU_APP_ID", "") + app_secret = getattr(settings, "FEISHU_APP_SECRET", "") + if not app_id or not app_secret: + return FeishuTokenResult( + ok=False, + error_code="config_missing", + error_message="未配置 FEISHU_APP_ID 或 FEISHU_APP_SECRET", + ) + + hashed_app_id = app_id_hash(app_id) + now = timezone.now() + cache = FeishuAccessTokenCache.objects.filter(app_id_hash=hashed_app_id).first() + if cache and not force_refresh and cache.is_valid(now=now): + return FeishuTokenResult(ok=True, tenant_access_token=cache.tenant_access_token) + + try: + response = httpx.post( + getattr(settings, "FEISHU_TOKEN_API_URL"), + json={"app_id": app_id, "app_secret": app_secret}, + timeout=10, + ) + data = response.json() + except httpx.TimeoutException: + return _save_token_error(hashed_app_id, "timeout", "获取 tenant_access_token 超时") + except Exception as exc: + return _save_token_error(hashed_app_id, "request_error", str(exc)) + + if response.status_code >= 400: + return _save_token_error(hashed_app_id, str(response.status_code), response.text[:500]) + if int(data.get("code") or 0) != 0: + return _save_token_error(hashed_app_id, str(data.get("code") or "api_error"), str(data.get("msg") or "token API 失败")) + + token = str(data.get("tenant_access_token") or "") + expire_seconds = int(data.get("expire") or getattr(settings, "FEISHU_TENANT_TOKEN_CACHE_SECONDS", 6600)) + if not token: + return _save_token_error(hashed_app_id, "token_missing", "飞书未返回 tenant_access_token") + + FeishuAccessTokenCache.objects.update_or_create( + app_id_hash=hashed_app_id, + defaults={ + "tenant_access_token": token, + "expires_at": now + timezone.timedelta(seconds=max(expire_seconds - 60, 60)), + "error_message": "", + }, + ) + return FeishuTokenResult(ok=True, tenant_access_token=token) + + +def _save_token_error(app_id_hash_value: str, error_code: str, error_message: str) -> FeishuTokenResult: + FeishuAccessTokenCache.objects.update_or_create( + app_id_hash=app_id_hash_value, + defaults={ + "tenant_access_token": "", + "expires_at": None, + "error_message": error_message[:1000], + }, + ) + return FeishuTokenResult(ok=False, error_code=error_code, error_message=error_message[:1000]) diff --git a/review_agent/notifications/message_builder.py b/review_agent/notifications/message_builder.py new file mode 100644 index 0000000..9a2666f --- /dev/null +++ b/review_agent/notifications/message_builder.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import json + +from django.conf import settings + +from .context import NotificationContext +from .recipient import ResolvedFeishuTarget + + +def absolute_result_url(path: str) -> str: + base_url = getattr(settings, "PUBLIC_BASE_URL", "http://127.0.0.1:8000").rstrip("/") + if not path: + return base_url + if path.startswith("http://") or path.startswith("https://"): + return path + return f"{base_url}/{path.lstrip('/')}" + + +def build_message_summary(context: NotificationContext, target: ResolvedFeishuTarget) -> str: + lines = [ + context.title, + f"批次:{context.workflow_batch_no}", + f"状态:{context.workflow_status}", + f"发起人:{context.trigger_username}", + f"接收人:{target.display_name}", + *context.summary_lines, + f"下一步:{context.next_step}", + ] + return "\n".join(line for line in lines if line) + + +def build_feishu_post_message(context: NotificationContext, target: ResolvedFeishuTarget) -> dict: + result_url = absolute_result_url(context.result_path) + content = [ + [{"tag": "text", "text": f"{context.title}\n"}], + [{"tag": "text", "text": f"流程:{context.workflow_name}\n"}], + [{"tag": "text", "text": f"批次:{context.workflow_batch_no}\n"}], + [{"tag": "text", "text": f"状态:{context.workflow_status}\n"}], + [{"tag": "text", "text": f"发起人:{context.trigger_username}\n"}], + ] + for line in context.summary_lines: + content.append([{"tag": "text", "text": f"{line}\n"}]) + content.extend( + [ + [{"tag": "text", "text": f"下一步:{context.next_step}\n"}], + [{"tag": "a", "text": "查看系统结果", "href": result_url}], + ] + ) + return { + "receive_id": target.identifier_value, + "msg_type": "post", + "content": json.dumps( + { + "zh_cn": { + "title": context.title, + "content": content, + } + }, + ensure_ascii=False, + ), + } diff --git a/review_agent/notifications/recipient.py b/review_agent/notifications/recipient.py new file mode 100644 index 0000000..b75ce2e --- /dev/null +++ b/review_agent/notifications/recipient.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from django.conf import settings + + +@dataclass(frozen=True) +class ResolvedFeishuTarget: + ok: bool + identifier_type: str + identifier_value: str + display_name: str + masked_identifier: str + error_code: str = "" + error_message: str = "" + + +def mask_identifier(value: str) -> str: + if not value: + return "" + if len(value) <= 8: + return value[:2] + "***" + return f"{value[:4]}***{value[-4:]}" + + +def resolve_configured_personal_recipient() -> ResolvedFeishuTarget: + open_id = getattr(settings, "FEISHU_DEFAULT_USER_OPEN_ID", "") + user_id = getattr(settings, "FEISHU_DEFAULT_USER_ID", "") + display_name = getattr(settings, "FEISHU_DEFAULT_TARGET_NAME", "默认飞书接收人") + if open_id: + return ResolvedFeishuTarget( + ok=True, + identifier_type="open_id", + identifier_value=open_id, + display_name=display_name, + masked_identifier=mask_identifier(open_id), + ) + if user_id: + return ResolvedFeishuTarget( + ok=True, + identifier_type="user_id", + identifier_value=user_id, + display_name=display_name, + masked_identifier=mask_identifier(user_id), + ) + return ResolvedFeishuTarget( + ok=False, + identifier_type="missing", + identifier_value="", + display_name=display_name, + masked_identifier="", + error_code="recipient_missing", + error_message="未配置 FEISHU_DEFAULT_USER_OPEN_ID 或 FEISHU_DEFAULT_USER_ID", + ) diff --git a/tests/test_feishu_api_services.py b/tests/test_feishu_api_services.py new file mode 100644 index 0000000..b03ce8c --- /dev/null +++ b/tests/test_feishu_api_services.py @@ -0,0 +1,200 @@ +import json + +from django.utils import timezone +import pytest + +from review_agent.models import FeishuAccessTokenCache +from review_agent.notifications.context import NotificationContext +from review_agent.notifications.feishu_message_api import send_personal_message +from review_agent.notifications.feishu_token import app_id_hash, get_tenant_access_token +from review_agent.notifications.message_builder import build_feishu_post_message +from review_agent.notifications.recipient import resolve_configured_personal_recipient + + +pytestmark = pytest.mark.django_db + + +class FakeResponse: + def __init__(self, payload, status_code=200): + self.payload = payload + self.status_code = status_code + self.text = json.dumps(payload, ensure_ascii=False) + + def json(self): + return self.payload + + +def test_token_service_fetches_and_caches(monkeypatch, settings): + settings.FEISHU_APP_ID = "cli_a" + settings.FEISHU_APP_SECRET = "secret" + calls = [] + + def fake_post(*args, **kwargs): + calls.append(kwargs) + return FakeResponse({"code": 0, "tenant_access_token": "tenant-token", "expire": 7200}) + + monkeypatch.setattr("review_agent.notifications.feishu_token.httpx.post", fake_post) + + first = get_tenant_access_token() + second = get_tenant_access_token() + + assert first.ok + assert second.tenant_access_token == "tenant-token" + assert len(calls) == 1 + assert FeishuAccessTokenCache.objects.get(app_id_hash=app_id_hash("cli_a")).is_valid() + + +def test_token_service_refreshes_expired_cache(monkeypatch, settings): + settings.FEISHU_APP_ID = "cli_a" + settings.FEISHU_APP_SECRET = "secret" + FeishuAccessTokenCache.objects.create( + app_id_hash=app_id_hash("cli_a"), + tenant_access_token="old", + expires_at=timezone.now() - timezone.timedelta(minutes=1), + ) + + monkeypatch.setattr( + "review_agent.notifications.feishu_token.httpx.post", + lambda *args, **kwargs: FakeResponse({"code": 0, "tenant_access_token": "new", "expire": 7200}), + ) + + assert get_tenant_access_token().tenant_access_token == "new" + + +def test_token_service_returns_error_for_api_failure(monkeypatch, settings): + settings.FEISHU_APP_ID = "cli_a" + settings.FEISHU_APP_SECRET = "secret" + monkeypatch.setattr( + "review_agent.notifications.feishu_token.httpx.post", + lambda *args, **kwargs: FakeResponse({"code": 1, "msg": "bad secret"}), + ) + + result = get_tenant_access_token() + + assert not result.ok + assert result.error_message == "bad secret" + + +def test_recipient_prefers_open_id(settings): + settings.FEISHU_DEFAULT_USER_OPEN_ID = "ou_xxx" + settings.FEISHU_DEFAULT_USER_ID = "user_xxx" + settings.FEISHU_DEFAULT_TARGET_NAME = "负责人" + + target = resolve_configured_personal_recipient() + + assert target.ok + assert target.identifier_type == "open_id" + assert target.identifier_value == "ou_xxx" + + +def test_recipient_uses_user_id_when_open_id_missing(settings): + settings.FEISHU_DEFAULT_USER_OPEN_ID = "" + settings.FEISHU_DEFAULT_USER_ID = "user_xxx" + + target = resolve_configured_personal_recipient() + + assert target.ok + assert target.identifier_type == "user_id" + + +def test_recipient_missing(settings): + settings.FEISHU_DEFAULT_USER_OPEN_ID = "" + settings.FEISHU_DEFAULT_USER_ID = "" + + target = resolve_configured_personal_recipient() + + assert not target.ok + assert target.error_code == "recipient_missing" + + +def test_build_feishu_post_message_contains_summary(settings): + settings.PUBLIC_BASE_URL = "http://example.test" + settings.FEISHU_DEFAULT_USER_OPEN_ID = "ou_xxx" + target = resolve_configured_personal_recipient() + context = NotificationContext( + workflow_type="file_summary", + workflow_name="自动汇总", + workflow_batch_id=1, + workflow_batch_no="FS-001", + workflow_status="success", + trigger_user_id=1, + trigger_username="owner", + title="自动汇总完成", + summary_lines=("文件 3 个", "异常 0 个"), + next_step="查看汇总结果", + result_path="/summary/1/", + ) + + payload = build_feishu_post_message(context, target) + + assert payload["receive_id"] == "ou_xxx" + content = json.loads(payload["content"]) + assert content["zh_cn"]["title"] == "自动汇总完成" + assert "http://example.test/summary/1/" in payload["content"] + + +def test_send_personal_message_success(monkeypatch, settings): + settings.FEISHU_MESSAGE_API_URL = "http://feishu/messages" + requests = [] + + def fake_post(*args, **kwargs): + requests.append(kwargs) + return FakeResponse({"code": 0, "data": {"message_id": "om_xxx"}}) + + monkeypatch.setattr("review_agent.notifications.feishu_message_api.httpx.post", fake_post) + + result = send_personal_message( + tenant_access_token="token", + receive_id_type="open_id", + payload={"receive_id": "ou_xxx"}, + ) + + assert result.ok + assert result.external_message_id == "om_xxx" + assert requests[0]["headers"]["Authorization"] == "Bearer token" + + +def test_send_personal_message_api_error(monkeypatch, settings): + settings.FEISHU_MESSAGE_API_URL = "http://feishu/messages" + monkeypatch.setattr( + "review_agent.notifications.feishu_message_api.httpx.post", + lambda *args, **kwargs: FakeResponse({"code": 230001, "msg": "bad receive_id"}), + ) + + result = send_personal_message( + tenant_access_token="token", + receive_id_type="open_id", + payload={"receive_id": "bad"}, + ) + + assert not result.ok + assert result.error_code == "230001" + + +def test_send_personal_message_refreshes_token_once(monkeypatch, settings): + settings.FEISHU_MESSAGE_API_URL = "http://feishu/messages" + settings.FEISHU_APP_ID = "cli_a" + settings.FEISHU_APP_SECRET = "secret" + calls = {"message": 0} + + def fake_message_post(*args, **kwargs): + calls["message"] += 1 + if calls["message"] == 1: + return FakeResponse({"code": 99991663, "msg": "token expired"}) + return FakeResponse({"code": 0, "data": {"message_id": "om_retry"}}) + + monkeypatch.setattr("review_agent.notifications.feishu_message_api.httpx.post", fake_message_post) + monkeypatch.setattr( + "review_agent.notifications.feishu_token.httpx.post", + lambda *args, **kwargs: FakeResponse({"code": 0, "tenant_access_token": "fresh", "expire": 7200}), + ) + + result = send_personal_message( + tenant_access_token="stale", + receive_id_type="open_id", + payload={"receive_id": "ou_xxx"}, + ) + + assert result.ok + assert result.refreshed_token + assert calls["message"] == 2