415 lines
14 KiB
Python
415 lines
14 KiB
Python
from django.core.files.uploadedfile import SimpleUploadedFile
|
||
from django.urls import reverse
|
||
from io import BytesIO
|
||
from pathlib import Path
|
||
import sys
|
||
import types
|
||
from zipfile import ZipFile
|
||
|
||
from apps.documents.forms import DocumentUploadForm
|
||
from apps.documents.models import SubmissionBatch, UploadedDocument
|
||
from apps.documents.services import extract_text, import_submission_batch, index_document
|
||
from apps.chat.models import Conversation
|
||
|
||
|
||
def test_upload_txt_document_creates_uploaded_record(client, db):
    """Uploading a ``.txt`` file stores exactly one ``UploadedDocument``.

    The upload view must answer with a redirect (302), and the stored record
    must carry the ``uploaded`` status, the ``txt`` file type and the
    scenario id that was posted with the form.
    """
    upload = SimpleUploadedFile("rules.txt", b"hello", content_type="text/plain")
    form_data = {"scenario_id": "knowledge_qa", "file": upload}

    response = client.post(reverse("documents:upload"), form_data)

    assert response.status_code == 302
    # An unfiltered ``get()`` raises unless exactly one row exists.
    stored = UploadedDocument.objects.get()
    assert stored.status == "uploaded"
    assert stored.file_type == "txt"
    assert stored.scenario_id == "knowledge_qa"
|
||
|
||
|
||
def test_upload_redirect_shows_success_message(client, db):
    """The page reached after a successful upload shows the success message.

    The POST is issued with ``follow=True`` so the assertions run against the
    final response of the redirect chain rather than the 302 itself.
    """
    upload = SimpleUploadedFile("notice.txt", b"hello", content_type="text/plain")
    form_data = {"scenario_id": "knowledge_qa", "file": upload}

    response = client.post(reverse("documents:upload"), form_data, follow=True)

    assert response.status_code == 200
    page = response.content.decode("utf-8")
    assert "资料包已导入,已绑定会话" in page
|
||
|
||
|
||
def test_upload_accepts_pdf_and_docx_documents(client, db):
    """Both ``.pdf`` and ``.docx`` uploads are accepted and typed correctly.

    Each file is posted in its own request and must produce a redirect;
    afterwards the stored ``file_type`` values must be exactly ``pdf`` and
    ``docx``.
    """
    upload_url = reverse("documents:upload")
    # Insertion order of the dict keeps the original upload order (pdf, docx).
    samples = {
        "policy.pdf": b"%PDF-1.4\nplain policy text",
        "contract.docx": b"fake-docx-body",
    }

    for name, body in samples.items():
        response = client.post(
            upload_url,
            {"scenario_id": "knowledge_qa", "file": SimpleUploadedFile(name, body)},
        )
        assert response.status_code == 302

    stored_types = set(UploadedDocument.objects.values_list("file_type", flat=True))
    assert stored_types == {"pdf", "docx"}
|
||
|
||
|
||
def test_index_document_updates_status_to_indexed(client, db):
    """Posting to the index view marks an uploaded document as ``indexed``.

    A Markdown document is created with a stored file, the index endpoint is
    called for it, and the reloaded record must report the ``indexed`` status
    with an empty ``error_message``. The view itself must answer with a
    redirect (302).
    """
    document = UploadedDocument.objects.create(
        scenario_id="knowledge_qa",
        original_name="rules.md",
        file="knowledge_qa/rules.md",
        file_type="md",
        size=5,
        status="uploaded",
    )
    # ``FieldFile.save`` is documented to take a ``django.core.files.File``
    # instance, so pass the ``SimpleUploadedFile`` itself instead of the raw
    # ``BytesIO`` behind its ``.file`` attribute.
    document.file.save("rules.md", SimpleUploadedFile("rules.md", b"# rule"))

    response = client.post(reverse("documents:index", args=[document.id]))

    assert response.status_code == 302
    # The view works on its own model instance; reload to see its changes.
    document.refresh_from_db()
    assert document.status == "indexed"
    assert document.error_message == ""
