"""Tests for routing into, creating, and running the regulatory review workflow."""

import pytest

from review_agent.models import (
    Conversation,
    ExportedSummaryFile,
    FileSummaryBatch,
    FileSummaryItem,
    Message,
    RegulatoryIssue,
    RegulatoryReviewBatch,
    WorkflowEvent,
    WorkflowNodeRun,
)
from review_agent.regulatory_review.workflow import (
    NODE_DEFINITIONS,
    create_regulatory_review_batch,
    find_latest_successful_summary_batch,
    start_regulatory_review_workflow,
)
from review_agent.services import stream_message
from review_agent.skill_router import SkillRoute, route_message_intent

pytestmark = pytest.mark.django_db

# Workflow type string shared by every query and route in this module.
_WORKFLOW_TYPE = "regulatory_review"


def _new_owner_conversation(django_user_model):
    """Create the ``owner`` user plus one conversation; return ``(user, conversation)``."""
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")
    return owner, chat


def _new_successful_summary(conversation, user):
    """Create and return a ``FileSummaryBatch`` numbered ``FS-OK`` with SUCCESS status."""
    return FileSummaryBatch.objects.create(
        conversation=conversation,
        user=user,
        batch_no="FS-OK",
        status=FileSummaryBatch.Status.SUCCESS,
    )


def _confirm_conditions(batch):
    """Store a confirmed ``condition_json`` on ``batch`` and persist only that field."""
    batch.condition_json = {
        "confirmed": True,
        "confirmed_conditions": {"product_category": "体外诊断试剂"},
    }
    batch.save(update_fields=["condition_json"])


def _force_regulatory_route(monkeypatch):
    """Replace ``review_agent.services.route_message_intent`` with a fixed route."""

    def fake_route(conversation, content):
        # A fresh SkillRoute per call, exactly as the inline stub would produce.
        return SkillRoute(
            action=_WORKFLOW_TYPE,
            workflow_type=_WORKFLOW_TYPE,
            confidence=0.9,
        )

    monkeypatch.setattr("review_agent.services.route_message_intent", fake_route)


def _has_workflow_event(batch, event_type):
    """Return True when ``batch`` has a regulatory-review event of ``event_type``."""
    events = WorkflowEvent.objects.filter(
        workflow_type=_WORKFLOW_TYPE,
        workflow_batch_id=batch.pk,
        event_type=event_type,
    )
    return events.exists()


def test_rule_router_starts_regulatory_review_for_nmpa_keywords(monkeypatch, django_user_model):
    """With the LLM routing helper stubbed to raise, NMPA wording still routes to review."""
    _, conversation = _new_owner_conversation(django_user_model)

    def failing_llm_route(conversation, content, attachments):
        # Make the LLM path unusable so the result cannot come from it.
        raise ValueError("fallback")

    monkeypatch.setattr("review_agent.skill_router._route_with_llm", failing_llm_route)

    route = route_message_intent(conversation, "请做NMPA核查和风险预警")

    assert route.action == "regulatory_review"
    assert route.workflow_type == "regulatory_review"
    assert route.starts_regulatory_review


def test_find_latest_successful_summary_batch_ignores_failed_batches(django_user_model):
    """A FAILED batch created after a SUCCESS batch is not returned by the lookup."""
    user, conversation = _new_owner_conversation(django_user_model)
    expected = _new_successful_summary(conversation, user)
    FileSummaryBatch.objects.create(
        conversation=conversation,
        user=user,
        batch_no="FS-FAILED",
        status=FileSummaryBatch.Status.FAILED,
    )

    found = find_latest_successful_summary_batch(conversation)

    assert found == expected


