334 lines
13 KiB
Python
334 lines
13 KiB
Python
import pytest
|
||
|
||
from review_agent.application_form_fill.constants import FORM_FILL_NODE_DEFINITIONS
|
||
from review_agent.application_form_fill.workflow import (
|
||
create_application_form_fill_batch,
|
||
find_latest_successful_summary_batch,
|
||
start_application_form_fill_workflow,
|
||
)
|
||
from review_agent.models import (
|
||
ApplicationFormFillBatch,
|
||
Conversation,
|
||
FileAttachment,
|
||
FileSummaryBatch,
|
||
FileSummaryBatchAttachment,
|
||
Message,
|
||
WorkflowEvent,
|
||
WorkflowNodeRun,
|
||
)
|
||
from review_agent.services import stream_message
|
||
from review_agent.skill_router import SkillRoute
|
||
|
||
|
||
# Module-wide marker: apply pytest-django's ``django_db`` mark to every test
# in this file, since the tests below create and query model rows.
pytestmark = pytest.mark.django_db
|
||
|
||
|
||
@pytest.fixture(autouse=True)
def stub_aff_llm_extract(monkeypatch):
    """Patch ``generate_completion`` in the field-extract service for every test.

    The replacement ignores its arguments and always returns a JSON string
    with empty ``fields`` and ``checklist_items`` lists.
    """

    def _canned_completion(messages, temperature=0.0):
        # Same call shape as the patched function is given in these tests.
        return '{"fields": [], "checklist_items": []}'

    target = "review_agent.application_form_fill.services.field_extract.generate_completion"
    monkeypatch.setattr(target, _canned_completion)
|
||
|
||
|
||
def test_find_latest_successful_summary_batch_ignores_failed_batches(django_user_model):
    """A FAILED batch created after a SUCCESS batch must not be returned."""
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=owner, title="会话")
    common = {"conversation": conversation, "user": owner}

    # Creation order matters: the successful batch is the older of the two.
    expected = FileSummaryBatch.objects.create(
        batch_no="FS-AFF-OK",
        status=FileSummaryBatch.Status.SUCCESS,
        **common,
    )
    FileSummaryBatch.objects.create(
        batch_no="FS-AFF-FAILED",
        status=FileSummaryBatch.Status.FAILED,
        **common,
    )

    found = find_latest_successful_summary_batch(conversation)
    assert found == expected
|
||
|
||
|
||
def test_create_application_form_fill_batch_initializes_nodes(settings, tmp_path, django_user_model):
    """Creating a batch yields a PENDING batch, one node run per definition and a creation event."""
    settings.MEDIA_ROOT = tmp_path
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=owner, title="会话")
    user_message = Message.objects.create(
        conversation=conversation,
        role=Message.Role.USER,
        content="帮我填注册证",
    )
    summary_batch = FileSummaryBatch.objects.create(
        conversation=conversation,
        user=owner,
        batch_no="FS-AFF-OK",
        status=FileSummaryBatch.Status.SUCCESS,
    )

    form_batch = create_application_form_fill_batch(
        conversation=conversation,
        user=owner,
        trigger_message=user_message,
        source_summary_batch=summary_batch,
    )

    assert form_batch.status == ApplicationFormFillBatch.Status.PENDING
    assert form_batch.output_types == ["word", "excel", "json"]

    # Both lookups are scoped to this workflow type and this batch.
    scope = {
        "workflow_type": "application_form_fill",
        "workflow_batch_id": form_batch.pk,
    }
    node_total = WorkflowNodeRun.objects.filter(**scope).count()
    assert node_total == len(FORM_FILL_NODE_DEFINITIONS)

    created_events = WorkflowEvent.objects.filter(event_type="workflow_created", **scope)
    assert created_events.exists()
|
||
|
||
|
||
def test_application_form_fill_executor_runs_nodes_and_skips_pdf(settings, tmp_path, django_user_model):
    """A synchronous run ends in SUCCESS, marks ``pdf_convert`` SKIPPED and emits a completion event."""
    settings.MEDIA_ROOT = tmp_path
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=owner, title="会话")
    summary_batch = FileSummaryBatch.objects.create(
        conversation=conversation,
        user=owner,
        batch_no="FS-AFF-OK",
        status=FileSummaryBatch.Status.SUCCESS,
    )
    user_message = Message.objects.create(
        conversation=conversation,
        role=Message.Role.USER,
        content="帮我填注册证",
    )
    form_batch = create_application_form_fill_batch(
        conversation=conversation,
        user=owner,
        trigger_message=user_message,
        source_summary_batch=summary_batch,
    )

    start_application_form_fill_workflow(form_batch, async_run=False)

    # Reload so the assertions see what the workflow persisted.
    form_batch.refresh_from_db()
    assert form_batch.status == ApplicationFormFillBatch.Status.SUCCESS

    scope = {
        "workflow_type": "application_form_fill",
        "workflow_batch_id": form_batch.pk,
    }
    pdf_node = WorkflowNodeRun.objects.get(node_code="pdf_convert", **scope)
    assert pdf_node.status == WorkflowNodeRun.Status.SKIPPED

    completed_events = WorkflowEvent.objects.filter(event_type="workflow_completed", **scope)
    assert completed_events.exists()
