"""Tests for the file-summary workflow helpers and the ``stream_message`` service."""

import pytest

from review_agent.file_summary.workflow import create_file_summary_batch, start_file_summary_workflow
from review_agent.skill_router import SkillRoute
from review_agent.models import (
    Conversation,
    FileAttachment,
    FileSummaryBatch,
    FileSummaryBatchAttachment,
    Message,
    WorkflowEvent,
    WorkflowNodeRun,
)
from review_agent.services import stream_message

pytestmark = pytest.mark.django_db


def _make_owner_conversation(django_user_model, title="会话"):
    """Create the ``owner`` user plus one conversation; return ``(user, conversation)``."""
    owner = django_user_model.objects.create_user(username="owner", password="pass")
    return owner, Conversation.objects.create(user=owner, title=title)


def _attach_stub_docx(conversation, user):
    """Register the one-byte ``a.docx`` attachment record shared by several tests."""
    return FileAttachment.objects.create(
        conversation=conversation,
        user=user,
        original_name="a.docx",
        storage_path="x/a.docx",
        file_size=1,
    )


def _attach_text_file(tmp_path, conversation, user, filename, text):
    """Write ``text`` to ``<tmp_path>/uploads/<filename>`` and register it as an attachment."""
    target = tmp_path / "uploads" / filename
    target.parent.mkdir(parents=True)
    target.write_text(text, encoding="utf-8")
    return FileAttachment.objects.create(
        conversation=conversation,
        user=user,
        original_name=filename,
        storage_path=f"uploads/{filename}",
        file_size=target.stat().st_size,
    )


def _fake_stream_reply(conversation, content):
    """Stand-in for ``review_agent.services.stream_reply`` that yields one fixed chunk."""
    yield "普通回复"


def _fixed_route(**route_fields):
    """Build a ``route_message_intent`` replacement that always returns the given route."""

    # Parameter names mirror the original stubs in case the service calls by keyword.
    def _route(conversation, content):
        return SkillRoute(**route_fields)

    return _route


def test_create_batch_binds_active_attachments_and_initializes_nodes(django_user_model):
    """Creating a batch binds only the active attachment and records nodes and an event."""
    user, conversation = _make_owner_conversation(django_user_model)
    trigger = Message.objects.create(
        conversation=conversation,
        role=Message.Role.USER,
        content="自动汇总",
    )
    active = _attach_stub_docx(conversation, user)
    # An inactive attachment in the same conversation; the `.get()` below
    # would raise if it were bound to the batch as well.
    FileAttachment.objects.create(
        conversation=conversation,
        user=user,
        original_name="old.docx",
        is_active=False,
        storage_path="x/old.docx",
        file_size=1,
    )

    batch = create_file_summary_batch(conversation=conversation, user=user, trigger_message=trigger)

    assert batch.status == FileSummaryBatch.Status.PENDING
    assert FileSummaryBatchAttachment.objects.get(batch=batch).attachment == active
    active.refresh_from_db()
    assert active.upload_status == FileAttachment.UploadStatus.BOUND
    assert WorkflowNodeRun.objects.filter(batch=batch).count() >= 6
    assert WorkflowEvent.objects.filter(batch=batch, event_type="workflow_created").exists()


def test_start_file_summary_workflow_runs_synchronously_for_tests(django_user_model):
    """With ``async_run=False`` the batch is SUCCESS once the call returns."""
    user, conversation = _make_owner_conversation(django_user_model)
    trigger = Message.objects.create(
        conversation=conversation,
        role=Message.Role.USER,
        content="自动汇总",
    )
    _attach_stub_docx(conversation, user)
    batch = create_file_summary_batch(conversation=conversation, user=user, trigger_message=trigger)

    start_file_summary_workflow(batch, async_run=False)

    batch.refresh_from_db()
    assert batch.status == FileSummaryBatch.Status.SUCCESS
    assert WorkflowEvent.objects.filter(batch=batch, event_type="workflow_completed").exists()


def test_stream_message_returns_workflow_meta_when_triggered(settings, django_user_model):
    """A summary request streams workflow metadata and creates a batch."""
    settings.FILE_SUMMARY_ASYNC = False
    user, conversation = _make_owner_conversation(django_user_model)
    _attach_stub_docx(conversation, user)

    frames = list(stream_message(conversation, "请自动汇总文件目录与页数"))
    output = "".join(frames)

    assert "workflow_started" in output
    assert "\"workflow_type\": \"file_summary\"" in output
    assert FileSummaryBatch.objects.filter(conversation=conversation).exists()


