From 44d31d2a14cd82a709dcaa1800fd2c83622e7b75 Mon Sep 17 00:00:00 2001 From: bruce Date: Sun, 7 Jun 2026 00:34:12 +0800 Subject: [PATCH] =?UTF-8?q?feat(regulatory):=20=E6=8E=A5=E5=85=A5=E6=B3=95?= =?UTF-8?q?=E8=A7=84=E6=A0=B8=E6=9F=A5=E8=A7=A6=E5=8F=91=E4=B8=8E=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=B5=81=E9=AA=A8=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/settings.py | 1 + review_agent/regulatory_review/events.py | 26 ++++ review_agent/regulatory_review/views.py | 42 ++++++ review_agent/regulatory_review/workflow.py | 151 ++++++++++++++++++++ review_agent/services.py | 51 +++++++ review_agent/skill_router.py | 34 ++++- review_agent/urls.py | 6 + tests/test_regulatory_views.py | 45 ++++++ tests/test_regulatory_workflow.py | 157 +++++++++++++++++++++ 9 files changed, 511 insertions(+), 2 deletions(-) create mode 100644 review_agent/regulatory_review/events.py create mode 100644 review_agent/regulatory_review/views.py create mode 100644 review_agent/regulatory_review/workflow.py create mode 100644 tests/test_regulatory_views.py create mode 100644 tests/test_regulatory_workflow.py diff --git a/config/settings.py b/config/settings.py index b8dfc9d..ad63757 100644 --- a/config/settings.py +++ b/config/settings.py @@ -114,6 +114,7 @@ REGULATORY_RAG_COLLECTION = os.environ.get( "REGULATORY_RAG_COLLECTION", "nmpa_ivd_registration_v1", ) +REGULATORY_REVIEW_ASYNC = os.environ.get("REGULATORY_REVIEW_ASYNC", "true").lower() == "true" SILICONFLOW_BASE_URL = os.environ.get("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1") SILICONFLOW_API_KEY = os.environ.get("SILICONFLOW_API_KEY", "") SILICONFLOW_EMBEDDING_MODEL = os.environ.get( diff --git a/review_agent/regulatory_review/events.py b/review_agent/regulatory_review/events.py new file mode 100644 index 0000000..a752d36 --- /dev/null +++ b/review_agent/regulatory_review/events.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from review_agent.models import RegulatoryReviewBatch, WorkflowEvent + + +def record_event( + batch: RegulatoryReviewBatch, + event_type: str, + payload: dict | None = None, +) -> WorkflowEvent: + return WorkflowEvent.objects.create( + workflow_type="regulatory_review", + workflow_batch_id=batch.pk, + conversation=batch.conversation, + event_type=event_type, + payload=payload or {}, + ) + + +def serialize_event(event: WorkflowEvent) -> dict[str, object]: + return { + "id": event.pk, + "event_type": event.event_type, + "payload": event.payload, + "created_at": event.created_at.isoformat(), + } diff --git a/review_agent/regulatory_review/views.py b/review_agent/regulatory_review/views.py new file mode 100644 index 0000000..1842487 --- /dev/null +++ b/review_agent/regulatory_review/views.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from django.http import Http404, JsonResponse +from django.views.decorators.http import require_http_methods +from django.contrib.auth.decorators import login_required + +from review_agent.models import RegulatoryReviewBatch, WorkflowNodeRun + + +@require_http_methods(["GET"]) +@login_required +def batch_status(request, batch_id: int): + batch = RegulatoryReviewBatch.objects.filter(pk=batch_id, user=request.user).first() + if not batch: + raise Http404("批次不存在。") + nodes = WorkflowNodeRun.objects.filter( + workflow_type="regulatory_review", + workflow_batch_id=batch.pk, + ).order_by("id") + return JsonResponse( + { + "batch": { + "id": batch.pk, + "workflow_type": "regulatory_review", + "batch_no": batch.batch_no, + "status": batch.status, + "source_summary_batch_id": batch.source_summary_batch_id, + "risk_summary": batch.risk_summary, + "error_message": batch.error_message, + }, + "nodes": [ + { + "node_code": node.node_code, + "node_name": node.node_name, + "status": node.status, + "progress": node.progress, + "message": node.message, + } + for node in nodes + ], + } + ) diff --git a/review_agent/regulatory_review/workflow.py b/review_agent/regulatory_review/workflow.py new file mode 100644 index 0000000..fc0b2e6 --- /dev/null +++ b/review_agent/regulatory_review/workflow.py @@ -0,0 +1,151 @@ +from __future__ import annotations + +import logging +from pathlib import Path +from threading import Thread +from uuid import uuid4 + +from django.conf import settings +from django.db import transaction +from django.utils import timezone + +from review_agent.models import ( + Conversation, + FileSummaryBatch, + Message, + RegulatoryReviewBatch, + WorkflowNodeRun, +) + +from .events import record_event + + +NODE_DEFINITIONS = [ + ("prepare", "准备", "prepare"), + ("rule_scope", "规则范围", "rule_scope"), + ("completeness_check", "完整性核查", "completeness_check"), + ("text_extract", "文本抽取", "text_extract"), + ("structure_check", "章节核查", "structure_check"), + ("consistency_check", "一致性核查", "consistency_check"), + ("risk_assess", "风险评估", "risk_assess"), + ("report_export", "报告输出", "report_export"), + ("completed", "完成", "completed"), +] + + +logger = logging.getLogger("review_agent.regulatory_review.workflow") + + +def build_batch_no() -> str: + return f"RR-{timezone.localtime().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6]}" + + +def build_batch_work_dir(batch_no: str) -> Path: + return Path(settings.MEDIA_ROOT) / "regulatory_review" / "work" / batch_no + + +def find_latest_successful_summary_batch(conversation: Conversation) -> FileSummaryBatch | None: + return ( + FileSummaryBatch.objects.filter( + conversation=conversation, + status=FileSummaryBatch.Status.SUCCESS, + ) + .order_by("-finished_at", "-created_at", "-id") + .first() + ) + + +@transaction.atomic +def create_regulatory_review_batch( + *, + conversation: Conversation, + user, + source_summary_batch: FileSummaryBatch, + trigger_message: Message | None = None, +) -> RegulatoryReviewBatch: + batch_no = build_batch_no() + work_dir = build_batch_work_dir(batch_no) + work_dir.mkdir(parents=True, exist_ok=True) + batch = RegulatoryReviewBatch.objects.create( + conversation=conversation, + user=user, + trigger_message=trigger_message, + source_summary_batch=source_summary_batch, + batch_no=batch_no, + work_dir=str(work_dir), + ) + for code, name, group in NODE_DEFINITIONS: + WorkflowNodeRun.objects.create( + workflow_type="regulatory_review", + workflow_batch_id=batch.pk, + node_group=group, + node_code=code, + node_name=name, + ) + record_event(batch, "workflow_created", {"batch_id": batch.pk, "batch_no": batch.batch_no}) + return batch + + +class RegulatoryWorkflowExecutor: + def __init__(self, batch: RegulatoryReviewBatch): + self.batch = batch + + def run(self) -> None: + self.batch.status = RegulatoryReviewBatch.Status.RUNNING + self.batch.started_at = timezone.now() + self.batch.save(update_fields=["status", "started_at"]) + record_event(self.batch, "workflow_started", {"batch_id": self.batch.pk}) + + try: + for node in self._nodes(): + self._run_node(node) + except Exception as exc: + logger.exception("Regulatory workflow failed", extra={"batch_id": self.batch.pk}) + self.batch.status = RegulatoryReviewBatch.Status.FAILED + self.batch.error_message = str(exc) + self.batch.finished_at = timezone.now() + self.batch.save(update_fields=["status", "error_message", "finished_at"]) + record_event(self.batch, "workflow_failed", {"message": str(exc)}) + return + + self.batch.status = RegulatoryReviewBatch.Status.SUCCESS + self.batch.finished_at = timezone.now() + self.batch.save(update_fields=["status", "finished_at"]) + record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk}) + + def _nodes(self): + return WorkflowNodeRun.objects.filter( + workflow_type="regulatory_review", + workflow_batch_id=self.batch.pk, + ).order_by("id") + + def _run_node(self, node: WorkflowNodeRun) -> None: + node.status = WorkflowNodeRun.Status.RUNNING + node.progress = 10 + node.started_at = timezone.now() + node.message = f"{node.node_name}处理中" + node.save(update_fields=["status", "progress", "started_at", "message"]) + record_event( + self.batch, + "node_progress", + {"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message}, + ) + + node.status = WorkflowNodeRun.Status.SUCCESS + node.progress = 100 + node.finished_at = timezone.now() + node.message = f"{node.node_name}完成" + node.save(update_fields=["status", "progress", "finished_at", "message"]) + record_event( + self.batch, + "node_progress", + {"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message}, + ) + + +def start_regulatory_review_workflow(batch: RegulatoryReviewBatch, *, async_run: bool = True) -> None: + executor = RegulatoryWorkflowExecutor(batch) + if not async_run: + executor.run() + return + Thread(target=executor.run, daemon=True).start() diff --git a/review_agent/services.py b/review_agent/services.py index 376d3c5..9ac3729 100644 --- a/review_agent/services.py +++ b/review_agent/services.py @@ -11,6 +11,11 @@ from .file_summary.skills.attachment_reader import AttachmentReaderSkill from .file_summary.workflow import create_file_summary_batch, start_file_summary_workflow from .llm import LLMConfigurationError, LLMRequestError, generate_reply, stream_reply from .models import Conversation, FileAttachment, Message +from .regulatory_review.workflow import ( + create_regulatory_review_batch, + find_latest_successful_summary_batch, + start_regulatory_review_workflow, +) from .skill_router import route_message_intent @@ -219,6 +224,52 @@ def stream_message(conversation: Conversation, content: str): ) return + if route.starts_regulatory_review: + source_summary_batch = find_latest_successful_summary_batch(conversation) + if not source_summary_batch: + reply_content = "请先执行自动汇总,生成成功的文件汇总批次后再启动法规核查。" + assistant_message = append_assistant_message(conversation, reply_content) + yield sse_event("chunk", {"delta": reply_content}) + yield sse_event( + "done", + { + "assistant_message_id": assistant_message.pk, + "conversation_id": conversation.pk, + "title": conversation.title, + }, + ) + return + batch = create_regulatory_review_batch( + conversation=conversation, + user=conversation.user, + trigger_message=user_message, + source_summary_batch=source_summary_batch, + ) + start_regulatory_review_workflow( + batch, + async_run=getattr(settings, "REGULATORY_REVIEW_ASYNC", True), + ) + reply_content = f"已启动 NMPA 注册资料法规核查工作流,批次号:{batch.batch_no}。" + assistant_message = append_assistant_message(conversation, reply_content) + yield sse_event( + "workflow_started", + { + "workflow_type": "regulatory_review", + "batch_id": batch.pk, + "batch_no": batch.batch_no, + }, + ) + yield sse_event("chunk", {"delta": reply_content}) + yield sse_event( + "done", + { + "assistant_message_id": assistant_message.pk, + "conversation_id": conversation.pk, + "title": conversation.title, + }, + ) + return + stream_failed = False stream_error = "" try: diff --git a/review_agent/skill_router.py b/review_agent/skill_router.py index d81ebbc..05718e4 100644 --- a/review_agent/skill_router.py +++ b/review_agent/skill_router.py @@ -15,6 +15,7 @@ from .models import Conversation, FileAttachment logger = logging.getLogger(__name__) ROUTE_ACTIONS = {"normal_chat", "attachment_reader", "file_summary"} +ROUTE_ACTIONS.add("regulatory_review") @dataclass(frozen=True) @@ -34,6 +35,10 @@ class SkillRoute: def starts_file_summary(self) -> bool: return self.action == "file_summary" + @property + def starts_regulatory_review(self) -> bool: + return self.action == "regulatory_review" + @property def is_normal_chat(self) -> bool: return self.action == "normal_chat" @@ -100,7 +105,7 @@ def _route_with_llm( return SkillRoute( action=action, skill_name="attachment_reader" if action == "attachment_reader" else "", - workflow_type="file_summary" if action == "file_summary" else "", + workflow_type=action if action in {"file_summary", "regulatory_review"} else "", confidence=_float_or_zero(payload.get("confidence")), reason=str(payload.get("reason") or ""), source="llm", @@ -108,6 +113,15 @@ def _route_with_llm( def _route_with_rules(conversation: Conversation, content: str) -> SkillRoute: + if _matches_regulatory_review(content): + return SkillRoute( + action="regulatory_review", + workflow_type="regulatory_review", + confidence=0.7, + reason="命中法规核查关键词。", + source="rule_fallback", + ) + file_summary = evaluate_file_summary_trigger(conversation, content) if file_summary.should_start or file_summary.reason == "missing_attachment": return SkillRoute( @@ -148,9 +162,10 @@ def _router_system_prompt() -> str: return ( "你是审核智能体的工具路由器,只判断是否需要调用工具,不直接回答用户。" "你必须只输出 JSON 对象,不要输出 Markdown。" - "可选 action:normal_chat、attachment_reader、file_summary。" + "可选 action:normal_chat、attachment_reader、file_summary、regulatory_review。" "attachment_reader 用于用户要求阅读、提取、分析、总结、查看上传附件内容。" "file_summary 用于用户要求自动汇总文件目录、页数、清单或生成目录页数报告。" + "regulatory_review 用于用户要求法规核查、NMPA核查、完整性核查、章节一致性核查、风险预警或整改建议。" "normal_chat 用于不需要读取附件或执行工作流的一般问答。" "输出字段:action、confidence、reason。" ) @@ -187,3 +202,18 @@ def _float_or_zero(value) -> float: return float(value) except (TypeError, ValueError): return 0.0 + + +def _matches_regulatory_review(content: str) -> bool: + normalized = content.lower() + keywords = [ + "法规核查", + "nmpa核查", + "nmpa 核查", + "完整性核查", + "风险预警", + "整改建议", + "章节核查", + "一致性核查", + ] + return any(keyword in normalized for keyword in keywords) diff --git a/review_agent/urls.py b/review_agent/urls.py index 6e480dd..1a8c6e8 100644 --- a/review_agent/urls.py +++ b/review_agent/urls.py @@ -10,6 +10,7 @@ from .file_summary.views import ( conversation_messages, export_download, ) +from .regulatory_review.views import batch_status as regulatory_review_batch_status urlpatterns = [ @@ -58,4 +59,9 @@ urlpatterns = [ export_download, name="file_summary_export_download", ), + path( + "api/review-agent/regulatory-review//status/", + regulatory_review_batch_status, + name="regulatory_review_batch_status", + ), ] diff --git a/tests/test_regulatory_views.py b/tests/test_regulatory_views.py new file mode 100644 index 0000000..198f9a6 --- /dev/null +++ b/tests/test_regulatory_views.py @@ -0,0 +1,45 @@ +import pytest +from django.urls import reverse + +from review_agent.models import Conversation, FileSummaryBatch, RegulatoryReviewBatch, WorkflowNodeRun + + +pytestmark = pytest.mark.django_db + + +def test_regulatory_batch_status_requires_owner(client, django_user_model): + owner = django_user_model.objects.create_user(username="owner", password="pass") + other = django_user_model.objects.create_user(username="other", password="pass") + conversation = Conversation.objects.create(user=owner, title="会话") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=owner, + batch_no="FS-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + batch = RegulatoryReviewBatch.objects.create( + conversation=conversation, + user=owner, + source_summary_batch=summary, + batch_no="RR-STATUS", + ) + WorkflowNodeRun.objects.create( + workflow_type="regulatory_review", + workflow_batch_id=batch.pk, + node_group="regulatory_review", + node_code="prepare", + node_name="准备", + progress=50, + ) + + client.force_login(other) + denied = client.get(reverse("regulatory_review_batch_status", args=[batch.pk])) + assert denied.status_code == 404 + + client.force_login(owner) + allowed = client.get(reverse("regulatory_review_batch_status", args=[batch.pk])) + assert allowed.status_code == 200 + payload = allowed.json() + assert payload["batch"]["workflow_type"] == "regulatory_review" + assert payload["batch"]["batch_no"] == "RR-STATUS" + assert payload["nodes"][0]["node_code"] == "prepare" diff --git a/tests/test_regulatory_workflow.py b/tests/test_regulatory_workflow.py new file mode 100644 index 0000000..3d1b0ca --- /dev/null +++ b/tests/test_regulatory_workflow.py @@ -0,0 +1,157 @@ +import pytest + +from review_agent.models import ( + Conversation, + FileSummaryBatch, + Message, + RegulatoryReviewBatch, + WorkflowEvent, + WorkflowNodeRun, +) +from review_agent.regulatory_review.workflow import ( + NODE_DEFINITIONS, + create_regulatory_review_batch, + find_latest_successful_summary_batch, + start_regulatory_review_workflow, +) +from review_agent.services import stream_message +from review_agent.skill_router import SkillRoute, route_message_intent + + +pytestmark = pytest.mark.django_db + + +def test_rule_router_starts_regulatory_review_for_nmpa_keywords(monkeypatch, django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + monkeypatch.setattr( + "review_agent.skill_router._route_with_llm", + lambda conversation, content, attachments: (_ for _ in ()).throw(ValueError("fallback")), + ) + + route = route_message_intent(conversation, "请做NMPA核查和风险预警") + + assert route.action == "regulatory_review" + assert route.workflow_type == "regulatory_review" + assert route.starts_regulatory_review + + +def test_find_latest_successful_summary_batch_ignores_failed_batches(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + success = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-FAILED", + status=FileSummaryBatch.Status.FAILED, + ) + + assert find_latest_successful_summary_batch(conversation) == success + + +def test_create_regulatory_review_batch_initializes_nodes(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + message = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="法规核查") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + + batch = create_regulatory_review_batch( + conversation=conversation, + user=user, + trigger_message=message, + source_summary_batch=summary, + ) + + assert batch.status == RegulatoryReviewBatch.Status.PENDING + assert WorkflowNodeRun.objects.filter( + workflow_type="regulatory_review", + workflow_batch_id=batch.pk, + ).count() == len(NODE_DEFINITIONS) + assert WorkflowEvent.objects.filter( + workflow_type="regulatory_review", + workflow_batch_id=batch.pk, + event_type="workflow_created", + ).exists() + + +def test_start_regulatory_review_workflow_runs_synchronously(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + batch = create_regulatory_review_batch( + conversation=conversation, + user=user, + source_summary_batch=summary, + ) + + start_regulatory_review_workflow(batch, async_run=False) + + batch.refresh_from_db() + assert batch.status == RegulatoryReviewBatch.Status.SUCCESS + assert WorkflowEvent.objects.filter( + workflow_type="regulatory_review", + workflow_batch_id=batch.pk, + event_type="workflow_completed", + ).exists() + + +def test_stream_message_prompts_for_summary_when_missing(monkeypatch, django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + monkeypatch.setattr( + "review_agent.services.route_message_intent", + lambda conversation, content: SkillRoute( + action="regulatory_review", + workflow_type="regulatory_review", + confidence=0.9, + ), + ) + + frames = list(stream_message(conversation, "请做法规核查")) + + joined = "".join(frames) + assert "请先执行自动汇总" in joined + assert not RegulatoryReviewBatch.objects.exists() + + +def test_stream_message_starts_regulatory_workflow(monkeypatch, settings, django_user_model): + settings.REGULATORY_REVIEW_ASYNC = False + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + monkeypatch.setattr( + "review_agent.services.route_message_intent", + lambda conversation, content: SkillRoute( + action="regulatory_review", + workflow_type="regulatory_review", + confidence=0.9, + ), + ) + + frames = list(stream_message(conversation, "请做法规核查")) + + joined = "".join(frames) + assert "workflow_started" in joined + assert "\"workflow_type\": \"regulatory_review\"" in joined + assert RegulatoryReviewBatch.objects.filter(conversation=conversation).exists()