feat(application-form-fill): 新增自动填表批次模型
This commit is contained in:
109
tests/test_application_form_fill_models.py
Normal file
109
tests/test_application_form_fill_models.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import pytest
|
||||
|
||||
from review_agent.models import (
|
||||
ApplicationFormFillArtifact,
|
||||
ApplicationFormFillBatch,
|
||||
ApplicationFormFillNotificationRecord,
|
||||
Conversation,
|
||||
ExportedSummaryFile,
|
||||
FileSummaryBatch,
|
||||
Message,
|
||||
RegulatoryReviewBatch,
|
||||
)
|
||||
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
def test_application_form_fill_models_store_batch_artifact_notification_and_exports(django_user_model):
|
||||
user = django_user_model.objects.create_user(username="owner", password="pass")
|
||||
conversation = Conversation.objects.create(user=user, title="自动填表")
|
||||
trigger = Message.objects.create(
|
||||
conversation=conversation,
|
||||
role=Message.Role.USER,
|
||||
content="帮我填注册证",
|
||||
)
|
||||
summary_batch = FileSummaryBatch.objects.create(
|
||||
conversation=conversation,
|
||||
user=user,
|
||||
batch_no="FS-AFF-READY",
|
||||
status=FileSummaryBatch.Status.SUCCESS,
|
||||
)
|
||||
regulatory_batch = RegulatoryReviewBatch.objects.create(
|
||||
conversation=conversation,
|
||||
user=user,
|
||||
source_summary_batch=summary_batch,
|
||||
batch_no="RR-AFF-SOURCE",
|
||||
condition_json={"confirmed": True, "registration_type": "首次注册"},
|
||||
)
|
||||
|
||||
batch = ApplicationFormFillBatch.objects.create(
|
||||
conversation=conversation,
|
||||
user=user,
|
||||
trigger_message=trigger,
|
||||
source_summary_batch=summary_batch,
|
||||
source_regulatory_batch=regulatory_batch,
|
||||
batch_no="AFF-20260607153000-abcdef",
|
||||
requested_templates=["registration_certificate"],
|
||||
selected_templates=["registration_certificate"],
|
||||
output_types=["word", "excel", "json"],
|
||||
registration_type="首次注册",
|
||||
registration_type_source=ApplicationFormFillBatch.RegistrationTypeSource.USER_MESSAGE,
|
||||
product_name="甲胎蛋白检测试剂盒",
|
||||
conflict_summary=[{"field_key": "storage_condition"}],
|
||||
risk_notes=[{"type": "pdf_pending"}],
|
||||
template_config_version="application_form_templates_v1",
|
||||
template_config_hash="hash",
|
||||
work_dir="media/application_form_fill/1/1/AFF-20260607153000-abcdef",
|
||||
)
|
||||
artifact = ApplicationFormFillArtifact.objects.create(
|
||||
batch=batch,
|
||||
artifact_type=ApplicationFormFillArtifact.ArtifactType.FILLED_TEMPLATE,
|
||||
file_format=ApplicationFormFillArtifact.FileFormat.DOCX,
|
||||
name="注册证格式",
|
||||
file_name="filled.docx",
|
||||
storage_path="media/application_form_fill/filled.docx",
|
||||
file_size=123,
|
||||
content_hash="sha256",
|
||||
metadata={"template_code": "registration_certificate"},
|
||||
created_by_node="word_fill",
|
||||
)
|
||||
notification = ApplicationFormFillNotificationRecord.objects.create(
|
||||
batch=batch,
|
||||
recipient=user,
|
||||
channel=ApplicationFormFillNotificationRecord.Channel.MOCK,
|
||||
template_codes=["registration_certificate"],
|
||||
export_ids=[1],
|
||||
message_summary="自动填表完成",
|
||||
send_status=ApplicationFormFillNotificationRecord.SendStatus.FAILED,
|
||||
retry_count=1,
|
||||
error_message="mock failed",
|
||||
)
|
||||
word_export = ExportedSummaryFile.objects.create(
|
||||
batch=summary_batch,
|
||||
workflow_type="application_form_fill",
|
||||
workflow_batch_id=batch.pk,
|
||||
export_category="filled_template",
|
||||
export_type=ExportedSummaryFile.ExportType.WORD,
|
||||
file_name="filled.docx",
|
||||
storage_path="media/application_form_fill/filled.docx",
|
||||
)
|
||||
pdf_export = ExportedSummaryFile.objects.create(
|
||||
batch=summary_batch,
|
||||
workflow_type="application_form_fill",
|
||||
workflow_batch_id=batch.pk,
|
||||
export_category="filled_template",
|
||||
export_type=ExportedSummaryFile.ExportType.PDF,
|
||||
file_name="filled.pdf",
|
||||
storage_path="media/application_form_fill/filled.pdf",
|
||||
)
|
||||
|
||||
assert batch.status == ApplicationFormFillBatch.Status.PENDING
|
||||
assert batch.source_summary_batch == summary_batch
|
||||
assert batch.source_regulatory_batch == regulatory_batch
|
||||
assert artifact.content_hash == "sha256"
|
||||
assert artifact.metadata["template_code"] == "registration_certificate"
|
||||
assert notification.send_status == ApplicationFormFillNotificationRecord.SendStatus.FAILED
|
||||
assert notification.retry_count == 1
|
||||
assert word_export.export_type == ExportedSummaryFile.ExportType.WORD
|
||||
assert pdf_export.export_type == ExportedSummaryFile.ExportType.PDF
|
||||
@@ -4,6 +4,7 @@ import json
|
||||
import pytest
|
||||
|
||||
from review_agent.models import (
|
||||
ApplicationFormFillBatch,
|
||||
Conversation,
|
||||
ExportedSummaryFile,
|
||||
FileAttachment,
|
||||
@@ -109,6 +110,54 @@ def test_export_download_requires_batch_owner(client, tmp_path, django_user_mode
|
||||
assert allowed["Content-Type"].startswith("text/markdown")
|
||||
|
||||
|
||||
def test_export_download_checks_application_form_fill_batch_owner(client, tmp_path, django_user_model):
|
||||
owner = django_user_model.objects.create_user(username="owner", password="pass")
|
||||
other = django_user_model.objects.create_user(username="other", password="pass")
|
||||
owner_conversation = Conversation.objects.create(user=owner, title="自动填表")
|
||||
other_conversation = Conversation.objects.create(user=other, title="其他对话")
|
||||
owner_summary = FileSummaryBatch.objects.create(
|
||||
conversation=owner_conversation,
|
||||
user=owner,
|
||||
batch_no="FS-AFF-OWNER",
|
||||
status=FileSummaryBatch.Status.SUCCESS,
|
||||
)
|
||||
other_summary = FileSummaryBatch.objects.create(
|
||||
conversation=other_conversation,
|
||||
user=other,
|
||||
batch_no="FS-AFF-OTHER",
|
||||
status=FileSummaryBatch.Status.SUCCESS,
|
||||
)
|
||||
form_batch = ApplicationFormFillBatch.objects.create(
|
||||
conversation=owner_conversation,
|
||||
user=owner,
|
||||
source_summary_batch=owner_summary,
|
||||
batch_no="AFF-DL",
|
||||
)
|
||||
report_path = tmp_path / "filled.docx"
|
||||
report_path.write_bytes(b"word-content")
|
||||
exported = ExportedSummaryFile.objects.create(
|
||||
batch=other_summary,
|
||||
workflow_type="application_form_fill",
|
||||
workflow_batch_id=form_batch.pk,
|
||||
export_category="filled_template",
|
||||
export_type=ExportedSummaryFile.ExportType.WORD,
|
||||
file_name="filled.docx",
|
||||
storage_path=str(report_path),
|
||||
)
|
||||
|
||||
client.force_login(other)
|
||||
denied = client.get(reverse("file_summary_export_download", args=[exported.pk]))
|
||||
assert denied.status_code == 404
|
||||
|
||||
client.force_login(owner)
|
||||
allowed = client.get(reverse("file_summary_export_download", args=[exported.pk]))
|
||||
assert allowed.status_code == 200
|
||||
assert allowed["Content-Type"].startswith(
|
||||
"application/vnd.openxmlformats-officedocument.wordprocessingml.document"
|
||||
)
|
||||
assert b"".join(allowed.streaming_content) == b"word-content"
|
||||
|
||||
|
||||
def test_conversation_messages_returns_incremental_messages(client, django_user_model):
|
||||
owner = django_user_model.objects.create_user(username="owner", password="pass")
|
||||
other = django_user_model.objects.create_user(username="other", password="pass")
|
||||
|
||||
Reference in New Issue
Block a user