|
||
|
||
|
||
def test_application_form_fill_workflow_generates_summary_and_exports(settings, tmp_path, django_user_model):
    """A full synchronous run over one text source sets the product name, template, reply and notification."""
    from review_agent.models import FileSummaryItem

    settings.MEDIA_ROOT = tmp_path
    settings.APPLICATION_FORM_FILL_ASYNC = False
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=owner, title="会话")
    user_message = Message.objects.create(
        conversation=conversation,
        role=Message.Role.USER,
        content="帮我填注册证",
    )
    summary_batch = FileSummaryBatch.objects.create(
        conversation=conversation,
        user=owner,
        batch_no="FS-AFF-FULL",
        status=FileSummaryBatch.Status.SUCCESS,
    )

    # On-disk source document referenced by the summary item below.
    ifu_file = tmp_path / "ifu.txt"
    ifu_file.write_text("产品名称:甲胎蛋白检测试剂盒\n预期用途:用于体外检测", encoding="utf-8")
    FileSummaryItem.objects.create(
        batch=summary_batch,
        file_index=1,
        file_name="说明书.txt",
        file_type="txt",
        relative_path="说明书.txt",
        storage_path=str(ifu_file),
    )

    form_batch = create_application_form_fill_batch(
        conversation=conversation,
        user=owner,
        trigger_message=user_message,
        source_summary_batch=summary_batch,
    )

    start_application_form_fill_workflow(form_batch, async_run=False)

    form_batch.refresh_from_db()
    assert form_batch.status == ApplicationFormFillBatch.Status.SUCCESS
    assert form_batch.product_name == "甲胎蛋白检测试剂盒"
    assert form_batch.selected_templates == ["registration_certificate"]

    assistant_replies = conversation.messages.filter(
        role=Message.Role.ASSISTANT,
        content__contains="已生成申报模板自动填表文件",
    )
    assert assistant_replies.exists()

    delivered = form_batch.notifications.filter(send_status="success")
    assert delivered.exists()
|
||
|
||
|
||
def test_application_form_fill_status_becomes_partial_when_notification_fails(settings, tmp_path, django_user_model):
    """With the mock-notify-fail setting on, the batch ends PARTIAL_SUCCESS with a failed notification."""
    settings.MEDIA_ROOT = tmp_path
    settings.APPLICATION_FORM_FILL_MOCK_NOTIFY_FAIL = True
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=owner, title="会话")
    user_message = Message.objects.create(
        conversation=conversation,
        role=Message.Role.USER,
        content="帮我填注册证",
    )
    summary_batch = FileSummaryBatch.objects.create(
        conversation=conversation,
        user=owner,
        batch_no="FS-AFF-PARTIAL",
        status=FileSummaryBatch.Status.SUCCESS,
    )
    form_batch = create_application_form_fill_batch(
        conversation=conversation,
        user=owner,
        trigger_message=user_message,
        source_summary_batch=summary_batch,
    )

    start_application_form_fill_workflow(form_batch, async_run=False)

    form_batch.refresh_from_db()
    assert form_batch.status == ApplicationFormFillBatch.Status.PARTIAL_SUCCESS

    failed_notifications = form_batch.notifications.filter(send_status="failed")
    assert failed_notifications.exists()
|
||
|
||
|
||
def test_stream_message_prompts_for_upload_when_no_summary_or_attachment(monkeypatch, django_user_model):
    """With no summary batch and no attachment, the stream asks for an upload and creates no batch."""
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=owner, title="会话")

    def _route_to_form_fill(conversation, content):
        # Force the router's decision to the form-fill workflow.
        return SkillRoute(
            action="application_form_fill",
            workflow_type="application_form_fill",
            confidence=0.9,
        )

    monkeypatch.setattr("review_agent.services.route_message_intent", _route_to_form_fill)

    chunks = list(stream_message(conversation, "帮我填注册证"))
    streamed = "".join(chunks)

    assert "请先在当前对话右侧上传需要填表的产品资料或压缩包" in streamed
    assert not ApplicationFormFillBatch.objects.exists()
|
||
|
||
|
||
def test_stream_message_starts_application_form_fill_workflow(monkeypatch, settings, tmp_path, django_user_model):
    """With a successful summary batch present, the stream reports a started form-fill workflow."""
    settings.MEDIA_ROOT = tmp_path
    settings.APPLICATION_FORM_FILL_ASYNC = False
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=owner, title="会话")
    FileSummaryBatch.objects.create(
        conversation=conversation,
        user=owner,
        batch_no="FS-AFF-OK",
        status=FileSummaryBatch.Status.SUCCESS,
    )

    def _route_to_form_fill(conversation, content):
        # Force the router's decision to the form-fill workflow.
        return SkillRoute(
            action="application_form_fill",
            workflow_type="application_form_fill",
            confidence=0.9,
        )

    monkeypatch.setattr("review_agent.services.route_message_intent", _route_to_form_fill)

    chunks = list(stream_message(conversation, "帮我填注册证"))
    streamed = "".join(chunks)

    for expected_fragment in (
        "workflow_started",
        '"workflow_type": "application_form_fill"',
        "已启动申报文件自动填表工作流",
    ):
        assert expected_fragment in streamed

    created_batches = ApplicationFormFillBatch.objects.filter(conversation=conversation)
    assert created_batches.exists()