|
||
|
||
|
||
def test_extract_text_supports_pdf_and_docx_plain_text_fallback(db):
    """``extract_text`` returns the embedded text for PDF and DOCX records.

    The stored payloads are plain bytes rather than real PDF/DOCX containers,
    so this exercises what the test name calls the plain-text fallback: the
    text written into each file must appear in the extracted output.
    """

    def stored_document(name, file_type, body):
        # Create the record first, then attach the file content to it.
        record = UploadedDocument.objects.create(
            scenario_id="knowledge_qa",
            original_name=name,
            file_type=file_type,
            size=10,
            status="uploaded",
        )
        record.file.save(name, SimpleUploadedFile(name, body))
        return record

    pdf_document = stored_document("policy.pdf", "pdf", b"%PDF-1.4\nSafety policy")
    docx_document = stored_document("contract.docx", "docx", b"Contract clause review")

    assert "Safety policy" in extract_text(pdf_document)
    assert "Contract clause review" in extract_text(docx_document)
|
||
|
||
|
||
def test_document_upload_form_builds_scenario_choices():
    """An unbound ``DocumentUploadForm`` offers the expected scenario ids.

    The ``scenario_id`` field's choices must include both ``knowledge_qa``
    and ``quality_analysis``.
    """
    scenario_field = DocumentUploadForm().fields["scenario_id"]
    offered_ids = [choice_id for choice_id, _ in scenario_field.choices]

    assert "knowledge_qa" in offered_ids
    assert "quality_analysis" in offered_ids
|
||
|
||
|
||
def test_index_failure_message_is_visible_on_document_list(client, db, monkeypatch):
    """A failed indexing run surfaces both the generic and the specific error.

    ``index_document`` as referenced by the views module is replaced with a
    stub that persists a failure. After following the redirect, the rendered
    page must contain the generic failure message and the stub's own
    ``error_message``.
    """
    broken = UploadedDocument.objects.create(
        scenario_id="knowledge_qa",
        original_name="broken.md",
        file_type="md",
        size=5,
        status="uploaded",
    )

    def always_failing_index(target_document):
        # Stand-in indexer: record a failure on the document and return it.
        target_document.status = UploadedDocument.STATUS_FAILED
        target_document.error_message = "模拟入库失败"
        target_document.save(update_fields=["status", "error_message", "updated_at"])
        return target_document

    monkeypatch.setattr("apps.documents.views.index_document", always_failing_index)

    index_url = reverse("documents:index", args=[broken.id])
    response = client.post(index_url, follow=True)

    page = response.content.decode("utf-8")
    assert response.status_code == 200
    assert "文档入库失败,请检查错误原因后重试" in page
    assert "模拟入库失败" in page
|
||
|
||
|
||
def test_index_document_marks_failed_when_extracted_text_is_empty(db, monkeypatch):
    """``index_document`` fails a document whose extracted text is blank.

    ``extract_text`` in the services module is patched to return only
    whitespace; the returned document must then have the failed status and
    an ``error_message`` mentioning the empty content.
    """
    empty_document = UploadedDocument.objects.create(
        scenario_id="knowledge_qa",
        original_name="empty.md",
        file_type="md",
        size=0,
        status="uploaded",
    )

    def whitespace_only_text(target):
        # Simulates an extraction that yields no usable text.
        return " "

    monkeypatch.setattr("apps.documents.services.extract_text", whitespace_only_text)

    outcome = index_document(empty_document)

    assert outcome.status == UploadedDocument.STATUS_FAILED
    assert "文档内容为空" in outcome.error_message
