feat: wire feishu notifications into workflows

This commit is contained in:
2026-06-07 22:09:47 +08:00
parent 820069f558
commit cbc7493df8
8 changed files with 301 additions and 1 deletions

View File

@@ -7,6 +7,8 @@ from review_agent.models import (
ApplicationFormFillNotificationRecord, ApplicationFormFillNotificationRecord,
ExportedSummaryFile, ExportedSummaryFile,
) )
from review_agent.notifications.dispatcher import dispatch_workflow_notification
from review_agent.notifications.workflow_adapters import build_application_form_fill_context
def notify_completion( def notify_completion(
@@ -33,6 +35,13 @@ def notify_completion(
retry_count=1, retry_count=1,
error_message="mock notification failed", error_message="mock notification failed",
) )
unified_error = ""
try:
unified_record = dispatch_workflow_notification(build_application_form_fill_context(batch))
if unified_record.send_status == unified_record.SendStatus.FAILED:
unified_error = unified_record.error_message
except Exception as exc:
unified_error = str(exc)
return ApplicationFormFillNotificationRecord.objects.create( return ApplicationFormFillNotificationRecord.objects.create(
batch=batch, batch=batch,
recipient=batch.user, recipient=batch.user,
@@ -41,5 +50,6 @@ def notify_completion(
export_ids=export_ids, export_ids=export_ids,
message_summary=message_summary, message_summary=message_summary,
send_status=ApplicationFormFillNotificationRecord.SendStatus.SUCCESS, send_status=ApplicationFormFillNotificationRecord.SendStatus.SUCCESS,
error_message=unified_error,
sent_at=timezone.now(), sent_at=timezone.now(),
) )

View File

@@ -17,6 +17,8 @@ from review_agent.models import (
Message, Message,
WorkflowNodeRun, WorkflowNodeRun,
) )
from review_agent.notifications.dispatcher import dispatch_workflow_notification
from review_agent.notifications.workflow_adapters import build_file_summary_context
from .events import record_event from .events import record_event
from .services.archive import ARCHIVE_EXTENSIONS from .services.archive import ARCHIVE_EXTENSIONS
@@ -154,14 +156,25 @@ class WorkflowExecutor:
self.batch.finished_at = timezone.now() self.batch.finished_at = timezone.now()
self.batch.save(update_fields=["status", "error_message", "finished_at"]) self.batch.save(update_fields=["status", "error_message", "finished_at"])
record_event(self.batch, "workflow_failed", {"message": str(exc)}) record_event(self.batch, "workflow_failed", {"message": str(exc)})
self._dispatch_completion_notification()
return return
self.batch.status = FileSummaryBatch.Status.SUCCESS self.batch.status = FileSummaryBatch.Status.SUCCESS
self.batch.finished_at = timezone.now() self.batch.finished_at = timezone.now()
self.batch.save(update_fields=["status", "finished_at"]) self.batch.save(update_fields=["status", "finished_at"])
record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk}) record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk})
self._dispatch_completion_notification()
logger.info("Workflow run completed", extra={"batch_id": self.batch.pk}) logger.info("Workflow run completed", extra={"batch_id": self.batch.pk})
def _dispatch_completion_notification(self) -> None:
try:
dispatch_workflow_notification(build_file_summary_context(self.batch))
except Exception as exc:
logger.warning(
"File summary notification failed without blocking workflow",
extra={"batch_id": self.batch.pk, "error": str(exc)},
)
def _run_node(self, node: WorkflowNodeRun) -> None: def _run_node(self, node: WorkflowNodeRun) -> None:
logger.info( logger.info(
"Workflow node started", "Workflow node started",

View File

@@ -0,0 +1,102 @@
from __future__ import annotations
from review_agent.application_form_fill.constants import WORKFLOW_TYPE as FORM_FILL_WORKFLOW_TYPE
from review_agent.models import (
ApplicationFormFillBatch,
ExportedSummaryFile,
FileSummaryBatch,
RegulatoryIssue,
RegulatoryReviewBatch,
)
from .context import NotificationContext
def build_file_summary_context(batch: FileSummaryBatch) -> NotificationContext:
status = batch.status
abnormal_count = int(batch.failed_files or 0) + int(batch.unsupported_files or 0) + int(batch.uncertain_files or 0)
return NotificationContext(
workflow_type="file_summary",
workflow_name="自动汇总",
workflow_batch_id=batch.pk,
workflow_batch_no=batch.batch_no,
workflow_status=status,
trigger_user_id=batch.user_id,
trigger_username=batch.user.get_username(),
title=f"自动汇总{_status_label(status)}",
summary_lines=(
f"文件总数 {batch.total_files} 个,成功 {batch.success_files}",
f"异常/不支持/不确定 {abnormal_count} 个,总页数 {batch.total_pages}",
_error_line(batch.error_message),
),
next_step="查看文件目录、页数统计和导出结果",
result_path=f"/api/review-agent/file-summary/{batch.pk}/status/",
)
def build_regulatory_review_context(batch: RegulatoryReviewBatch) -> NotificationContext:
summary = batch.risk_summary or _count_regulatory_issues(batch)
return NotificationContext(
workflow_type="regulatory_review",
workflow_name="法规核查",
workflow_batch_id=batch.pk,
workflow_batch_no=batch.batch_no,
workflow_status=batch.status,
trigger_user_id=batch.user_id,
trigger_username=batch.user.get_username(),
title=f"法规核查{_status_label(batch.status)}",
summary_lines=(
f"阻断项 {int(summary.get('blocking') or 0)} 个,高风险 {int(summary.get('high') or 0)}",
f"中风险 {int(summary.get('medium') or 0)} 个,低风险 {int(summary.get('low') or 0)}",
_error_line(batch.error_message),
),
next_step="查看风险报告并处理整改项",
result_path=f"/api/review-agent/regulatory-review/{batch.pk}/status/",
)
def build_application_form_fill_context(batch: ApplicationFormFillBatch) -> NotificationContext:
export_count = ExportedSummaryFile.objects.filter(
workflow_type=FORM_FILL_WORKFLOW_TYPE,
workflow_batch_id=batch.pk,
).count()
return NotificationContext(
workflow_type=FORM_FILL_WORKFLOW_TYPE,
workflow_name="自动填表",
workflow_batch_id=batch.pk,
workflow_batch_no=batch.batch_no,
workflow_status=batch.status,
trigger_user_id=batch.user_id,
trigger_username=batch.user.get_username(),
title=f"自动填表{_status_label(batch.status)}",
summary_lines=(
f"模板 {', '.join(batch.selected_templates or []) or '未识别'}",
f"导出文件 {export_count} 个,冲突字段 {len(batch.conflict_summary or [])}",
_error_line(batch.error_message),
),
next_step="下载生成文件并检查字段冲突",
result_path=f"/api/review-agent/application-form-fill/{batch.pk}/status/",
)
def _count_regulatory_issues(batch: RegulatoryReviewBatch) -> dict[str, int]:
return {
severity: RegulatoryIssue.objects.filter(batch=batch, severity=severity).count()
for severity in ["blocking", "high", "medium", "low", "info"]
}
def _status_label(status: str) -> str:
labels = {
"success": "完成",
"partial_success": "部分完成",
"failed": "失败",
"cancelled": "已取消",
}
return labels.get(status, status)
def _error_line(error_message: str) -> str:
if not error_message:
return ""
return f"失败原因:{error_message[:160]}"

View File

@@ -18,6 +18,8 @@ from review_agent.models import (
RegulatoryReviewBatch, RegulatoryReviewBatch,
WorkflowNodeRun, WorkflowNodeRun,
) )
from review_agent.notifications.dispatcher import dispatch_workflow_notification
from review_agent.notifications.workflow_adapters import build_regulatory_review_context
from review_agent.regulatory_review.services.completeness_check import run_completeness_check from review_agent.regulatory_review.services.completeness_check import run_completeness_check
from review_agent.regulatory_review.services.consistency_check import run_consistency_check from review_agent.regulatory_review.services.consistency_check import run_consistency_check
from review_agent.regulatory_review.services.export import build_assistant_summary, export_review_results from review_agent.regulatory_review.services.export import build_assistant_summary, export_review_results
@@ -146,14 +148,25 @@ class RegulatoryWorkflowExecutor:
self.batch.finished_at = timezone.now() self.batch.finished_at = timezone.now()
self.batch.save(update_fields=["status", "error_message", "finished_at"]) self.batch.save(update_fields=["status", "error_message", "finished_at"])
record_event(self.batch, "workflow_failed", {"message": str(exc)}) record_event(self.batch, "workflow_failed", {"message": str(exc)})
self._dispatch_completion_notification()
return return
self.batch.status = RegulatoryReviewBatch.Status.SUCCESS self.batch.status = RegulatoryReviewBatch.Status.SUCCESS
self.batch.finished_at = timezone.now() self.batch.finished_at = timezone.now()
self.batch.save(update_fields=["status", "finished_at"]) self.batch.save(update_fields=["status", "finished_at"])
record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk}) record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk})
self._dispatch_completion_notification()
logger.info("法规核查工作流完成 batch_no=%s findings=%s", self.batch.batch_no, len(self.findings)) logger.info("法规核查工作流完成 batch_no=%s findings=%s", self.batch.batch_no, len(self.findings))
def _dispatch_completion_notification(self) -> None:
try:
dispatch_workflow_notification(build_regulatory_review_context(self.batch))
except Exception as exc:
logger.warning(
"Regulatory review notification failed without blocking workflow",
extra={"batch_id": self.batch.pk, "error": str(exc)},
)
def _nodes(self): def _nodes(self):
return WorkflowNodeRun.objects.filter( return WorkflowNodeRun.objects.filter(
workflow_type="regulatory_review", workflow_type="regulatory_review",

View File

@@ -13,7 +13,7 @@ from review_agent.models import (
pytestmark = pytest.mark.django_db pytestmark = pytest.mark.django_db
def test_notify_completion_records_success(django_user_model): def test_notify_completion_records_success(django_user_model, monkeypatch):
user = django_user_model.objects.create_user(username="owner", password="pass") user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话") conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-NOTIFY") summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-NOTIFY")
@@ -33,6 +33,16 @@ def test_notify_completion_records_success(django_user_model):
file_name="filled.docx", file_name="filled.docx",
storage_path="filled.docx", storage_path="filled.docx",
) )
calls = []
fake_record = type(
"Record",
(),
{"send_status": "success", "SendStatus": type("SendStatus", (), {"FAILED": "failed"}), "error_message": ""},
)()
monkeypatch.setattr(
"review_agent.application_form_fill.services.notifier.dispatch_workflow_notification",
lambda context: calls.append(context) or fake_record,
)
record = notify_completion(batch, [exported]) record = notify_completion(batch, [exported])
@@ -40,6 +50,7 @@ def test_notify_completion_records_success(django_user_model):
assert record.export_ids == [exported.pk] assert record.export_ids == [exported.pk]
assert record.template_codes == ["registration_certificate"] assert record.template_codes == ["registration_certificate"]
assert record.sent_at is not None assert record.sent_at is not None
assert calls[0].workflow_type == "application_form_fill"
def test_notify_completion_records_failure_without_raising(django_user_model): def test_notify_completion_records_failure_without_raising(django_user_model):

