Files
DEMO-AGENT/tests/test_regulatory_info_package_workflow.py

63 lines
2.4 KiB
Python

import pytest
from review_agent.models import Conversation, RegulatoryInfoPackageBatch, WorkflowNodeRun
from review_agent.regulatory_info_package.constants import (
REGULATORY_INFO_PACKAGE_NODE_DEFINITIONS,
WORKFLOW_TYPE,
)
from review_agent.regulatory_info_package.workflow import (
create_regulatory_info_package_batch,
start_regulatory_info_package_workflow,
)
# Apply the ``django_db`` marker to every test in this module; each test below
# creates rows through the ORM.
pytestmark = pytest.mark.django_db
def test_create_regulatory_info_package_batch_initializes_nodes(django_user_model):
    """A freshly created batch has a ``RIP-`` number, a work dir and its node runs.

    The node runs recorded for the batch, ordered by primary key, must carry
    exactly the node codes listed in ``REGULATORY_INFO_PACKAGE_NODE_DEFINITIONS``
    and in the same order.
    """
    # Arrange: an owner and a conversation to attach the batch to.
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")

    # Act
    created = create_regulatory_info_package_batch(conversation=chat, user=owner)

    # Assert: basic batch attributes are populated.
    assert created.batch_no.startswith("RIP-")
    assert created.work_dir

    # Assert: one node run per definition, in definition order.
    node_runs = WorkflowNodeRun.objects.filter(
        workflow_type=WORKFLOW_TYPE,
        workflow_batch_id=created.pk,
    ).order_by("id")
    actual_codes = [run.node_code for run in node_runs]

    expected_codes = []
    for node_code, _label, _section in REGULATORY_INFO_PACKAGE_NODE_DEFINITIONS:
        expected_codes.append(node_code)

    assert actual_codes == expected_codes
def test_create_regulatory_info_package_batch_is_node_idempotent(django_user_model):
    """Calling the creator again with ``existing_batch`` must not add node runs.

    After the second call the number of node runs stored for the batch still
    equals the number of entries in ``REGULATORY_INFO_PACKAGE_NODE_DEFINITIONS``.
    """
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")

    first_batch = create_regulatory_info_package_batch(conversation=chat, user=owner)
    # Second invocation targets the batch that already exists.
    create_regulatory_info_package_batch(
        conversation=chat,
        user=owner,
        existing_batch=first_batch,
    )

    batch_node_runs = WorkflowNodeRun.objects.filter(
        workflow_type=WORKFLOW_TYPE,
        workflow_batch_id=first_batch.pk,
    )
    expected_total = len(REGULATORY_INFO_PACKAGE_NODE_DEFINITIONS)
    assert batch_node_runs.count() == expected_total
def test_empty_workflow_skeleton_completes(django_user_model, settings):
    """Starting the workflow on a new batch ends with batch and all nodes in SUCCESS.

    After ``start_regulatory_info_package_workflow`` returns, the reloaded batch
    must report ``Status.SUCCESS`` and the count of node runs in
    ``Status.SUCCESS`` must equal the number of node definitions.
    """
    # Both the setting and the explicit ``async_run=False`` argument are turned
    # off; presumably this makes the workflow run inline before the assertions
    # execute -- confirm against the workflow implementation.
    settings.REGULATORY_INFO_PACKAGE_ASYNC = False

    owner = django_user_model.objects.create_user(username="owner", password="pass")
    chat = Conversation.objects.create(user=owner, title="会话")
    pending_batch = create_regulatory_info_package_batch(conversation=chat, user=owner)

    start_regulatory_info_package_workflow(pending_batch, async_run=False)

    # Reload so the assertion sees the persisted status, not the in-memory one.
    pending_batch.refresh_from_db()
    assert pending_batch.status == RegulatoryInfoPackageBatch.Status.SUCCESS

    succeeded_runs = WorkflowNodeRun.objects.filter(
        workflow_type=WORKFLOW_TYPE,
        workflow_batch_id=pending_batch.pk,
        status=WorkflowNodeRun.Status.SUCCESS,
    )
    expected_total = len(REGULATORY_INFO_PACKAGE_NODE_DEFINITIONS)
    assert succeeded_runs.count() == expected_total