diff --git a/review_agent/file_summary/events.py b/review_agent/file_summary/events.py new file mode 100644 index 0000000..3d9f80c --- /dev/null +++ b/review_agent/file_summary/events.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from review_agent.models import FileSummaryBatch, WorkflowEvent + + +def record_event(batch: FileSummaryBatch, event_type: str, payload: dict | None = None) -> WorkflowEvent: + return WorkflowEvent.objects.create(batch=batch, event_type=event_type, payload=payload or {}) + + +def serialize_event(event: WorkflowEvent) -> dict[str, object]: + return { + "id": event.pk, + "event_type": event.event_type, + "payload": event.payload, + "created_at": event.created_at.isoformat(), + } diff --git a/review_agent/file_summary/views.py b/review_agent/file_summary/views.py index 1b48924..fa4d169 100644 --- a/review_agent/file_summary/views.py +++ b/review_agent/file_summary/views.py @@ -3,6 +3,8 @@ from django.http import Http404, JsonResponse from django.views.decorators.http import require_http_methods from review_agent.models import Conversation, FileAttachment +from review_agent.models import FileSummaryBatch, WorkflowEvent +from .events import serialize_event from .storage import save_uploaded_attachment, serialize_attachment @@ -56,3 +58,50 @@ def attachment_detail(request, conversation_id: int, attachment_id: int): attachment.is_active = False attachment.save(update_fields=["upload_status", "is_active"]) return JsonResponse({"ok": True, "attachment": serialize_attachment(attachment)}) + + +@require_http_methods(["GET"]) +@login_required +def batch_status(request, batch_id: int): + batch = FileSummaryBatch.objects.filter(pk=batch_id, user=request.user).first() + if not batch: + raise Http404("批次不存在。") + return JsonResponse( + { + "batch": { + "id": batch.pk, + "batch_no": batch.batch_no, + "status": batch.status, + "product_name": batch.product_name, + "total_files": batch.total_files, + "success_files": batch.success_files, + "failed_files": batch.failed_files, + "total_pages": batch.total_pages, + }, + "nodes": [ + { + "node_code": node.node_code, + "node_name": node.node_name, + "status": node.status, + "progress": node.progress, + "message": node.message, + } + for node in batch.node_runs.order_by("id") + ], + } + ) + + +@require_http_methods(["GET"]) +@login_required +def batch_events(request, batch_id: int): + batch = FileSummaryBatch.objects.filter(pk=batch_id, user=request.user).first() + if not batch: + raise Http404("批次不存在。") + after = request.GET.get("after") or "0" + try: + after_id = int(after) + except ValueError: + after_id = 0 + events = WorkflowEvent.objects.filter(batch=batch, pk__gt=after_id).order_by("id") + return JsonResponse({"events": [serialize_event(event) for event in events]}) diff --git a/review_agent/file_summary/workflow.py b/review_agent/file_summary/workflow.py new file mode 100644 index 0000000..9316350 --- /dev/null +++ b/review_agent/file_summary/workflow.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +from threading import Thread +from uuid import uuid4 + +from django.db import transaction +from django.utils import timezone + +from review_agent.models import ( + Conversation, + FileAttachment, + FileSummaryBatch, + FileSummaryBatchAttachment, + Message, + WorkflowNodeRun, +) + +from .events import record_event + + +NODE_DEFINITIONS = [ + ("upload", "附件固化"), + ("extract", "压缩包解压"), + ("inventory", "文件扫描"), + ("page_count", "页数统计"), + ("product_detect", "产品识别"), + ("report", "报告输出"), + ("complete", "完成"), +] + + +def build_batch_no() -> str: + return f"FS-{timezone.localtime().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6]}" + + +@transaction.atomic +def create_file_summary_batch( + *, + conversation: Conversation, + user, + trigger_message: Message | None = None, +) -> FileSummaryBatch: + active_attachments = list( + FileAttachment.objects.select_for_update() + .filter(conversation=conversation, is_active=True) + .exclude(upload_status=FileAttachment.UploadStatus.DELETED) + .order_by("original_name", "-created_at") + ) + if not active_attachments: + raise ValueError("当前对话没有可用附件。") + + batch = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + trigger_message=trigger_message, + batch_no=build_batch_no(), + ) + + for attachment in active_attachments: + FileSummaryBatchAttachment.objects.create(batch=batch, attachment=attachment) + attachment.upload_status = FileAttachment.UploadStatus.BOUND + attachment.save(update_fields=["upload_status"]) + + for code, name in NODE_DEFINITIONS: + WorkflowNodeRun.objects.create(batch=batch, node_code=code, node_name=name) + + record_event(batch, "workflow_created", {"batch_id": batch.pk, "batch_no": batch.batch_no}) + return batch + + +class WorkflowExecutor: + def __init__(self, batch: FileSummaryBatch): + self.batch = batch + + def run(self) -> None: + self.batch.status = FileSummaryBatch.Status.RUNNING + self.batch.started_at = timezone.now() + self.batch.save(update_fields=["status", "started_at"]) + record_event(self.batch, "workflow_started", {"batch_id": self.batch.pk}) + + try: + for node in self.batch.node_runs.order_by("id"): + self._run_node(node) + except Exception as exc: + self.batch.status = FileSummaryBatch.Status.FAILED + self.batch.error_message = str(exc) + self.batch.finished_at = timezone.now() + self.batch.save(update_fields=["status", "error_message", "finished_at"]) + record_event(self.batch, "workflow_failed", {"message": str(exc)}) + return + + self.batch.status = FileSummaryBatch.Status.SUCCESS + self.batch.finished_at = timezone.now() + self.batch.save(update_fields=["status", "finished_at"]) + record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk}) + + def _run_node(self, node: WorkflowNodeRun) -> None: + now = timezone.now() + node.status = WorkflowNodeRun.Status.RUNNING + node.progress = 10 + node.started_at = now + node.message = f"{node.node_name}处理中" + node.save(update_fields=["status", "progress", "started_at", "message"]) + record_event( + self.batch, + "node_progress", + {"node_code": node.node_code, "status": node.status, "progress": node.progress}, + ) + + node.status = WorkflowNodeRun.Status.SUCCESS + node.progress = 100 + node.finished_at = timezone.now() + node.message = f"{node.node_name}完成" + node.save(update_fields=["status", "progress", "finished_at", "message"]) + record_event( + self.batch, + "node_progress", + {"node_code": node.node_code, "status": node.status, "progress": node.progress}, + ) + + +def start_file_summary_workflow(batch: FileSummaryBatch, *, async_run: bool = True) -> None: + executor = WorkflowExecutor(batch) + if not async_run: + executor.run() + return + Thread(target=executor.run, daemon=True).start() diff --git a/review_agent/file_summary/workflow_trigger.py b/review_agent/file_summary/workflow_trigger.py new file mode 100644 index 0000000..ff86c41 --- /dev/null +++ b/review_agent/file_summary/workflow_trigger.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from review_agent.models import Conversation, FileAttachment + + +TRIGGER_KEYWORDS = ("自动汇总", "文件目录", "页数", "目录与页数", "文件清单") + + +@dataclass(frozen=True) +class TriggerResult: + should_start: bool + workflow_type: str = "" + reason: str = "" + + +def evaluate_file_summary_trigger(conversation: Conversation, content: str) -> TriggerResult: + text = (content or "").strip() + if not any(keyword in text for keyword in TRIGGER_KEYWORDS): + return TriggerResult(should_start=False, reason="not_matched") + + has_attachment = FileAttachment.objects.filter( + conversation=conversation, + is_active=True, + ).exclude(upload_status=FileAttachment.UploadStatus.DELETED).exists() + if not has_attachment: + return TriggerResult(should_start=False, reason="missing_attachment") + + return TriggerResult(should_start=True, workflow_type="file_summary") diff --git a/review_agent/services.py b/review_agent/services.py index 43a3a2f..c4b352b 100644 --- a/review_agent/services.py +++ b/review_agent/services.py @@ -3,8 +3,11 @@ from __future__ import annotations import json from django.db.models import Q, QuerySet +from django.conf import settings from django.utils import timezone +from .file_summary.workflow import create_file_summary_batch, start_file_summary_workflow +from .file_summary.workflow_trigger import evaluate_file_summary_trigger from .llm import LLMConfigurationError, LLMRequestError, generate_reply, stream_reply from .models import Conversation, Message @@ -88,6 +91,7 @@ def stream_message(conversation: Conversation, content: str): user_message = append_user_message(conversation, content) assistant_parts: list[str] = [] + trigger = evaluate_file_summary_trigger(conversation, content) yield sse_event( "meta", @@ -99,6 +103,51 @@ def stream_message(conversation: Conversation, content: str): }, ) + if trigger.reason == "missing_attachment": + reply_content = "请先在当前对话右侧上传需要汇总的文件或压缩包,然后再发送自动汇总指令。" + assistant_message = append_assistant_message(conversation, reply_content) + yield sse_event("chunk", {"delta": reply_content}) + yield sse_event( + "done", + { + "assistant_message_id": assistant_message.pk, + "conversation_id": conversation.pk, + "title": conversation.title, + }, + ) + return + + if trigger.should_start: + batch = create_file_summary_batch( + conversation=conversation, + user=conversation.user, + trigger_message=user_message, + ) + start_file_summary_workflow( + batch, + async_run=getattr(settings, "FILE_SUMMARY_ASYNC", True), + ) + reply_content = f"已启动文件目录与页数自动汇总工作流,批次号:{batch.batch_no}。" + assistant_message = append_assistant_message(conversation, reply_content) + yield sse_event( + "workflow_started", + { + "workflow_type": "file_summary", + "batch_id": batch.pk, + "batch_no": batch.batch_no, + }, + ) + yield sse_event("chunk", {"delta": reply_content}) + yield sse_event( + "done", + { + "assistant_message_id": assistant_message.pk, + "conversation_id": conversation.pk, + "title": conversation.title, + }, + ) + return + try: for chunk in stream_reply(conversation, content): assistant_parts.append(chunk) diff --git a/review_agent/urls.py b/review_agent/urls.py index 272291d..5f6fac3 100644 --- a/review_agent/urls.py +++ b/review_agent/urls.py @@ -1,6 +1,6 @@ from django.urls import path -from .file_summary.views import attachment_detail, attachments +from .file_summary.views import attachment_detail, attachments, batch_events, batch_status urlpatterns = [ @@ -19,4 +19,14 @@ urlpatterns = [ attachment_detail, name="file_summary_attachment_detail", ), + path( + "api/review-agent/file-summary//status/", + batch_status, + name="file_summary_batch_status", + ), + path( + "api/review-agent/file-summary//events/", + batch_events, + name="file_summary_batch_events", + ), ] diff --git a/tests/test_file_summary_trigger.py b/tests/test_file_summary_trigger.py new file mode 100644 index 0000000..4d94164 --- /dev/null +++ b/tests/test_file_summary_trigger.py @@ -0,0 +1,32 @@ +import pytest + +from review_agent.file_summary.workflow_trigger import evaluate_file_summary_trigger +from review_agent.models import Conversation, FileAttachment + + +pytestmark = pytest.mark.django_db + + +def test_trigger_matches_keywords_only_when_active_attachment_exists(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + + no_file = evaluate_file_summary_trigger(conversation, "请自动汇总文件目录与页数") + assert no_file.should_start is False + assert no_file.reason == "missing_attachment" + + FileAttachment.objects.create( + conversation=conversation, + user=user, + original_name="a.docx", + storage_path="x/a.docx", + file_size=1, + ) + + matched = evaluate_file_summary_trigger(conversation, "请自动汇总文件目录与页数") + assert matched.should_start is True + assert matched.workflow_type == "file_summary" + + normal = evaluate_file_summary_trigger(conversation, "你好,帮我解释法规") + assert normal.should_start is False + assert normal.reason == "not_matched" diff --git a/tests/test_file_summary_workflow.py b/tests/test_file_summary_workflow.py new file mode 100644 index 0000000..ea50817 --- /dev/null +++ b/tests/test_file_summary_workflow.py @@ -0,0 +1,102 @@ +import pytest + +from review_agent.file_summary.workflow import create_file_summary_batch, start_file_summary_workflow +from review_agent.models import ( + Conversation, + FileAttachment, + FileSummaryBatch, + FileSummaryBatchAttachment, + Message, + WorkflowEvent, + WorkflowNodeRun, +) +from review_agent.services import stream_message + + +pytestmark = pytest.mark.django_db + + +def test_create_batch_binds_active_attachments_and_initializes_nodes(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + message = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="自动汇总") + active = FileAttachment.objects.create( + conversation=conversation, + user=user, + original_name="a.docx", + storage_path="x/a.docx", + file_size=1, + ) + FileAttachment.objects.create( + conversation=conversation, + user=user, + original_name="old.docx", + is_active=False, + storage_path="x/old.docx", + file_size=1, + ) + + batch = create_file_summary_batch(conversation=conversation, user=user, trigger_message=message) + + assert batch.status == FileSummaryBatch.Status.PENDING + assert FileSummaryBatchAttachment.objects.get(batch=batch).attachment == active + active.refresh_from_db() + assert active.upload_status == FileAttachment.UploadStatus.BOUND + assert WorkflowNodeRun.objects.filter(batch=batch).count() >= 6 + assert WorkflowEvent.objects.filter(batch=batch, event_type="workflow_created").exists() + + +def test_start_file_summary_workflow_runs_synchronously_for_tests(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + message = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="自动汇总") + FileAttachment.objects.create( + conversation=conversation, + user=user, + original_name="a.docx", + storage_path="x/a.docx", + file_size=1, + ) + batch = create_file_summary_batch(conversation=conversation, user=user, trigger_message=message) + + start_file_summary_workflow(batch, async_run=False) + + batch.refresh_from_db() + assert batch.status == FileSummaryBatch.Status.SUCCESS + assert WorkflowEvent.objects.filter(batch=batch, event_type="workflow_completed").exists() + + +def test_stream_message_returns_workflow_meta_when_triggered(settings, django_user_model): + settings.FILE_SUMMARY_ASYNC = False + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + FileAttachment.objects.create( + conversation=conversation, + user=user, + original_name="a.docx", + storage_path="x/a.docx", + file_size=1, + ) + + frames = list(stream_message(conversation, "请自动汇总文件目录与页数")) + + joined = "".join(frames) + assert "workflow_started" in joined + assert "\"workflow_type\": \"file_summary\"" in joined + assert FileSummaryBatch.objects.filter(conversation=conversation).exists() + + +def test_stream_message_uses_normal_llm_path_when_not_triggered(monkeypatch, django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + + def fake_stream_reply(conversation, content): + yield "普通回复" + + monkeypatch.setattr("review_agent.services.stream_reply", fake_stream_reply) + + frames = list(stream_message(conversation, "你好")) + + joined = "".join(frames) + assert "普通回复" in joined + assert "workflow_started" not in joined