"""Tests for the documents app.

Covers single and multi-file upload, indexing, text extraction, the upload
form, submission-batch import (plain files, ZIP, 7z and RAR packages), the
document list search, and export records.
"""

import sys
import types
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile

from django.core.files.uploadedfile import SimpleUploadedFile
from django.urls import reverse

from apps.chat.models import Conversation
from apps.documents.forms import DocumentUploadForm
from apps.documents.models import ExportedDocument, SubmissionBatch, UploadedDocument
from apps.documents.services import extract_text, import_submission_batch, index_document


def _text_upload(name, text):
    """Return a UTF-8 encoded ``text/plain`` upload called *name*."""
    return SimpleUploadedFile(name, text.encode("utf-8"), content_type="text/plain")


def _zip_package(package_name, entries):
    """Return an in-memory ZIP upload built from ``{archive path: content}``."""
    buffer = BytesIO()
    with ZipFile(buffer, "w") as zip_file:
        for archive_path, content in entries.items():
            zip_file.writestr(archive_path, content)
    return SimpleUploadedFile(
        package_name,
        buffer.getvalue(),
        content_type="application/zip",
    )


def _install_fake_py7zr(monkeypatch, entries):
    """Register a stand-in ``py7zr`` module whose archives extract *entries*.

    *entries* maps a relative path inside the archive to the bytes written at
    that path when ``extractall`` is called.
    """

    class FakeSevenZipFile:
        def __init__(self, _file_obj, mode="r"):
            self.mode = mode

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def extractall(self, path):
            root = Path(path)
            for relative_path, payload in entries.items():
                destination = root / relative_path
                destination.parent.mkdir(parents=True, exist_ok=True)
                destination.write_bytes(payload)

    fake_module = types.SimpleNamespace(SevenZipFile=FakeSevenZipFile)
    monkeypatch.setitem(sys.modules, "py7zr", fake_module)


def _create_completed_batch(batch_id, product_name, conversation_id, *, file_count=2, page_count=12):
    """Create a ``registration`` submission batch whose import is ``completed``."""
    return SubmissionBatch.objects.create(
        batch_id=batch_id,
        product_name=product_name,
        workflow_type="registration",
        conversation_id=conversation_id,
        file_count=file_count,
        page_count=page_count,
        import_status="completed",
    )


def test_upload_txt_document_creates_uploaded_record(client, db):
    """Posting a .txt file redirects and stores a record in ``uploaded`` state."""
    response = client.post(
        reverse("documents:upload"),
        {"scenario_id": "knowledge_qa", "file": _text_upload("rules.txt", "hello")},
    )

    assert response.status_code == 302
    document = UploadedDocument.objects.get()
    assert document.status == "uploaded"
    assert document.file_type == "txt"
    assert document.scenario_id == "knowledge_qa"


def test_upload_redirect_shows_success_message(client, db):
    """Following the upload redirect renders the success message."""
    response = client.post(
        reverse("documents:upload"),
        {"scenario_id": "knowledge_qa", "file": _text_upload("notice.txt", "hello")},
        follow=True,
    )

    assert response.status_code == 200
    assert "资料包已导入,已绑定会话" in response.content.decode("utf-8")


def test_upload_accepts_pdf_and_docx_documents(client, db):
    """Both .pdf and .docx uploads are accepted and recorded with their file type."""
    samples = [
        ("policy.pdf", b"%PDF-1.4\nplain policy text"),
        ("contract.docx", b"fake-docx-body"),
    ]
    for filename, payload in samples:
        response = client.post(
            reverse("documents:upload"),
            {"scenario_id": "knowledge_qa", "file": SimpleUploadedFile(filename, payload)},
        )
        assert response.status_code == 302

    stored_types = set(UploadedDocument.objects.values_list("file_type", flat=True))
    assert stored_types == {"pdf", "docx"}


def test_index_document_updates_status_to_indexed(client, db):
    """Posting to the index view moves the document to ``indexed`` with no error."""
    document = UploadedDocument.objects.create(
        scenario_id="knowledge_qa",
        original_name="rules.md",
        file="knowledge_qa/rules.md",
        file_type="md",
        size=5,
        status="uploaded",
    )
    # FieldFile.save() expects a django File, so pass the SimpleUploadedFile
    # itself rather than its underlying raw stream.
    document.file.save("rules.md", SimpleUploadedFile("rules.md", b"# rule"))

    response = client.post(reverse("documents:index", args=[document.id]))

    assert response.status_code == 302
    document.refresh_from_db()
    assert document.status == "indexed"
    assert document.error_message == ""


