From cbc7493df83fa2de22ab8b7a85de27268ce2523e Mon Sep 17 00:00:00 2001 From: bruce Date: Sun, 7 Jun 2026 22:09:47 +0800 Subject: [PATCH] feat: wire feishu notifications into workflows --- .../services/notifier.py | 10 ++ review_agent/file_summary/workflow.py | 13 +++ .../notifications/workflow_adapters.py | 102 ++++++++++++++++++ review_agent/regulatory_review/workflow.py | 13 +++ ...test_application_form_fill_notification.py | 13 ++- tests/test_feishu_workflow_adapters.py | 96 +++++++++++++++++ tests/test_file_summary_workflow.py | 25 +++++ tests/test_regulatory_notification.py | 30 ++++++ 8 files changed, 301 insertions(+), 1 deletion(-) create mode 100644 review_agent/notifications/workflow_adapters.py create mode 100644 tests/test_feishu_workflow_adapters.py diff --git a/review_agent/application_form_fill/services/notifier.py b/review_agent/application_form_fill/services/notifier.py index c3c2969..0b9c93d 100644 --- a/review_agent/application_form_fill/services/notifier.py +++ b/review_agent/application_form_fill/services/notifier.py @@ -7,6 +7,8 @@ from review_agent.models import ( ApplicationFormFillNotificationRecord, ExportedSummaryFile, ) +from review_agent.notifications.dispatcher import dispatch_workflow_notification +from review_agent.notifications.workflow_adapters import build_application_form_fill_context def notify_completion( @@ -33,6 +35,13 @@ def notify_completion( retry_count=1, error_message="mock notification failed", ) + unified_error = "" + try: + unified_record = dispatch_workflow_notification(build_application_form_fill_context(batch)) + if unified_record.send_status == unified_record.SendStatus.FAILED: + unified_error = unified_record.error_message + except Exception as exc: + unified_error = str(exc) return ApplicationFormFillNotificationRecord.objects.create( batch=batch, recipient=batch.user, @@ -41,5 +50,6 @@ def notify_completion( export_ids=export_ids, message_summary=message_summary, send_status=ApplicationFormFillNotificationRecord.SendStatus.SUCCESS, + error_message=unified_error, sent_at=timezone.now(), ) diff --git a/review_agent/file_summary/workflow.py b/review_agent/file_summary/workflow.py index fe5378f..5409b2c 100644 --- a/review_agent/file_summary/workflow.py +++ b/review_agent/file_summary/workflow.py @@ -17,6 +17,8 @@ from review_agent.models import ( Message, WorkflowNodeRun, ) +from review_agent.notifications.dispatcher import dispatch_workflow_notification +from review_agent.notifications.workflow_adapters import build_file_summary_context from .events import record_event from .services.archive import ARCHIVE_EXTENSIONS @@ -154,14 +156,25 @@ class WorkflowExecutor: self.batch.finished_at = timezone.now() self.batch.save(update_fields=["status", "error_message", "finished_at"]) record_event(self.batch, "workflow_failed", {"message": str(exc)}) + self._dispatch_completion_notification() return self.batch.status = FileSummaryBatch.Status.SUCCESS self.batch.finished_at = timezone.now() self.batch.save(update_fields=["status", "finished_at"]) record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk}) + self._dispatch_completion_notification() logger.info("Workflow run completed", extra={"batch_id": self.batch.pk}) + def _dispatch_completion_notification(self) -> None: + try: + dispatch_workflow_notification(build_file_summary_context(self.batch)) + except Exception as exc: + logger.warning( + "File summary notification failed without blocking workflow", + extra={"batch_id": self.batch.pk, "error": str(exc)}, + ) + def _run_node(self, node: WorkflowNodeRun) -> None: logger.info( "Workflow node started", diff --git a/review_agent/notifications/workflow_adapters.py b/review_agent/notifications/workflow_adapters.py new file mode 100644 index 0000000..d95f910 --- /dev/null +++ b/review_agent/notifications/workflow_adapters.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +from review_agent.application_form_fill.constants import WORKFLOW_TYPE as FORM_FILL_WORKFLOW_TYPE +from review_agent.models import ( + ApplicationFormFillBatch, + ExportedSummaryFile, + FileSummaryBatch, + RegulatoryIssue, + RegulatoryReviewBatch, +) + +from .context import NotificationContext + + +def build_file_summary_context(batch: FileSummaryBatch) -> NotificationContext: + status = batch.status + abnormal_count = int(batch.failed_files or 0) + int(batch.unsupported_files or 0) + int(batch.uncertain_files or 0) + return NotificationContext( + workflow_type="file_summary", + workflow_name="自动汇总", + workflow_batch_id=batch.pk, + workflow_batch_no=batch.batch_no, + workflow_status=status, + trigger_user_id=batch.user_id, + trigger_username=batch.user.get_username(), + title=f"自动汇总{_status_label(status)}", + summary_lines=( + f"文件总数 {batch.total_files} 个,成功 {batch.success_files} 个", + f"异常/不支持/不确定 {abnormal_count} 个,总页数 {batch.total_pages}", + _error_line(batch.error_message), + ), + next_step="查看文件目录、页数统计和导出结果", + result_path=f"/api/review-agent/file-summary/{batch.pk}/status/", + ) + + +def build_regulatory_review_context(batch: RegulatoryReviewBatch) -> NotificationContext: + summary = batch.risk_summary or _count_regulatory_issues(batch) + return NotificationContext( + workflow_type="regulatory_review", + workflow_name="法规核查", + workflow_batch_id=batch.pk, + workflow_batch_no=batch.batch_no, + workflow_status=batch.status, + trigger_user_id=batch.user_id, + trigger_username=batch.user.get_username(), + title=f"法规核查{_status_label(batch.status)}", + summary_lines=( + f"阻断项 {int(summary.get('blocking') or 0)} 个,高风险 {int(summary.get('high') or 0)} 个", + f"中风险 {int(summary.get('medium') or 0)} 个,低风险 {int(summary.get('low') or 0)} 个", + _error_line(batch.error_message), + ), + next_step="查看风险报告并处理整改项", + result_path=f"/api/review-agent/regulatory-review/{batch.pk}/status/", + ) + + +def build_application_form_fill_context(batch: ApplicationFormFillBatch) -> NotificationContext: + export_count = ExportedSummaryFile.objects.filter( + workflow_type=FORM_FILL_WORKFLOW_TYPE, + workflow_batch_id=batch.pk, + ).count() + return NotificationContext( + workflow_type=FORM_FILL_WORKFLOW_TYPE, + workflow_name="自动填表", + workflow_batch_id=batch.pk, + workflow_batch_no=batch.batch_no, + workflow_status=batch.status, + trigger_user_id=batch.user_id, + trigger_username=batch.user.get_username(), + title=f"自动填表{_status_label(batch.status)}", + summary_lines=( + f"模板 {', '.join(batch.selected_templates or []) or '未识别'}", + f"导出文件 {export_count} 个,冲突字段 {len(batch.conflict_summary or [])} 个", + _error_line(batch.error_message), + ), + next_step="下载生成文件并检查字段冲突", + result_path=f"/api/review-agent/application-form-fill/{batch.pk}/status/", + ) + + +def _count_regulatory_issues(batch: RegulatoryReviewBatch) -> dict[str, int]: + return { + severity: RegulatoryIssue.objects.filter(batch=batch, severity=severity).count() + for severity in ["blocking", "high", "medium", "low", "info"] + } + + +def _status_label(status: str) -> str: + labels = { + "success": "完成", + "partial_success": "部分完成", + "failed": "失败", + "cancelled": "已取消", + } + return labels.get(status, status) + + +def _error_line(error_message: str) -> str: + if not error_message: + return "" + return f"失败原因:{error_message[:160]}" diff --git a/review_agent/regulatory_review/workflow.py b/review_agent/regulatory_review/workflow.py index 3b4edbd..c9b492c 100644 --- a/review_agent/regulatory_review/workflow.py +++ b/review_agent/regulatory_review/workflow.py @@ -18,6 +18,8 @@ from review_agent.models import ( RegulatoryReviewBatch, WorkflowNodeRun, ) +from review_agent.notifications.dispatcher import dispatch_workflow_notification +from review_agent.notifications.workflow_adapters import build_regulatory_review_context from review_agent.regulatory_review.services.completeness_check import run_completeness_check from review_agent.regulatory_review.services.consistency_check import run_consistency_check from review_agent.regulatory_review.services.export import build_assistant_summary, export_review_results @@ -146,14 +148,25 @@ class RegulatoryWorkflowExecutor: self.batch.finished_at = timezone.now() self.batch.save(update_fields=["status", "error_message", "finished_at"]) record_event(self.batch, "workflow_failed", {"message": str(exc)}) + self._dispatch_completion_notification() return self.batch.status = RegulatoryReviewBatch.Status.SUCCESS self.batch.finished_at = timezone.now() self.batch.save(update_fields=["status", "finished_at"]) record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk}) + self._dispatch_completion_notification() logger.info("法规核查工作流完成 batch_no=%s findings=%s", self.batch.batch_no, len(self.findings)) + def _dispatch_completion_notification(self) -> None: + try: + dispatch_workflow_notification(build_regulatory_review_context(self.batch)) + except Exception as exc: + logger.warning( + "Regulatory review notification failed without blocking workflow", + extra={"batch_id": self.batch.pk, "error": str(exc)}, + ) + def _nodes(self): return WorkflowNodeRun.objects.filter( workflow_type="regulatory_review", diff --git a/tests/test_application_form_fill_notification.py b/tests/test_application_form_fill_notification.py index 9905689..86fbeae 100644 --- a/tests/test_application_form_fill_notification.py +++ b/tests/test_application_form_fill_notification.py @@ -13,7 +13,7 @@ from review_agent.models import ( pytestmark = pytest.mark.django_db -def test_notify_completion_records_success(django_user_model): +def test_notify_completion_records_success(django_user_model, monkeypatch): user = django_user_model.objects.create_user(username="owner", password="pass") conversation = Conversation.objects.create(user=user, title="会话") summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-NOTIFY") @@ -33,6 +33,16 @@ def test_notify_completion_records_success(django_user_model): file_name="filled.docx", storage_path="filled.docx", ) + calls = [] + fake_record = type( + "Record", + (), + {"send_status": "success", "SendStatus": type("SendStatus", (), {"FAILED": "failed"}), "error_message": ""}, + )() + monkeypatch.setattr( + "review_agent.application_form_fill.services.notifier.dispatch_workflow_notification", + lambda context: calls.append(context) or fake_record, + ) record = notify_completion(batch, [exported]) @@ -40,6 +50,7 @@ def test_notify_completion_records_success(django_user_model): assert record.export_ids == [exported.pk] assert record.template_codes == ["registration_certificate"] assert record.sent_at is not None + assert calls[0].workflow_type == "application_form_fill" def test_notify_completion_records_failure_without_raising(django_user_model): diff --git a/tests/test_feishu_workflow_adapters.py b/tests/test_feishu_workflow_adapters.py new file mode 100644 index 0000000..8d915d9 --- /dev/null +++ b/tests/test_feishu_workflow_adapters.py @@ -0,0 +1,96 @@ +import pytest + +from review_agent.models import ( + ApplicationFormFillBatch, + Conversation, + ExportedSummaryFile, + FileSummaryBatch, + RegulatoryIssue, + RegulatoryReviewBatch, +) +from review_agent.notifications.message_builder import absolute_result_url +from review_agent.notifications.workflow_adapters import ( + build_application_form_fill_context, + build_file_summary_context, + build_regulatory_review_context, +) + + +pytestmark = pytest.mark.django_db + + +def test_file_summary_adapter_builds_summary(settings, django_user_model): + settings.PUBLIC_BASE_URL = "http://example.test" + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + batch = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-001", + status=FileSummaryBatch.Status.SUCCESS, + total_files=3, + success_files=2, + failed_files=1, + total_pages=15, + ) + + context = build_file_summary_context(batch) + + assert context.workflow_type == "file_summary" + assert context.workflow_batch_no == "FS-001" + assert "异常" in "\n".join(context.summary_lines) + assert absolute_result_url(context.result_path).endswith(f"/api/review-agent/file-summary/{batch.pk}/status/") + + +def test_regulatory_review_adapter_builds_risk_summary(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary_batch = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-RR") + batch = RegulatoryReviewBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary_batch, + batch_no="RR-001", + status=RegulatoryReviewBatch.Status.SUCCESS, + ) + RegulatoryIssue.objects.create( + batch=batch, + category=RegulatoryIssue.Category.COMPLETENESS, + severity=RegulatoryIssue.Severity.BLOCKING, + title="缺少资料", + ) + + context = build_regulatory_review_context(batch) + + assert context.workflow_type == "regulatory_review" + assert "阻断项 1" in "\n".join(context.summary_lines) + + +def test_application_form_fill_adapter_builds_export_and_conflict_summary(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary_batch = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-AFF") + batch = ApplicationFormFillBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary_batch, + batch_no="AFF-001", + status=ApplicationFormFillBatch.Status.PARTIAL_SUCCESS, + selected_templates=["registration_certificate"], + conflict_summary=[{"field": "product_name"}], + ) + ExportedSummaryFile.objects.create( + batch=summary_batch, + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + export_category="filled_template", + export_type=ExportedSummaryFile.ExportType.WORD, + file_name="filled.docx", + storage_path="filled.docx", + ) + + context = build_application_form_fill_context(batch) + + assert context.workflow_type == "application_form_fill" + assert "导出文件 1" in "\n".join(context.summary_lines) + assert "冲突字段 1" in "\n".join(context.summary_lines) diff --git a/tests/test_file_summary_workflow.py b/tests/test_file_summary_workflow.py index fbe855d..18feb42 100644 --- a/tests/test_file_summary_workflow.py +++ b/tests/test_file_summary_workflow.py @@ -71,6 +71,31 @@ def test_start_file_summary_workflow_runs_synchronously_for_tests(django_user_mo assert WorkflowEvent.objects.filter(batch=batch, event_type="workflow_completed").exists() +def test_file_summary_workflow_dispatches_completion_notification(monkeypatch, django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + FileAttachment.objects.create( + conversation=conversation, + user=user, + original_name="a.docx", + storage_path="x/a.docx", + file_size=1, + ) + batch = create_file_summary_batch(conversation=conversation, user=user) + calls = [] + + def fake_dispatch(context): + calls.append(context) + + monkeypatch.setattr("review_agent.file_summary.workflow.dispatch_workflow_notification", fake_dispatch) + + start_file_summary_workflow(batch, async_run=False) + + assert calls + assert calls[-1].workflow_type == "file_summary" + assert calls[-1].workflow_batch_id == batch.pk + + def test_workflow_extracts_archive_and_scans_extracted_files(settings, tmp_path, django_user_model): settings.MEDIA_ROOT = tmp_path user = django_user_model.objects.create_user(username="owner", password="pass") diff --git a/tests/test_regulatory_notification.py b/tests/test_regulatory_notification.py index e9c51f6..a800be6 100644 --- a/tests/test_regulatory_notification.py +++ b/tests/test_regulatory_notification.py @@ -9,6 +9,7 @@ from review_agent.models import ( ) from review_agent.regulatory_review.services.export import build_markdown_report, build_result_payload from review_agent.regulatory_review.services.feishu_notifier import create_mock_notifications +from review_agent.regulatory_review.workflow import RegulatoryWorkflowExecutor pytestmark = pytest.mark.django_db @@ -77,3 +78,32 @@ def test_notification_records_enter_reports(django_user_model): assert "通知记录" in build_markdown_report(batch) assert build_result_payload(batch)["notifications"][0]["channel"] == "mock" + + +def test_regulatory_completion_notification_uses_dispatcher(monkeypatch, django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-NOTIFY-DISPATCH", + status=FileSummaryBatch.Status.SUCCESS, + ) + batch = RegulatoryReviewBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="RR-NOTIFY-DISPATCH", + status=RegulatoryReviewBatch.Status.SUCCESS, + ) + calls = [] + + monkeypatch.setattr( + "review_agent.regulatory_review.workflow.dispatch_workflow_notification", + lambda context: calls.append(context), + ) + + RegulatoryWorkflowExecutor(batch)._dispatch_completion_notification() + + assert calls + assert calls[0].workflow_type == "regulatory_review"