View File

@@ -0,0 +1,96 @@
import pytest
from review_agent.models import (
ApplicationFormFillBatch,
Conversation,
ExportedSummaryFile,
FileSummaryBatch,
RegulatoryIssue,
RegulatoryReviewBatch,
)
from review_agent.notifications.message_builder import absolute_result_url
from review_agent.notifications.workflow_adapters import (
build_application_form_fill_context,
build_file_summary_context,
build_regulatory_review_context,
)
pytestmark = pytest.mark.django_db
def test_file_summary_adapter_builds_summary(settings, django_user_model):
settings.PUBLIC_BASE_URL = "http://example.test"
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
batch = FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-001",
status=FileSummaryBatch.Status.SUCCESS,
total_files=3,
success_files=2,
failed_files=1,
total_pages=15,
)
context = build_file_summary_context(batch)
assert context.workflow_type == "file_summary"
assert context.workflow_batch_no == "FS-001"
assert "异常" in "\n".join(context.summary_lines)
assert absolute_result_url(context.result_path).endswith(f"/api/review-agent/file-summary/{batch.pk}/status/")
def test_regulatory_review_adapter_builds_risk_summary(django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary_batch = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-RR")
batch = RegulatoryReviewBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary_batch,
batch_no="RR-001",
status=RegulatoryReviewBatch.Status.SUCCESS,
)
RegulatoryIssue.objects.create(
batch=batch,
category=RegulatoryIssue.Category.COMPLETENESS,
severity=RegulatoryIssue.Severity.BLOCKING,
title="缺少资料",
)
context = build_regulatory_review_context(batch)
assert context.workflow_type == "regulatory_review"
assert "阻断项 1" in "\n".join(context.summary_lines)
def test_application_form_fill_adapter_builds_export_and_conflict_summary(django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary_batch = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-AFF")
batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary_batch,
batch_no="AFF-001",
status=ApplicationFormFillBatch.Status.PARTIAL_SUCCESS,
selected_templates=["registration_certificate"],
conflict_summary=[{"field": "product_name"}],
)
ExportedSummaryFile.objects.create(
batch=summary_batch,
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
export_category="filled_template",
export_type=ExportedSummaryFile.ExportType.WORD,
file_name="filled.docx",
storage_path="filled.docx",
)
context = build_application_form_fill_context(batch)
assert context.workflow_type == "application_form_fill"
assert "导出文件 1" in "\n".join(context.summary_lines)
assert "冲突字段 1" in "\n".join(context.summary_lines)