|
||
|
||
|
||
def test_upload_creates_submission_batch_and_bound_conversation(client, db):
    """A ``document_review`` upload creates one batch bound to one conversation.

    The product name written inside the uploaded text file must become both
    the batch's ``product_name`` and the conversation's ``title``; the batch
    must reference the conversation id and count a single file.
    """
    expected_name = "新型冠状病毒 2019-nCoV 核酸检测试剂盒"
    manual = SimpleUploadedFile(
        "目标产品说明书.txt",
        "产品名称:新型冠状病毒 2019-nCoV 核酸检测试剂盒".encode("utf-8"),
        content_type="text/plain",
    )
    form_data = {"scenario_id": "document_review", "file": manual}

    response = client.post(reverse("documents:upload"), form_data)

    assert response.status_code == 302
    # Unfiltered ``get()`` calls double as "exactly one row" checks.
    batch = SubmissionBatch.objects.get()
    conversation = Conversation.objects.get()
    assert batch.product_name == expected_name
    assert batch.conversation_id == conversation.conversation_id
    assert conversation.title == expected_name
    assert batch.file_count == 1
|
||
|
||
|
||
def test_document_list_supports_product_name_search(client, db):
    """The list view filters batches by a product-name keyword.

    Two batches are stored; searching with a keyword that occurs only in the
    first product name must render that name and omit the other one.
    """
    matching_name = "新型冠状病毒 2019-nCoV 核酸检测试剂盒"
    other_name = "呼吸道合胞病毒核酸检测试剂盒"
    # (batch_id, product_name, conversation_id, file_count, page_count)
    rows = (
        ("SUB-20260604-001", matching_name, "conv-001", 2, 12),
        ("SUB-20260604-002", other_name, "conv-002", 3, 20),
    )
    for batch_id, product_name, conversation_id, file_count, page_count in rows:
        SubmissionBatch.objects.create(
            batch_id=batch_id,
            product_name=product_name,
            workflow_type="registration",
            conversation_id=conversation_id,
            file_count=file_count,
            page_count=page_count,
            import_status="completed",
        )

    response = client.get(reverse("documents:list"), {"keyword": "新型冠状病毒"})

    page = response.content.decode("utf-8")
    assert response.status_code == 200
    assert matching_name in page
    assert other_name not in page
|
||
|
||
|
||
def test_document_list_supports_batch_id_search(client, db):
    """The list view filters batches by a batch-id keyword.

    Two batches are stored; searching for the second batch id must render
    that id and omit the first one.
    """
    # (batch_id, product_name, conversation_id, file_count, page_count)
    rows = (
        ("SUB-20260604-001", "产品A", "conv-001", 2, 12),
        ("SUB-20260604-002", "产品B", "conv-002", 3, 20),
    )
    for batch_id, product_name, conversation_id, file_count, page_count in rows:
        SubmissionBatch.objects.create(
            batch_id=batch_id,
            product_name=product_name,
            workflow_type="registration",
            conversation_id=conversation_id,
            file_count=file_count,
            page_count=page_count,
            import_status="completed",
        )

    response = client.get(reverse("documents:list"), {"keyword": "SUB-20260604-002"})

    page = response.content.decode("utf-8")
    assert response.status_code == 200
    assert "SUB-20260604-002" in page
    assert "SUB-20260604-001" not in page
|
||
|
||
|
||
def test_import_submission_batch_marks_manual_review_when_product_names_conflict(db):
    """Conflicting product names across files send the batch to manual review.

    Two text files declare different product names. The stored batch must end
    up with ``review_required`` as its import status, and the first warning
    in the overview report must mention the product-name conflict.
    """

    def text_upload(name, body):
        # Build a UTF-8 plain-text upload with the given file name.
        return SimpleUploadedFile(name, body.encode("utf-8"), content_type="text/plain")

    conflicting_files = [
        text_upload("注册申请表.txt", "产品名称:产品A"),
        text_upload("目标产品说明书.txt", "产品名称:产品B"),
    ]

    result = import_submission_batch("document_review", conflicting_files)

    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    assert batch.import_status == "review_required"
    warnings = result["registration_overview_report"]["warnings"]
    assert warnings
    assert "产品名称来源冲突" in warnings[0]