def test_extract_text_supports_pdf_and_docx_plain_text_fallback(db):
    """``extract_text`` returns the embedded text for plain-bytes pdf and docx files."""
    pdf_document = UploadedDocument.objects.create(
        scenario_id="knowledge_qa",
        original_name="policy.pdf",
        file_type="pdf",
        size=10,
        status="uploaded",
    )
    pdf_document.file.save(
        "policy.pdf",
        SimpleUploadedFile("policy.pdf", b"%PDF-1.4\nSafety policy"),
    )
    docx_document = UploadedDocument.objects.create(
        scenario_id="knowledge_qa",
        original_name="contract.docx",
        file_type="docx",
        size=10,
        status="uploaded",
    )
    docx_document.file.save(
        "contract.docx",
        SimpleUploadedFile("contract.docx", b"Contract clause review"),
    )

    assert "Safety policy" in extract_text(pdf_document)
    assert "Contract clause review" in extract_text(docx_document)


def test_document_upload_form_builds_scenario_choices():
    """The upload form offers the knowledge_qa and quality_analysis scenarios."""
    form = DocumentUploadForm()

    choice_values = [value for value, _label in form.fields["scenario_id"].choices]

    assert "knowledge_qa" in choice_values
    assert "quality_analysis" in choice_values


def test_document_upload_form_accepts_rar_package():
    """A .rar upload passes form validation."""
    rar_upload = SimpleUploadedFile(
        "registration-package.rar",
        b"fake-rar-bytes",
        content_type="application/vnd.rar",
    )
    form = DocumentUploadForm(
        data={"scenario_id": "knowledge_qa"},
        files={"file": rar_upload},
    )

    assert form.is_valid()


def test_index_failure_message_is_visible_on_document_list(client, db, monkeypatch):
    """A failed indexing run surfaces both the generic and the specific error text."""
    document = UploadedDocument.objects.create(
        scenario_id="knowledge_qa",
        original_name="broken.md",
        file_type="md",
        size=5,
        status="uploaded",
    )

    def fake_index_document(target_document):
        # Simulate the service marking the document as failed.
        target_document.status = UploadedDocument.STATUS_FAILED
        target_document.error_message = "模拟入库失败"
        target_document.save(update_fields=["status", "error_message", "updated_at"])
        return target_document

    monkeypatch.setattr("apps.documents.views.index_document", fake_index_document)

    response = client.post(reverse("documents:index", args=[document.id]), follow=True)
    content = response.content.decode("utf-8")

    assert response.status_code == 200
    assert "文档入库失败,请检查错误原因后重试" in content
    assert "模拟入库失败" in content


def test_index_document_marks_failed_when_extracted_text_is_empty(db, monkeypatch):
    """Whitespace-only extracted text makes ``index_document`` mark the document failed."""
    document = UploadedDocument.objects.create(
        scenario_id="knowledge_qa",
        original_name="empty.md",
        file_type="md",
        size=0,
        status="uploaded",
    )
    monkeypatch.setattr("apps.documents.services.extract_text", lambda target: " ")

    updated_document = index_document(document)

    assert updated_document.status == UploadedDocument.STATUS_FAILED
    assert "文档内容为空" in updated_document.error_message


def test_upload_creates_submission_batch_and_bound_conversation(client, db):
    """A document_review upload creates one batch and one conversation named after the product."""
    manual = _text_upload(
        "目标产品说明书.txt",
        "产品名称:新型冠状病毒 2019-nCoV 核酸检测试剂盒",
    )

    response = client.post(
        reverse("documents:upload"),
        {"scenario_id": "document_review", "file": manual},
    )

    assert response.status_code == 302
    batch = SubmissionBatch.objects.get()
    conversation = Conversation.objects.get()
    assert batch.product_name == "新型冠状病毒 2019-nCoV 核酸检测试剂盒"
    assert batch.conversation_id == conversation.conversation_id
    assert conversation.title == "新型冠状病毒 2019-nCoV 核酸检测试剂盒"
    assert batch.file_count == 1


def test_document_list_supports_product_name_search(client, db):
    """Filtering the list by a product-name keyword hides non-matching batches."""
    _create_completed_batch(
        "SUB-20260604-001",
        "新型冠状病毒 2019-nCoV 核酸检测试剂盒",
        "conv-001",
    )
    _create_completed_batch(
        "SUB-20260604-002",
        "呼吸道合胞病毒核酸检测试剂盒",
        "conv-002",
        file_count=3,
        page_count=20,
    )

    response = client.get(reverse("documents:list"), {"keyword": "新型冠状病毒"})
    content = response.content.decode("utf-8")

    assert response.status_code == 200
    assert "新型冠状病毒 2019-nCoV 核酸检测试剂盒" in content
    assert "呼吸道合胞病毒核酸检测试剂盒" not in content