|
||
|
||
|
||
def test_stream_message_auto_runs_summary_before_application_form_fill(
    monkeypatch, settings, tmp_path, django_user_model
):
    """With only an active attachment, the stream reports a file summary and then the form fill."""
    settings.MEDIA_ROOT = tmp_path
    settings.APPLICATION_FORM_FILL_ASYNC = False
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=owner, title="会话")

    upload = tmp_path / "application.txt"
    upload.write_text("产品名称:甲胎蛋白检测试剂盒", encoding="utf-8")
    FileAttachment.objects.create(
        conversation=conversation,
        user=owner,
        original_name="application.txt",
        storage_path=str(upload),
        file_size=upload.stat().st_size,
        is_active=True,
    )

    def _route_to_form_fill(conversation, content):
        # Force the router's decision to the form-fill workflow.
        return SkillRoute(
            action="application_form_fill",
            workflow_type="application_form_fill",
            confidence=0.9,
        )

    def finish_summary(batch, async_run=True):
        # Stand-in for the summary workflow: just mark the batch successful.
        batch.status = FileSummaryBatch.Status.SUCCESS
        batch.save(update_fields=["status"])

    monkeypatch.setattr("review_agent.services.route_message_intent", _route_to_form_fill)
    monkeypatch.setattr("review_agent.services.start_file_summary_workflow", finish_summary)

    chunks = list(stream_message(conversation, "为我该方案生成申报模板"))
    streamed = "".join(chunks)

    assert '"workflow_type": "file_summary"' in streamed
    assert '"workflow_type": "application_form_fill"' in streamed
    assert "汇总完成后继续自动填表" in streamed

    successful_summaries = FileSummaryBatch.objects.filter(
        conversation=conversation,
        status=FileSummaryBatch.Status.SUCCESS,
    )
    assert successful_summaries.exists()
    assert ApplicationFormFillBatch.objects.filter(conversation=conversation).exists()
|
||
|
||
|
||
def test_stream_message_reruns_summary_when_new_attachment_not_in_latest_batch(
    monkeypatch, settings, tmp_path, django_user_model
):
    """An attachment missing from the latest summary batch leads to a new summary that the form fill uses."""
    settings.MEDIA_ROOT = tmp_path
    settings.APPLICATION_FORM_FILL_ASYNC = False
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    conversation = Conversation.objects.create(user=owner, title="会话")

    # Older attachment, linked to the existing successful summary batch.
    stale_file = tmp_path / "old.txt"
    stale_file.write_text("旧资料", encoding="utf-8")
    stale_attachment = FileAttachment.objects.create(
        conversation=conversation,
        user=owner,
        original_name="旧资料.txt",
        storage_path=str(stale_file),
        file_size=stale_file.stat().st_size,
        is_active=True,
    )
    old_summary = FileSummaryBatch.objects.create(
        conversation=conversation,
        user=owner,
        batch_no="FS-OLD",
        status=FileSummaryBatch.Status.SUCCESS,
    )
    FileSummaryBatchAttachment.objects.create(batch=old_summary, attachment=stale_attachment)

    # Second attachment, deliberately not linked to any summary batch.
    fresh_file = tmp_path / "ifu.txt"
    fresh_file.write_text("【产品名称】\n新型冠状病毒2019-nCoV核酸检测试剂盒(荧光PCR法)", encoding="utf-8")
    FileAttachment.objects.create(
        conversation=conversation,
        user=owner,
        original_name="目标产品说明书.docx",
        storage_path=str(fresh_file),
        file_size=fresh_file.stat().st_size,
        is_active=True,
    )

    def _route_to_form_fill(conversation, content):
        # Force the router's decision to the form-fill workflow.
        return SkillRoute(
            action="application_form_fill",
            workflow_type="application_form_fill",
            confidence=0.9,
        )

    def finish_summary(batch, async_run=True):
        # Stand-in for the summary workflow: just mark the batch successful.
        batch.status = FileSummaryBatch.Status.SUCCESS
        batch.save(update_fields=["status"])

    monkeypatch.setattr("review_agent.services.route_message_intent", _route_to_form_fill)
    monkeypatch.setattr("review_agent.services.start_file_summary_workflow", finish_summary)

    chunks = list(stream_message(conversation, "请基于当前对话最近成功汇总的产品资料,自动提取产品关键信息并填入申报文件模板"))
    streamed = "".join(chunks)

    assert '"workflow_type": "file_summary"' in streamed
    assert "汇总完成后继续自动填表" in streamed

    newest_summary = FileSummaryBatch.objects.order_by("-id").first()
    form_batch = ApplicationFormFillBatch.objects.get(conversation=conversation)
    assert newest_summary != old_summary
    assert form_batch.source_summary_batch == newest_summary
|