def test_create_regulatory_review_batch_initializes_nodes(django_user_model):
    """A new batch is PENDING, has one node run per definition, and logs its creation."""
    user, conversation = _new_owner_conversation(django_user_model)
    trigger = Message.objects.create(
        conversation=conversation,
        role=Message.Role.USER,
        content="法规核查",
    )
    summary = _new_successful_summary(conversation, user)

    batch = create_regulatory_review_batch(
        conversation=conversation,
        user=user,
        trigger_message=trigger,
        source_summary_batch=summary,
    )

    node_runs = WorkflowNodeRun.objects.filter(
        workflow_type=_WORKFLOW_TYPE,
        workflow_batch_id=batch.pk,
    )
    assert batch.status == RegulatoryReviewBatch.Status.PENDING
    assert node_runs.count() == len(NODE_DEFINITIONS)
    assert _has_workflow_event(batch, "workflow_created")


def test_start_regulatory_review_workflow_runs_synchronously(django_user_model):
    """Starting with ``async_run=False`` leaves the batch SUCCESS with a completion event."""
    user, conversation = _new_owner_conversation(django_user_model)
    summary = _new_successful_summary(conversation, user)
    batch = create_regulatory_review_batch(
        conversation=conversation,
        user=user,
        source_summary_batch=summary,
    )
    _confirm_conditions(batch)

    start_regulatory_review_workflow(batch, async_run=False)
    batch.refresh_from_db()

    assert batch.status == RegulatoryReviewBatch.Status.SUCCESS
    assert _has_workflow_event(batch, "workflow_completed")


def test_stream_message_prompts_for_summary_when_missing(monkeypatch, django_user_model):
    """Without any summary batch, the stream asks for a summary and creates no review batch."""
    _, conversation = _new_owner_conversation(django_user_model)
    _force_regulatory_route(monkeypatch)

    output = "".join(stream_message(conversation, "请做法规核查"))

    assert "请先执行自动汇总" in output
    assert not RegulatoryReviewBatch.objects.exists()


def test_stream_message_starts_regulatory_workflow(monkeypatch, settings, django_user_model):
    """With a successful summary present, the stream reports a started review workflow."""
    settings.REGULATORY_REVIEW_ASYNC = False
    user, conversation = _new_owner_conversation(django_user_model)
    _new_successful_summary(conversation, user)
    _force_regulatory_route(monkeypatch)

    output = "".join(stream_message(conversation, "请做法规核查"))

    assert "workflow_started" in output
    assert '"workflow_type": "regulatory_review"' in output
    assert RegulatoryReviewBatch.objects.filter(conversation=conversation).exists()


def test_workflow_generates_issues_exports_and_assistant_summary(settings, tmp_path, django_user_model):
    """A synchronous run yields a blocking issue, three exports, and an assistant message."""
    settings.MEDIA_ROOT = tmp_path
    settings.REGULATORY_REVIEW_ASYNC = False
    # Points at a path under tmp_path that this test never creates.
    settings.REGULATORY_RAG_CHROMA_PATH = tmp_path / "missing-rag"

    user, conversation = _new_owner_conversation(django_user_model)
    summary = _new_successful_summary(conversation, user)

    ifu_lines = ["产品名称:甲胎蛋白检测试剂盒", "样本要求:血清", "有效期:12个月"]
    ifu_path = tmp_path / "ifu.txt"
    ifu_path.write_text("\n".join(ifu_lines), encoding="utf-8")
    FileSummaryItem.objects.create(
        batch=summary,
        file_index=1,
        file_name="说明书.txt",
        file_type="txt",
        relative_path="说明书.txt",
        storage_path=str(ifu_path),
    )

    batch = create_regulatory_review_batch(
        conversation=conversation,
        user=user,
        source_summary_batch=summary,
    )
    _confirm_conditions(batch)

    start_regulatory_review_workflow(batch, async_run=False)
    batch.refresh_from_db()

    blocking_issues = RegulatoryIssue.objects.filter(batch=batch, severity="blocking")
    exported_files = ExportedSummaryFile.objects.filter(
        workflow_type=_WORKFLOW_TYPE,
        workflow_batch_id=batch.pk,
    )
    assistant_replies = conversation.messages.filter(
        role=Message.Role.ASSISTANT,
        content__contains="已完成 NMPA",
    )
    assert batch.status == RegulatoryReviewBatch.Status.SUCCESS
    assert blocking_issues.exists()
    assert exported_files.count() == 3
    assert assistant_replies.exists()