def test_document_list_supports_batch_id_search(client, db):
    """Filtering the list by a batch id keyword hides the other batch."""
    _create_completed_batch("SUB-20260604-001", "产品A", "conv-001")
    _create_completed_batch(
        "SUB-20260604-002",
        "产品B",
        "conv-002",
        file_count=3,
        page_count=20,
    )

    response = client.get(reverse("documents:list"), {"keyword": "SUB-20260604-002"})
    content = response.content.decode("utf-8")

    assert response.status_code == 200
    assert "SUB-20260604-002" in content
    assert "SUB-20260604-001" not in content


def test_import_submission_batch_marks_manual_review_when_product_names_conflict(db):
    """Files naming different products put the batch in ``review_required`` with a warning."""
    files = [
        _text_upload("注册申请表.txt", "产品名称:产品A"),
        _text_upload("目标产品说明书.txt", "产品名称:产品B"),
    ]

    result = import_submission_batch("document_review", files)
    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])

    assert batch.import_status == "review_required"
    assert result["registration_overview_report"]["warnings"]
    assert "产品名称来源冲突" in result["registration_overview_report"]["warnings"][0]


def test_upload_multiple_files_creates_single_submission_batch_and_multiple_documents(client, db):
    """Posting two files under ``files`` yields one batch holding two documents."""
    application = _text_upload(
        "注册申请表.txt",
        "产品名称:新型冠状病毒 2019-nCoV 核酸检测试剂盒",
    )
    manual = _text_upload(
        "目标产品说明书.txt",
        "产品名称:新型冠状病毒 2019-nCoV 核酸检测试剂盒",
    )

    response = client.post(
        reverse("documents:upload"),
        {"scenario_id": "document_review", "files": [application, manual]},
    )

    assert response.status_code == 302
    batch = SubmissionBatch.objects.get()
    assert batch.file_count == 2
    assert UploadedDocument.objects.filter(batch=batch).count() == 2
    assert Conversation.objects.get().title == "新型冠状病毒 2019-nCoV 核酸检测试剂盒"


def test_import_submission_batch_supports_zip_package_and_preserves_relative_paths(db):
    """ZIP members are imported as documents that keep their in-archive paths."""
    package = _zip_package(
        "registration-package.zip",
        {
            "CH1/注册申请表.txt": "产品名称:产品A",
            "CH1/目标产品说明书.txt": "产品名称:产品A",
        },
    )

    result = import_submission_batch("document_review", [package])
    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    documents = list(UploadedDocument.objects.filter(batch=batch).order_by("relative_path"))

    assert batch.file_count == 2
    assert [document.relative_path for document in documents] == [
        "CH1/注册申请表.txt",
        "CH1/目标产品说明书.txt",
    ]


def test_import_submission_batch_supports_7z_package_and_preserves_relative_paths(db, monkeypatch, tmp_path):
    """With a stubbed ``py7zr``, 7z members are imported and keep their relative paths."""
    package = SimpleUploadedFile(
        "registration-package.7z",
        b"fake-7z-bytes",
        content_type="application/x-7z-compressed",
    )
    _install_fake_py7zr(
        monkeypatch,
        {
            "CH1/注册申请表.txt": "产品名称:产品A".encode("utf-8"),
            "CH1/目标产品说明书.txt": "产品名称:产品A".encode("utf-8"),
        },
    )

    result = import_submission_batch("document_review", [package])
    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    documents = list(UploadedDocument.objects.filter(batch=batch).order_by("relative_path"))

    assert batch.file_count == 2
    assert [document.relative_path for document in documents] == [
        "CH1/注册申请表.txt",
        "CH1/目标产品说明书.txt",
    ]


def test_import_submission_batch_supports_rar_package_and_preserves_relative_paths(db, monkeypatch):
    """With a stubbed ``rarfile``, RAR members are imported and keep their relative paths."""
    package = SimpleUploadedFile(
        "registration-package.rar",
        b"fake-rar-bytes",
        content_type="application/vnd.rar",
    )

    class FakeRarInfo:
        def __init__(self, filename, is_dir=False):
            self.filename = filename
            self._is_dir = is_dir

        def is_dir(self):
            return self._is_dir

    class FakeRarFile:
        def __init__(self, _file_obj):
            self.entries = {
                "CH1/注册申请表.txt": "产品名称:产品A".encode("utf-8"),
                "CH1/目标产品说明书.txt": "产品名称:产品A".encode("utf-8"),
            }

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def infolist(self):
            return [FakeRarInfo(name) for name in self.entries]

        def read(self, name):
            return self.entries[name]

    fake_module = types.SimpleNamespace(RarFile=FakeRarFile)
    monkeypatch.setitem(sys.modules, "rarfile", fake_module)

    result = import_submission_batch("document_review", [package])
    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    documents = list(UploadedDocument.objects.filter(batch=batch).order_by("relative_path"))

    assert batch.file_count == 2
    assert [document.relative_path for document in documents] == [
        "CH1/注册申请表.txt",
        "CH1/目标产品说明书.txt",
    ]


