from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path

from review_agent.models import Conversation, FileAttachment, FileSummaryBatch, FileSummaryItem

# Substring ("instruction manual") that marks a file name as a likely product manual.
_INSTRUCTION_KEYWORD = "说明书"

# Prompt prefix shown when several files qualify; roughly "Please confirm the
# file name of the instruction manual used to generate the chapter 1
# regulatory information:". The candidate names are appended after it.
_CONFIRM_PROMPT_PREFIX = "请确认用于生成第1章监管信息的说明书文件名:"


@dataclass
class InstructionInputSelection:
    """Outcome of trying to pick the instruction-manual docx for a conversation."""

    # One of "selected", "waiting_user" or "missing" (the values produced in this module).
    status: str
    # Name of the chosen file; empty unless status is "selected".
    file_name: str = ""
    # Storage path of the chosen file; empty unless status is "selected".
    storage_path: str = ""
    # Attachment row the selection came from; None when it came from a summary batch.
    attachment: FileAttachment | None = None
    # Summary batch the selection or candidate list came from, if any.
    source_summary_batch: FileSummaryBatch | None = None
    # Primary key of the summary item that was selected, if any.
    source_summary_item_id: int | None = None
    # File names offered to the user when status is "waiting_user".
    candidates: list[str] = field(default_factory=list)
    # User-facing text explaining what is needed next.
    message: str = ""


def _compact(text: str | None) -> str:
    """Return *text* lower-cased with all whitespace removed ("" for None)."""
    return "".join((text or "").lower().split())


def _is_mentioned(file_name: str, compact_message: str) -> bool:
    """Tell whether *file_name* (stem or full name) occurs in the compacted message.

    The file name is normalised with the same whitespace-stripping and
    lower-casing as the message; otherwise a name containing spaces could
    never be found inside a message that had its spaces removed.
    """
    stem = _compact(Path(file_name).stem)
    name = _compact(file_name)
    # Guard against empty strings: "" is a substring of every message.
    return bool((stem and stem in compact_message) or (name and name in compact_message))


def select_instruction_input(conversation: Conversation, message: str) -> InstructionInputSelection:
    """Choose the docx to use as the instruction manual for *conversation*.

    Resolution order over the conversation's active docx attachments:

    1. exactly one attachment whose name is mentioned in *message*;
    2. exactly one attachment whose name contains the manual keyword;
    3. exactly one attachment overall;
    4. several candidates -> ask the user to confirm ("waiting_user").

    Only when there are no active docx attachments at all is the latest
    successful summary batch consulted. If that yields nothing either, the
    result has status "missing".
    """
    candidates = _active_docx_attachments(conversation)

    named = _match_by_message(candidates, message)
    if len(named) == 1:
        return _selection_from_attachment(named[0])

    instruction_candidates = [
        item for item in candidates if _INSTRUCTION_KEYWORD in item.original_name
    ]
    if len(instruction_candidates) == 1:
        return _selection_from_attachment(instruction_candidates[0])

    if len(candidates) == 1:
        return _selection_from_attachment(candidates[0])

    if len(instruction_candidates) > 1 or len(candidates) > 1:
        # Prefer the narrower keyword-based list when it is non-empty.
        names = [item.original_name for item in (instruction_candidates or candidates)]
        return InstructionInputSelection(
            status="waiting_user",
            candidates=names,
            message=_CONFIRM_PROMPT_PREFIX + "、".join(names),
        )

    # Reaching this point means there are no active docx attachments.
    summary_selection = _select_from_latest_summary(conversation, message)
    if summary_selection is not None:
        return summary_selection

    return InstructionInputSelection(status="missing", message="请先上传产品说明书 docx 文件。")


def _active_docx_attachments(conversation: Conversation) -> list[FileAttachment]:
    """Return the conversation's active, non-deleted ``.docx`` attachments.

    The suffix check is case-insensitive; results are ordered by original
    name and then by descending ``version_no``.
    """
    queryset = (
        FileAttachment.objects.filter(
            conversation=conversation,
            is_active=True,
        )
        .exclude(upload_status=FileAttachment.UploadStatus.DELETED)
        .filter(original_name__iendswith=".docx")
        .order_by("original_name", "-version_no")
    )
    return list(queryset)


def _match_by_message(candidates: list[FileAttachment], message: str) -> list[FileAttachment]:
    """Return the attachments whose file name (with or without suffix) appears in *message*.

    Matching ignores case and whitespace on both the message and the file name.
    """
    compact_message = _compact(message)
    return [
        attachment
        for attachment in candidates
        if _is_mentioned(attachment.original_name, compact_message)
    ]


def _selection_from_attachment(attachment: FileAttachment) -> InstructionInputSelection:
    """Build a "selected" result that points at *attachment*."""
    return InstructionInputSelection(
        status="selected",
        file_name=attachment.original_name,
        storage_path=attachment.storage_path,
        attachment=attachment,
    )


def _select_from_latest_summary(conversation: Conversation, message: str) -> InstructionInputSelection | None:
    """Try to pick the manual from the most recent successful summary batch.

    The batch is the first one by ``-finished_at, -created_at, -id``. Among
    its ``.docx`` items, those mentioned in *message* take priority;
    otherwise items whose name contains the manual keyword are used.

    Returns a "selected" result for exactly one candidate, a "waiting_user"
    result for several, and ``None`` when there is no batch or no candidate.
    """
    batch = (
        FileSummaryBatch.objects.filter(
            conversation=conversation,
            status=FileSummaryBatch.Status.SUCCESS,
        )
        .order_by("-finished_at", "-created_at", "-id")
        .first()
    )
    if not batch:
        return None

    items = list(batch.items.filter(file_name__iendswith=".docx").order_by("file_name", "id"))
    compact_message = _compact(message)
    named = [item for item in items if _is_mentioned(item.file_name, compact_message)]
    candidates = named or [item for item in items if _INSTRUCTION_KEYWORD in item.file_name]

    if len(candidates) == 1:
        item = candidates[0]
        return InstructionInputSelection(
            status="selected",
            file_name=item.file_name,
            storage_path=item.storage_path,
            source_summary_batch=batch,
            source_summary_item_id=item.pk,
        )

    if len(candidates) > 1:
        names = [item.file_name for item in candidates]
        return InstructionInputSelection(
            status="waiting_user",
            source_summary_batch=batch,
            candidates=names,
            message=_CONFIRM_PROMPT_PREFIX + "、".join(names),
        )

    return None