def test_stream_message_uses_normal_llm_path_when_not_triggered(monkeypatch, django_user_model):
    """A plain greeting streams the patched reply and no workflow frame."""
    _, conversation = _make_owner_conversation(django_user_model)
    monkeypatch.setattr("review_agent.services.stream_reply", _fake_stream_reply)

    frames = list(stream_message(conversation, "你好"))
    output = "".join(frames)

    assert "普通回复" in output
    assert "workflow_started" not in output


def test_stream_message_meta_uses_first_prompt_title_for_new_conversation(monkeypatch, django_user_model):
    """The first and last frames carry the first prompt as the conversation title."""
    _, conversation = _make_owner_conversation(django_user_model, title="新对话 01-01 10:00")
    monkeypatch.setattr("review_agent.services.stream_reply", _fake_stream_reply)

    frames = list(stream_message(conversation, "这是第一条新对话消息"))

    expected_title = '"title": "这是第一条新对话消息"'
    assert expected_title in frames[0]
    assert expected_title in frames[-1]


def test_stream_message_reads_active_attachment_when_requested(settings, tmp_path, django_user_model):
    """Asking to read the attachment streams its parsed content, not a workflow."""
    settings.MEDIA_ROOT = tmp_path
    user, conversation = _make_owner_conversation(django_user_model)
    _attach_text_file(
        tmp_path,
        conversation,
        user,
        "detail.txt",
        "合同编号:RA-2026\n结论:附件阅读成功",
    )

    frames = list(stream_message(conversation, "请阅读附件并给出详情"))
    output = "".join(frames)

    assert "附件解析结果" in output
    assert "detail.txt" in output
    assert "RA-2026" in output
    assert "workflow_started" not in output


def test_stream_message_returns_error_event_when_unexpected_stream_error(monkeypatch, django_user_model):
    """A mid-stream failure still yields the partial text, an error notice and ``done``."""
    _, conversation = _make_owner_conversation(django_user_model)

    def _broken_stream_reply(conversation, content):
        # Emit one chunk, then fail the way a dropped provider connection would.
        yield "已生成部分内容"
        raise RuntimeError("provider connection reset")

    monkeypatch.setattr("review_agent.services.stream_reply", _broken_stream_reply)

    frames = list(stream_message(conversation, "普通问题"))
    output = "".join(frames)

    assert "已生成部分内容" in output
    assert "回复生成中断" in output
    assert "done" in output
    assert Message.objects.filter(conversation=conversation, role=Message.Role.ASSISTANT).exists()


def test_stream_message_uses_llm_router_for_attachment_reader(
    monkeypatch,
    settings,
    tmp_path,
    django_user_model,
):
    """When the router picks ``attachment_reader`` the attachment text is streamed."""
    settings.MEDIA_ROOT = tmp_path
    user, conversation = _make_owner_conversation(django_user_model)
    _attach_text_file(
        tmp_path,
        conversation,
        user,
        "resume.txt",
        "项目经历:负责审核智能体附件解析模块。",
    )
    monkeypatch.setattr(
        "review_agent.services.route_message_intent",
        _fixed_route(
            action="attachment_reader",
            skill_name="attachment_reader",
            confidence=0.91,
            reason="需要读取上传简历。",
            source="llm",
        ),
    )

    frames = list(stream_message(conversation, "帮我整理其中的项目经历"))
    output = "".join(frames)

    assert "附件解析结果" in output
    assert "审核智能体附件解析模块" in output
    assert "模型调用失败" not in output


def test_stream_message_uses_llm_router_for_file_summary(monkeypatch, settings, django_user_model):
    """When the router picks ``file_summary`` the workflow starts and a batch exists."""
    settings.FILE_SUMMARY_ASYNC = False
    user, conversation = _make_owner_conversation(django_user_model)
    _attach_stub_docx(conversation, user)
    monkeypatch.setattr(
        "review_agent.services.route_message_intent",
        _fixed_route(
            action="file_summary",
            workflow_type="file_summary",
            confidence=0.93,
            reason="需要执行文件目录与页数汇总。",
            source="llm",
        ),
    )

    frames = list(stream_message(conversation, "处理一下这批资料"))
    output = "".join(frames)

    assert "workflow_started" in output
    assert "\"workflow_type\": \"file_summary\"" in output
    assert FileSummaryBatch.objects.filter(conversation=conversation).exists()