|
||
|
||
|
||
def test_upload_multiple_files_creates_single_submission_batch_and_multiple_documents(client, db):
    """Posting several files at once yields one batch with one document each.

    Two text files naming the same product are sent under the ``files`` key.
    The single resulting batch must count two files and own two
    ``UploadedDocument`` rows, and the single conversation must be titled
    with the product name.
    """

    def text_upload(name):
        # Both uploads carry the same product-name line as their content.
        return SimpleUploadedFile(
            name,
            "产品名称:新型冠状病毒 2019-nCoV 核酸检测试剂盒".encode("utf-8"),
            content_type="text/plain",
        )

    application = text_upload("注册申请表.txt")
    manual = text_upload("目标产品说明书.txt")
    form_data = {"scenario_id": "document_review", "files": [application, manual]}

    response = client.post(reverse("documents:upload"), form_data)

    assert response.status_code == 302
    batch = SubmissionBatch.objects.get()
    assert batch.file_count == 2
    assert UploadedDocument.objects.filter(batch=batch).count() == 2
    conversation = Conversation.objects.get()
    assert conversation.title == "新型冠状病毒 2019-nCoV 核酸检测试剂盒"
|
||
|
||
|
||
def test_import_submission_batch_supports_zip_package_and_preserves_relative_paths(db):
    """A ZIP package is unpacked into documents that keep their archive paths.

    An in-memory archive with two text files under ``CH1/`` is imported. The
    batch must count two files, and the documents' ``relative_path`` values
    (ordered by that field) must equal the archive member names.
    """
    member_paths = ["CH1/注册申请表.txt", "CH1/目标产品说明书.txt"]
    buffer = BytesIO()
    with ZipFile(buffer, "w") as bundle:
        for member_path in member_paths:
            bundle.writestr(member_path, "产品名称:产品A")
    package = SimpleUploadedFile(
        "registration-package.zip",
        buffer.getvalue(),
        content_type="application/zip",
    )

    result = import_submission_batch("document_review", [package])

    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    ordered_documents = UploadedDocument.objects.filter(batch=batch).order_by("relative_path")
    stored_paths = [document.relative_path for document in ordered_documents]
    assert batch.file_count == 2
    assert stored_paths == member_paths
|
||
|
||
|
||
def test_import_submission_batch_supports_7z_package_and_preserves_relative_paths(db, monkeypatch, tmp_path):
    """A ``.7z`` package is unpacked into documents that keep their paths.

    ``py7zr`` is replaced in ``sys.modules`` by a namespace exposing a fake
    ``SevenZipFile`` whose ``extractall`` writes two text files under
    ``CH1/``. The batch must count two files, and the documents'
    ``relative_path`` values (ordered by that field) must match those files.

    NOTE(review): ``tmp_path`` is requested but never used in the body; it is
    kept so the test signature stays unchanged.
    """
    package = SimpleUploadedFile(
        "registration-package.7z",
        b"fake-7z-bytes",
        content_type="application/x-7z-compressed",
    )

    class FakeSevenZipFile:
        # Minimal context-manager stand-in for ``py7zr.SevenZipFile``.

        def __init__(self, _file_obj, mode="r"):
            self.mode = mode

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            # Never suppress exceptions raised inside the ``with`` block.
            return False

        def extractall(self, path):
            chapter_dir = Path(path) / "CH1"
            chapter_dir.mkdir(parents=True, exist_ok=True)
            (chapter_dir / "注册申请表.txt").write_text("产品名称:产品A", encoding="utf-8")
            (chapter_dir / "目标产品说明书.txt").write_text("产品名称:产品A", encoding="utf-8")

    monkeypatch.setitem(
        sys.modules,
        "py7zr",
        types.SimpleNamespace(SevenZipFile=FakeSevenZipFile),
    )

    result = import_submission_batch("document_review", [package])

    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    ordered_documents = UploadedDocument.objects.filter(batch=batch).order_by("relative_path")
    stored_paths = [document.relative_path for document in ordered_documents]
    assert batch.file_count == 2
    assert stored_paths == [
        "CH1/注册申请表.txt",
        "CH1/目标产品说明书.txt",
    ]
