Files
DEMO-AGENT/docs/4.详细设计/4.飞书通知与问答接入.md

605 lines
19 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 飞书通知与问答接入详细设计
## 文档信息
| 项目 | 内容 |
| --- | --- |
| 需求分析文档 | docs/1.需求分析/4.飞书通知与问答接入.md |
| 功能设计文档 | docs/2.功能设计/4.飞书通知与问答接入.md |
| 数据库设计文档 | docs/4.数据库设计/4.飞书通知与问答接入.md |
| 所属模块 | 审核智能体 review_agent |
| 设计日期 | 2026-06-07 |
| 设计版本 | V1.0 |
---
## 一、实现目标
首期实现一个统一飞书通知能力,使自动汇总、法规核查、自动填表三个工作流在完成、部分成功或失败时,通过飞书官方智能体/应用机器人消息 API 向指定个人账号发送富文本私聊通知。通知失败不阻断主流程,发送结果落库并在批次详情页展示。
同时预留飞书私聊问答所需的用户映射、查询服务、权限过滤和问答日志模型,但不实现飞书事件订阅回调。
---
## 二、推荐文件结构
| 文件 | 类型 | 责任 |
| --- | --- | --- |
| `review_agent/models.py` | 修改 | 新增 `FeishuUserMapping``WorkflowNotificationRecord``FeishuQuestionLog` |
| `review_agent/admin.py` | 修改/新增 | 注册飞书用户映射和通知记录后台 |
| `review_agent/notifications/__init__.py` | 新增 | 通知模块包 |
| `review_agent/notifications/context.py` | 新增 | 定义统一通知上下文 dataclass |
| `review_agent/notifications/recipient.py` | 新增 | 解析首期指定个人接收人;后续扩展为按系统用户映射解析 |
| `review_agent/notifications/message_builder.py` | 新增 | 构造飞书富文本 payload 和摘要 |
| `review_agent/notifications/feishu_token.py` | 新增 | 使用 App ID/App Secret 获取并缓存 tenant_access_token |
| `review_agent/notifications/feishu_message_api.py` | 新增 | 调用飞书发送消息 API、处理响应解析 |
| `review_agent/notifications/records.py` | 新增 | 判重和通知记录落库 |
| `review_agent/notifications/dispatcher.py` | 新增 | 对外统一发送入口 |
| `review_agent/notifications/workflow_adapters.py` | 新增 | 三个工作流批次到通知上下文的适配 |
| `review_agent/feishu_questions/query.py` | 新增 | 后续问答预留:批次摘要查询 |
| `review_agent/feishu_questions/permissions.py` | 新增 | 后续问答预留:权限过滤 |
| `tests/test_feishu_notification.py` | 新增 | 飞书通知单元测试 |
| `tests/test_feishu_question_reserved.py` | 新增 | 问答预留服务测试 |
---
## 三、数据结构设计
### 3.1 NotificationContext
```python
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
class NotificationContext:
workflow_type: str
workflow_batch_id: int
workflow_batch_no: str
workflow_status: str
title: str
trigger_user_id: int
trigger_username: str
result_url: str
summary_lines: list[str] = field(default_factory=list)
next_action: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
@property
def dedupe_key(self) -> str:
return f"{self.workflow_type}:{self.workflow_batch_id}:{self.workflow_status}"
```
### 3.2 ResolvedFeishuTarget
```python
from dataclasses import dataclass
@dataclass(frozen=True)
class ResolvedFeishuTarget:
mapping_id: int | None
display_name: str
identifier_type: str
identifier_value: str
masked_identifier: str
missing: bool = False
```
identifier_type 取值:
| 值 | 说明 |
| --- | --- |
| open_id | 使用飞书 open_id |
| user_id | 使用飞书 user_id |
| mobile | 使用手机号,后续按发起人私聊时使用 |
| missing | 未配置映射 |
---
## 四、模型详细设计
### 4.1 FeishuUserMapping
字段见数据库设计。模型需提供方法:
```python
def preferred_identifier(self) -> tuple[str, str]:
if self.feishu_open_id:
return "open_id", self.feishu_open_id
if self.feishu_user_id:
return "user_id", self.feishu_user_id
if self.feishu_mobile:
return "mobile", self.feishu_mobile
return "missing", ""
```
`clean()` 校验:
```python
def clean(self):
if not (self.feishu_open_id or self.feishu_user_id or self.feishu_mobile):
raise ValidationError("feishu_open_id、feishu_user_id、feishu_mobile 至少填写一个")
```
### 4.2 WorkflowNotificationRecord
字段见数据库设计。建议方法:
```python
@classmethod
def already_sent(cls, dedupe_key: str) -> bool:
return cls.objects.filter(dedupe_key=dedupe_key, send_status=cls.SendStatus.SUCCESS).exists()
```
注意:若使用唯一约束限制 `dedupe_key`,重复触发时可以直接返回已有记录;若希望保留 skipped_duplicate 记录,则不能对 dedupe_key 做全局唯一,只能用查询判重。本项目需求是“只发一次”,更推荐保留唯一成功意图,重复触发返回已有记录或创建 skipped 记录需在实现计划中二选一。为了 SQLite 简化,首期建议不创建 skipped 记录,直接返回已有成功记录。
---
## 五、核心服务详细设计
### 5.1 workflow_adapters.py
职责:把不同批次对象转换为 `NotificationContext`
函数:
```python
def build_file_summary_context(batch: FileSummaryBatch) -> NotificationContext: ...
def build_regulatory_review_context(batch: RegulatoryReviewBatch) -> NotificationContext: ...
def build_application_form_fill_context(batch: ApplicationFormFillBatch) -> NotificationContext: ...
```
自动汇总摘要:
| 字段 | 计算方式 |
| --- | --- |
| 文件总数 | `batch.items.count()` |
| 成功解析数 | 解析状态为 success 的 item 数 |
| 异常数 | failed、skipped、unsupported 等状态数量 |
| 导出文件数 | `ExportedSummaryFile` 中 workflow_type=file_summary 或 batch 关联文件数 |
法规核查摘要:
| 字段 | 计算方式 |
| --- | --- |
| 风险总数 | `batch.issues.count()` |
| 阻断项 | severity=blocking |
| 高风险 | severity=high |
| 中风险 | severity=medium |
自动填表摘要:
| 字段 | 计算方式 |
| --- | --- |
| 模板数 | `len(batch.selected_templates)` |
| 导出文件数 | 对应 `ExportedSummaryFile` 数量 |
| 冲突字段数 | `len(batch.conflict_summary or [])` |
| 失败原因 | `batch.error_message` 或节点错误摘要 |
### 5.2 recipient.py
职责:首期根据环境变量解析指定个人接收人;后续可扩展为根据系统用户解析飞书目标。
伪代码:
```python
def resolve_feishu_target(user: User) -> ResolvedFeishuTarget:
if settings.FEISHU_DEFAULT_USER_OPEN_ID:
return ResolvedFeishuTarget(
mapping_id=None,
display_name=getattr(settings, "FEISHU_DEFAULT_TARGET_NAME", "指定个人账号"),
identifier_type="open_id",
identifier_value=settings.FEISHU_DEFAULT_USER_OPEN_ID,
masked_identifier=mask_identifier(settings.FEISHU_DEFAULT_USER_OPEN_ID),
missing=False,
)
if settings.FEISHU_DEFAULT_USER_ID:
return ResolvedFeishuTarget(
mapping_id=None,
display_name=getattr(settings, "FEISHU_DEFAULT_TARGET_NAME", "指定个人账号"),
identifier_type="user_id",
identifier_value=settings.FEISHU_DEFAULT_USER_ID,
masked_identifier=mask_identifier(settings.FEISHU_DEFAULT_USER_ID),
missing=False,
)
return ResolvedFeishuTarget(
mapping_id=None,
display_name=user.get_username(),
identifier_type="missing",
identifier_value="",
masked_identifier="",
missing=True,
)
def resolve_feishu_target_by_user_mapping(user: User) -> ResolvedFeishuTarget:
mapping = (
FeishuUserMapping.objects
.filter(system_user=user, is_active=True)
.first()
)
if mapping is None:
return ResolvedFeishuTarget(
mapping_id=None,
display_name=user.get_username(),
identifier_type="missing",
identifier_value="",
masked_identifier="",
missing=True,
)
identifier_type, identifier_value = mapping.preferred_identifier()
return ResolvedFeishuTarget(
mapping_id=mapping.pk,
display_name=mapping.feishu_display_name or user.get_username(),
identifier_type=identifier_type,
identifier_value=identifier_value,
masked_identifier=mask_identifier(identifier_value),
missing=identifier_type == "missing",
)
```
脱敏规则:
| 类型 | 规则 |
| --- | --- |
| mobile | 保留前三位和后四位,如 `138****1234` |
| open_id/user_id | 保留前 6 位和后 4 位 |
| missing | 空字符串 |
首期调度器使用 `resolve_feishu_target()``resolve_feishu_target_by_user_mapping()` 作为后续“按发起人私聊”能力预留。
### 5.3 message_builder.py
职责:构造富文本 payload 和入库摘要。
函数:
```python
def build_feishu_post_message(
context: NotificationContext,
target: ResolvedFeishuTarget,
) -> dict: ...
def build_message_summary(
context: NotificationContext,
target: ResolvedFeishuTarget,
) -> str: ...
```
富文本规则:
| 场景 | 规则 |
| --- | --- |
| 有映射 | 加入 `at` 标签 |
| 无映射 | 不加入 `at` 标签,增加映射缺失提示 |
| 失败状态 | 标题和下一步动作突出失败原因摘要 |
| 摘要过长 | 每条摘要最多 120 字,总摘要最多 800 字 |
| 链接 | 使用本地地址拼接,后续再切换域名配置 |
### 5.4 feishu_token.py
职责:使用 App ID/App Secret 获取并缓存 `tenant_access_token`
函数:
```python
def get_tenant_access_token() -> FeishuTokenResult: ...
def refresh_tenant_access_token() -> FeishuTokenResult: ...
```
结果结构:
```python
@dataclass(frozen=True)
class FeishuTokenResult:
ok: bool
tenant_access_token: str
expire_seconds: int
code: str
message: str
```
处理规则:
| 场景 | 处理 |
| --- | --- |
| App ID/App Secret 缺失 | 返回 failed错误码 config_missing |
| 缓存 token 未过期 | 直接返回缓存 token |
| token 过期或不存在 | 调用飞书 token API 重新获取 |
| token API 返回失败 | 返回 failed记录 code/message |
| HTTP 超时 | 返回 failed错误码 timeout |
### 5.5 feishu_message_api.py
职责:调用飞书发送消息 API。
函数:
```python
def send_personal_message(
*,
tenant_access_token: str,
receive_id_type: str,
receive_id: str,
payload: dict,
) -> FeishuMessageApiResult: ...
```
结果结构:
```python
@dataclass(frozen=True)
class FeishuMessageApiResult:
ok: bool
status_code: int | None
code: str
message: str
duration_ms: int
message_id: str = ""
```
异常处理:
| 异常 | 处理 |
| --- | --- |
| 指定接收人缺失 | 返回 failed错误码 recipient_missing |
| tenant_access_token 缺失 | 返回 failed错误码 token_missing |
| HTTP 超时 | 返回 failed错误码 timeout |
| 非 2xx | 返回 failed记录 status_code |
| 飞书返回 code 非 0 | 返回 failed记录 code/message |
| token 失效 | 刷新 token 后允许同步重试一次消息 API |
### 5.6 records.py
职责:判重和落库。
流程:
```text
输入 NotificationContext
-> 查询 dedupe_key 是否已有 success
-> 若有,返回已有记录,不发送
-> 若未启用真实飞书,创建 disabled/mock 记录
-> 若发送成功,创建 success 记录
-> 若发送失败,创建 failed 记录
```
字段写入规则:
| 字段 | 来源 |
| --- | --- |
| workflow_type | context.workflow_type |
| workflow_batch_id | context.workflow_batch_id |
| workflow_batch_no | context.workflow_batch_no |
| workflow_status | context.workflow_status |
| dedupe_key | context.dedupe_key |
| trigger_user_id | context.trigger_user_id |
| feishu_mapping_id | target.mapping_id |
| at_identifier_type | target.identifier_type |
| at_identifier_masked | target.masked_identifier |
| message_summary | `build_message_summary()` |
### 5.7 dispatcher.py
对外入口:
```python
def dispatch_workflow_notification(context: NotificationContext) -> WorkflowNotificationRecord:
if WorkflowNotificationRecord.already_sent(context.dedupe_key):
return WorkflowNotificationRecord.objects.get(
dedupe_key=context.dedupe_key,
send_status=WorkflowNotificationRecord.SendStatus.SUCCESS,
)
user = User.objects.get(pk=context.trigger_user_id)
target = resolve_feishu_target(user)
message = build_feishu_post_message(context, target)
summary = build_message_summary(context, target)
if not settings.FEISHU_NOTIFY_ENABLED:
return create_disabled_record(context, target, summary)
token_result = get_tenant_access_token()
if not token_result.ok:
return create_failed_record(context, target, summary, token_result)
result = send_personal_message(
tenant_access_token=token_result.tenant_access_token,
receive_id_type=target.identifier_type,
receive_id=target.identifier_value,
payload=message,
)
if result.ok:
return create_success_record(context, target, summary, result)
return create_failed_record(context, target, summary, result)
```
---
## 六、工作流接入点
| 工作流 | 推荐接入位置 |
| --- | --- |
| 自动汇总 | 文件汇总批次状态写为 success/partial_success/failed 后 |
| 法规核查 | 报告导出和风险项保存后;替换或并行现有 `create_mock_notifications` |
| 自动填表 | `notify` 节点中替换或扩展现有 `notify_completion` |
接入原则:
| 原则 | 说明 |
| --- | --- |
| 通知异常捕获 | 工作流调用通知服务时捕获异常并记录 non_blocking_errors |
| 不回滚业务结果 | 通知失败不修改业务批次成功状态 |
| 单点适配 | 工作流只负责生成或传入批次,摘要由 adapter 负责 |
---
## 七、批次详情展示设计
### 7.1 后端上下文
为批次详情页提供:
```python
def get_notification_records(workflow_type: str, batch_id: int) -> QuerySet:
return WorkflowNotificationRecord.objects.filter(
workflow_type=workflow_type,
workflow_batch_id=batch_id,
).order_by("-created_at")
```
### 7.2 页面展示规则
| 状态 | 展示 |
| --- | --- |
| success | “飞书通知已发送”,展示 sent_at |
| failed | “飞书通知失败”,展示 error_message |
| disabled | “飞书通知未启用” |
| 无记录 | “暂无通知记录” |
三个工作流结果页可复用同一 partial 模板或上下文字段。
---
## 八、问答预留详细设计
### 8.1 批次摘要查询服务
预留函数:
```python
def query_batch_summary(
user: User,
*,
workflow_type: str | None = None,
batch_no: str | None = None,
latest: bool = False,
) -> dict:
...
```
权限规则:
| 用户 | 可查范围 |
| --- | --- |
| 管理员 | 全部批次 |
| 普通用户 | `batch.user == user` 的批次 |
| 未绑定用户 | 不可查 |
查询对象:
| 类型 | 说明 |
| --- | --- |
| 明确批次号 | 精确匹配 batch_no |
| 最近/最新 | 在有权限范围内按 created_at/finished_at 倒序取第一条 |
| 工作流类型 | file_summary、regulatory_review、application_form_fill |
### 8.2 问答日志服务
预留函数:
```python
def record_feishu_question_log(
*,
user: User | None,
mapping: FeishuUserMapping | None,
source_type: str,
question_text: str,
intent: str,
query_object: dict,
answer_summary: str,
permission_result: str,
status: str,
error_message: str = "",
) -> FeishuQuestionLog:
...
```
首期不需要接飞书事件,但测试可直接调用该服务,确认日志字段与权限规则可用。
---
## 九、测试设计
### 9.1 单元测试
| 测试文件 | 用例 |
| --- | --- |
| `tests/test_feishu_notification.py` | tenant_access_token 获取和缓存 |
| `tests/test_feishu_notification.py` | 指定个人接收人优先级 open_id > user_id |
| `tests/test_feishu_notification.py` | 指定接收人缺失时写 failed 记录 |
| `tests/test_feishu_notification.py` | 真实通知关闭时写 disabled/mock 记录 |
| `tests/test_feishu_notification.py` | 消息 API 成功写 success 记录 |
| `tests/test_feishu_notification.py` | token 获取失败写 failed 记录 |
| `tests/test_feishu_notification.py` | 消息 API 超时写 failed 记录 |
| `tests/test_feishu_notification.py` | 同一 dedupe_key 不重复发送 |
| `tests/test_feishu_question_reserved.py` | 管理员可查询全部批次摘要 |
| `tests/test_feishu_question_reserved.py` | 普通用户只能查询自己的批次 |
| `tests/test_feishu_question_reserved.py` | 问答日志不保存完整回答正文 |
### 9.2 集成测试
| 场景 | 验证 |
| --- | --- |
| 自动汇总完成 | 生成通知上下文并写记录 |
| 法规核查完成 | 风险摘要正确 |
| 自动填表完成 | 导出和冲突摘要正确 |
| 批次详情页 | 展示通知状态和失败原因 |
### 9.3 外部飞书测试
真实飞书 API 测试不进入默认 CI。建议提供手动命令或 Django management command
```text
python manage.py send_test_feishu_notification --username owner
```
该命令只在本地配置 `FEISHU_NOTIFY_ENABLED=true``FEISHU_APP_ID``FEISHU_APP_SECRET``FEISHU_DEFAULT_USER_OPEN_ID``FEISHU_DEFAULT_USER_ID` 后使用。
---
## 十、异常处理
| 异常 | 处理 |
| --- | --- |
| 指定接收人缺失 | 不发送真实消息,记录 recipient_missing |
| App ID/App Secret 未配置 | 写 failed 或 disabled 记录,不发送 |
| tenant_access_token 获取失败 | 写 failed记录 token API 错误 |
| 指定接收人 open_id/user_id 未配置 | 写 failed错误码 recipient_missing |
| HTTP 超时 | 写 failed错误码 timeout |
| 飞书返回错误 | 写 failed记录 code/message |
| 通知记录唯一冲突 | 查询已有记录并返回,不重复发送 |
| 批次链接生成失败 | 发送无链接摘要,记录 warning 到 message_summary |
---
## 十一、日志与安全
| 项 | 要求 |
| --- | --- |
| 日志脱敏 | 不打印 App Secret、tenant_access_token、完整手机号 |
| 入库脱敏 | 通知记录只保存脱敏接收人标识 |
| payload | 不保存完整富文本 payload |
| 错误信息 | 保存飞书错误摘要,避免保存敏感请求头 |
| 问答日志 | 保存问题、意图、对象和回答摘要,不保存完整回答 |
---
## 十二、实施顺序建议
| 顺序 | 内容 |
| --- | --- |
| 1 | 新增模型、迁移和 Admin |
| 2 | 实现用户映射解析和脱敏 |
| 3 | 实现飞书富文本构造 |
| 4 | 实现 tenant_access_token 获取与缓存 |
| 5 | 实现飞书消息 API 发送客户端 |
| 6 | 实现通知记录判重和落库 |
| 7 | 实现三个工作流 adapter |
| 8 | 接入三个工作流完成节点 |
| 9 | 批次详情页展示通知状态 |
| 10 | 实现问答预留查询服务和日志服务 |
| 11 | 补齐单元测试和集成测试 |