View File

@@ -71,6 +71,31 @@ def test_start_file_summary_workflow_runs_synchronously_for_tests(django_user_mo
assert WorkflowEvent.objects.filter(batch=batch, event_type="workflow_completed").exists() assert WorkflowEvent.objects.filter(batch=batch, event_type="workflow_completed").exists()
def test_file_summary_workflow_dispatches_completion_notification(monkeypatch, django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
FileAttachment.objects.create(
conversation=conversation,
user=user,
original_name="a.docx",
storage_path="x/a.docx",
file_size=1,
)
batch = create_file_summary_batch(conversation=conversation, user=user)
calls = []
def fake_dispatch(context):
calls.append(context)
monkeypatch.setattr("review_agent.file_summary.workflow.dispatch_workflow_notification", fake_dispatch)
start_file_summary_workflow(batch, async_run=False)
assert calls
assert calls[-1].workflow_type == "file_summary"
assert calls[-1].workflow_batch_id == batch.pk
def test_workflow_extracts_archive_and_scans_extracted_files(settings, tmp_path, django_user_model): def test_workflow_extracts_archive_and_scans_extracted_files(settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path settings.MEDIA_ROOT = tmp_path
user = django_user_model.objects.create_user(username="owner", password="pass") user = django_user_model.objects.create_user(username="owner", password="pass")

View File

@@ -9,6 +9,7 @@ from review_agent.models import (
) )
from review_agent.regulatory_review.services.export import build_markdown_report, build_result_payload from review_agent.regulatory_review.services.export import build_markdown_report, build_result_payload
from review_agent.regulatory_review.services.feishu_notifier import create_mock_notifications from review_agent.regulatory_review.services.feishu_notifier import create_mock_notifications
from review_agent.regulatory_review.workflow import RegulatoryWorkflowExecutor
pytestmark = pytest.mark.django_db pytestmark = pytest.mark.django_db
@@ -77,3 +78,32 @@ def test_notification_records_enter_reports(django_user_model):
assert "通知记录" in build_markdown_report(batch) assert "通知记录" in build_markdown_report(batch)
assert build_result_payload(batch)["notifications"][0]["channel"] == "mock" assert build_result_payload(batch)["notifications"][0]["channel"] == "mock"
def test_regulatory_completion_notification_uses_dispatcher(monkeypatch, django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-NOTIFY-DISPATCH",
status=FileSummaryBatch.Status.SUCCESS,
)
batch = RegulatoryReviewBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
batch_no="RR-NOTIFY-DISPATCH",
status=RegulatoryReviewBatch.Status.SUCCESS,
)
calls = []
monkeypatch.setattr(
"review_agent.regulatory_review.workflow.dispatch_workflow_notification",
lambda context: calls.append(context),
)
RegulatoryWorkflowExecutor(batch)._dispatch_completion_notification()
assert calls
assert calls[0].workflow_type == "regulatory_review"