|
||
|
||
|
||
def test_import_submission_batch_records_warnings_for_unsupported_zip_entries(db):
    """Unsupported ZIP members are skipped and reported as warnings.

    The archive holds one text file and one ``.png``. The batch must count
    one file and one exception, and the first warning must contain both the
    "skipped unsupported file" text and the path of the image.
    """
    buffer = BytesIO()
    with ZipFile(buffer, "w") as bundle:
        bundle.writestr("CH1/注册申请表.txt", "产品名称:产品A")
        bundle.writestr("CH1/忽略图片.png", b"binary-image-data")
    package = SimpleUploadedFile(
        "registration-package.zip",
        buffer.getvalue(),
        content_type="application/zip",
    )

    result = import_submission_batch("document_review", [package])

    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    warnings = result["registration_overview_report"]["warnings"]
    assert batch.file_count == 1
    assert batch.exception_count == 1
    first_warning = warnings[0]
    assert "跳过不支持的文件" in first_warning
    assert "CH1/忽略图片.png" in first_warning
|
||
|
||
|
||
def test_import_submission_batch_marks_failed_when_zip_has_no_supported_files(db):
    """A ZIP containing only unsupported files produces a failed batch.

    The archive holds a single ``.png``. The batch must count zero files,
    have the failed import status, report as many exceptions as there are
    warnings, and at least one warning must state that no supported files
    were found.
    """
    buffer = BytesIO()
    with ZipFile(buffer, "w") as bundle:
        bundle.writestr("assets/readme.png", b"binary-image-data")
    package = SimpleUploadedFile(
        "empty-registration-package.zip",
        buffer.getvalue(),
        content_type="application/zip",
    )

    result = import_submission_batch("document_review", [package])

    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    warnings = result["registration_overview_report"]["warnings"]
    assert batch.file_count == 0
    assert batch.import_status == SubmissionBatch.STATUS_FAILED
    assert batch.exception_count == len(warnings)
    assert any("未发现可导入的支持文件" in message for message in warnings)
|
||
|
||
|
||
def test_import_submission_batch_records_warnings_for_unsupported_7z_entries(db, monkeypatch):
    """Unsupported files inside a ``.7z`` package are reported as warnings.

    ``py7zr`` is replaced in ``sys.modules`` by a namespace exposing a fake
    ``SevenZipFile`` whose ``extractall`` writes one text file and one
    ``.png`` under ``CH1/``. The batch must count one file and one exception,
    and some warning must mention the image's path.
    """
    package = SimpleUploadedFile(
        "registration-package.7z",
        b"fake-7z-bytes",
        content_type="application/x-7z-compressed",
    )

    class FakeSevenZipFile:
        # Minimal context-manager stand-in for ``py7zr.SevenZipFile``.

        def __init__(self, _file_obj, mode="r"):
            self.mode = mode

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            # Never suppress exceptions raised inside the ``with`` block.
            return False

        def extractall(self, path):
            chapter_dir = Path(path) / "CH1"
            chapter_dir.mkdir(parents=True, exist_ok=True)
            (chapter_dir / "注册申请表.txt").write_text("产品名称:产品A", encoding="utf-8")
            (chapter_dir / "忽略图片.png").write_bytes(b"binary-image-data")

    monkeypatch.setitem(
        sys.modules,
        "py7zr",
        types.SimpleNamespace(SevenZipFile=FakeSevenZipFile),
    )

    result = import_submission_batch("document_review", [package])

    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    warnings = result["registration_overview_report"]["warnings"]
    assert batch.file_count == 1
    assert batch.exception_count == 1
    assert any("CH1/忽略图片.png" in message for message in warnings)
|