def test_import_submission_batch_records_warnings_for_unsupported_zip_entries(db):
    """An unsupported ZIP member is skipped, counted as an exception and named in a warning."""
    package = _zip_package(
        "registration-package.zip",
        {
            "CH1/注册申请表.txt": "产品名称:产品A",
            "CH1/忽略图片.png": b"binary-image-data",
        },
    )

    result = import_submission_batch("document_review", [package])
    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    warnings = result["registration_overview_report"]["warnings"]

    assert batch.file_count == 1
    assert batch.exception_count == 1
    assert "跳过不支持的文件" in warnings[0]
    assert "CH1/忽略图片.png" in warnings[0]


def test_import_submission_batch_marks_failed_when_zip_has_no_supported_files(db):
    """A ZIP containing only unsupported files produces a failed, empty batch."""
    package = _zip_package(
        "empty-registration-package.zip",
        {"assets/readme.png": b"binary-image-data"},
    )

    result = import_submission_batch("document_review", [package])
    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    warnings = result["registration_overview_report"]["warnings"]

    assert batch.file_count == 0
    assert batch.import_status == SubmissionBatch.STATUS_FAILED
    assert batch.exception_count == len(warnings)
    assert any("未发现可导入的支持文件" in warning for warning in warnings)


def test_import_submission_batch_records_warnings_for_unsupported_7z_entries(db, monkeypatch):
    """With a stubbed ``py7zr``, an unsupported 7z member is skipped and reported."""
    package = SimpleUploadedFile(
        "registration-package.7z",
        b"fake-7z-bytes",
        content_type="application/x-7z-compressed",
    )
    _install_fake_py7zr(
        monkeypatch,
        {
            "CH1/注册申请表.txt": "产品名称:产品A".encode("utf-8"),
            "CH1/忽略图片.png": b"binary-image-data",
        },
    )

    result = import_submission_batch("document_review", [package])
    batch = SubmissionBatch.objects.get(batch_id=result["batch_id"])
    warnings = result["registration_overview_report"]["warnings"]

    assert batch.file_count == 1
    assert batch.exception_count == 1
    assert any("CH1/忽略图片.png" in warning for warning in warnings)


def test_create_export_record_persists_batch_conversation_and_file_metadata(db):
    """``create_export_record`` stores one export row carrying the supplied metadata."""
    from apps.documents.services import create_export_record

    batch = _create_completed_batch("SUB-20260604-010", "产品X", "conv-010")

    record = create_export_record(
        batch=batch,
        conversation_id="conv-010",
        product_name="产品X",
        template_name="注册证导出模板",
        template_version="V1.0",
        export_mode="draft",
        output_type="registration_word_export_report",
        file_name="SUB-20260604-010-draft.docx",
        relative_path="exports/20260604/SUB-20260604-010-draft.docx",
        download_url="/media/exports/20260604/SUB-20260604-010-draft.docx",
    )

    assert ExportedDocument.objects.count() == 1
    assert record.batch == batch
    assert record.conversation_id == "conv-010"
    assert record.product_name == "产品X"
    assert record.template_name == "注册证导出模板"
    assert record.export_mode == "draft"


def test_document_list_shows_latest_export_record_for_batch(client, db):
    """The document list renders the export section and the exported file name."""
    batch = _create_completed_batch("SUB-20260604-011", "产品Y", "conv-011")
    ExportedDocument.objects.create(
        batch=batch,
        conversation_id="conv-011",
        product_name="产品Y",
        template_name="注册证导出模板",
        template_version="V1.0",
        export_mode="draft",
        output_type="registration_word_export_report",
        file_name="SUB-20260604-011-draft.docx",
        relative_path="exports/20260604/SUB-20260604-011-draft.docx",
        download_url="/media/exports/20260604/SUB-20260604-011-draft.docx",
    )

    response = client.get(reverse("documents:list"))
    content = response.content.decode("utf-8")

    assert response.status_code == 200
    assert "最近导出" in content
    assert "SUB-20260604-011-draft.docx" in content