feat(application-form-fill): 串联填表工作流产物输出
This commit is contained in:
@@ -22,6 +22,14 @@ from review_agent.skill_router import SkillRoute
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def stub_aff_llm_extract(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
"review_agent.application_form_fill.services.field_extract.generate_completion",
|
||||
lambda messages, temperature=0.0: '{"fields": [], "checklist_items": []}',
|
||||
)
|
||||
|
||||
|
||||
def test_find_latest_successful_summary_batch_ignores_failed_batches(django_user_model):
|
||||
user = django_user_model.objects.create_user(username="owner", password="pass")
|
||||
conversation = Conversation.objects.create(user=user, title="会话")
|
||||
@@ -83,9 +91,11 @@ def test_application_form_fill_executor_runs_nodes_and_skips_pdf(settings, tmp_p
|
||||
batch_no="FS-AFF-OK",
|
||||
status=FileSummaryBatch.Status.SUCCESS,
|
||||
)
|
||||
trigger = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证")
|
||||
batch = create_application_form_fill_batch(
|
||||
conversation=conversation,
|
||||
user=user,
|
||||
trigger_message=trigger,
|
||||
source_summary_batch=summary,
|
||||
)
|
||||
|
||||
@@ -105,6 +115,73 @@ def test_application_form_fill_executor_runs_nodes_and_skips_pdf(settings, tmp_p
|
||||
).exists()
|
||||
|
||||
|
||||
def test_application_form_fill_workflow_generates_summary_and_exports(settings, tmp_path, django_user_model):
|
||||
settings.MEDIA_ROOT = tmp_path
|
||||
settings.APPLICATION_FORM_FILL_ASYNC = False
|
||||
user = django_user_model.objects.create_user(username="owner", password="pass")
|
||||
conversation = Conversation.objects.create(user=user, title="会话")
|
||||
trigger = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证")
|
||||
summary = FileSummaryBatch.objects.create(
|
||||
conversation=conversation,
|
||||
user=user,
|
||||
batch_no="FS-AFF-FULL",
|
||||
status=FileSummaryBatch.Status.SUCCESS,
|
||||
)
|
||||
source = tmp_path / "ifu.txt"
|
||||
source.write_text("产品名称:甲胎蛋白检测试剂盒\n预期用途:用于体外检测", encoding="utf-8")
|
||||
from review_agent.models import FileSummaryItem
|
||||
|
||||
FileSummaryItem.objects.create(
|
||||
batch=summary,
|
||||
file_index=1,
|
||||
file_name="说明书.txt",
|
||||
file_type="txt",
|
||||
relative_path="说明书.txt",
|
||||
storage_path=str(source),
|
||||
)
|
||||
batch = create_application_form_fill_batch(
|
||||
conversation=conversation,
|
||||
user=user,
|
||||
trigger_message=trigger,
|
||||
source_summary_batch=summary,
|
||||
)
|
||||
|
||||
start_application_form_fill_workflow(batch, async_run=False)
|
||||
|
||||
batch.refresh_from_db()
|
||||
assert batch.status == ApplicationFormFillBatch.Status.SUCCESS
|
||||
assert batch.product_name == "甲胎蛋白检测试剂盒"
|
||||
assert batch.selected_templates == ["registration_certificate"]
|
||||
assert conversation.messages.filter(role=Message.Role.ASSISTANT, content__contains="已生成申报模板自动填表文件").exists()
|
||||
assert batch.notifications.filter(send_status="success").exists()
|
||||
|
||||
|
||||
def test_application_form_fill_status_becomes_partial_when_notification_fails(settings, tmp_path, django_user_model):
|
||||
settings.MEDIA_ROOT = tmp_path
|
||||
settings.APPLICATION_FORM_FILL_MOCK_NOTIFY_FAIL = True
|
||||
user = django_user_model.objects.create_user(username="owner", password="pass")
|
||||
conversation = Conversation.objects.create(user=user, title="会话")
|
||||
trigger = Message.objects.create(conversation=conversation, role=Message.Role.USER, content="帮我填注册证")
|
||||
summary = FileSummaryBatch.objects.create(
|
||||
conversation=conversation,
|
||||
user=user,
|
||||
batch_no="FS-AFF-PARTIAL",
|
||||
status=FileSummaryBatch.Status.SUCCESS,
|
||||
)
|
||||
batch = create_application_form_fill_batch(
|
||||
conversation=conversation,
|
||||
user=user,
|
||||
trigger_message=trigger,
|
||||
source_summary_batch=summary,
|
||||
)
|
||||
|
||||
start_application_form_fill_workflow(batch, async_run=False)
|
||||
|
||||
batch.refresh_from_db()
|
||||
assert batch.status == ApplicationFormFillBatch.Status.PARTIAL_SUCCESS
|
||||
assert batch.notifications.filter(send_status="failed").exists()
|
||||
|
||||
|
||||
def test_stream_message_prompts_for_upload_when_no_summary_or_attachment(monkeypatch, django_user_model):
|
||||
user = django_user_model.objects.create_user(username="owner", password="pass")
|
||||
conversation = Conversation.objects.create(user=user, title="会话")
|
||||
|
||||
Reference in New Issue
Block a user