from __future__ import annotations

import json
import logging
from pathlib import Path

from django.db.models import Q, QuerySet
from django.conf import settings
from django.utils import timezone

from .file_summary.skills.attachment_reader import AttachmentReaderSkill
from .file_summary.workflow import create_file_summary_batch, start_file_summary_workflow
from .knowledge_base import search_knowledge_base
from .llm import LLMConfigurationError, LLMRequestError, generate_reply, stream_reply
from .models import (
    Conversation,
    FileAttachment,
    FileSummaryBatch,
    FileSummaryBatchAttachment,
    KnowledgeBaseDocument,
    Message,
)
from .regulatory_review.services.rag_index import extract_text_from_path
from .application_form_fill.workflow import (
    create_application_form_fill_batch,
    find_latest_successful_summary_batch as find_latest_successful_form_fill_summary_batch,
    start_application_form_fill_workflow,
)
from .regulatory_review.workflow import (
    create_regulatory_review_batch,
    find_latest_successful_summary_batch,
    start_regulatory_review_workflow,
)
from .skill_router import route_message_intent

logger = logging.getLogger(__name__)


def list_conversations(user, search: str = "") -> QuerySet[Conversation]:
    """Return the conversations owned by ``user``, optionally narrowed by ``search``.

    A non-empty ``search`` keeps conversations whose title, or the content of any
    of their messages, contains it case-insensitively. ``distinct()`` collapses
    the duplicate rows produced by joining through ``messages``.
    """
    owned = Conversation.objects.filter(user=user)
    if search:
        title_or_message = Q(title__icontains=search) | Q(messages__content__icontains=search)
        owned = owned.filter(title_or_message).distinct()
    return owned


def get_conversation_for_user(user, conversation_id: int | None) -> Conversation | None:
    """Fetch one conversation by primary key, scoped to ``user``.

    Returns ``None`` for a falsy id, or when no conversation with that id
    belongs to ``user``.
    """
    if conversation_id:
        return Conversation.objects.filter(user=user, pk=conversation_id).first()
    return None


def create_conversation(user) -> Conversation:
    """Create an empty conversation titled with the current local month/day and time."""
    stamp = timezone.localtime().strftime('%m-%d %H:%M')
    return Conversation.objects.create(user=user, title=f"新对话 {stamp}")
def append_user_message(conversation: Conversation, content: str) -> Message:
    """Persist a user message and, on the first user turn, retitle the conversation.

    The stored content is ``content.strip()``. The title is derived from the raw
    ``content`` via :func:`build_conversation_title`.
    """
    message = Message.objects.create(
        conversation=conversation,
        role=Message.Role.USER,
        content=content.strip(),
    )
    logger.info(
        "User message appended",
        extra={
            "conversation_id": conversation.pk,
            "message_id": message.pk,
            "content_length": len(message.content),
        },
    )
    # Only the very first user message names the conversation.
    if conversation.messages.filter(role=Message.Role.USER).count() == 1:
        conversation.title = build_conversation_title(content)
        conversation.save(update_fields=["title", "updated_at"])
    return message


def append_assistant_message(conversation: Conversation, content: str) -> Message:
    """Persist an assistant reply for ``conversation`` and log its length."""
    message = Message.objects.create(
        conversation=conversation,
        role=Message.Role.ASSISTANT,
        content=content,
    )
    logger.info(
        "Assistant message appended",
        extra={
            "conversation_id": conversation.pk,
            "message_id": message.pk,
            "content_length": len(content or ""),
        },
    )
    return message


def send_message(conversation: Conversation, content: str) -> tuple[Message, Message]:
    """Store one user message and one non-streamed assistant reply.

    Out-of-scope questions (see :func:`should_refuse_ungrounded_chat`) get the
    canned :func:`out_of_scope_reply`. Otherwise the reply comes from
    ``generate_reply``. ``LLMConfigurationError``/``LLMRequestError`` are turned
    into an inline error reply instead of being raised.

    Returns:
        ``(user_message, assistant_message)``.
    """
    user_message = append_user_message(conversation, content)
    knowledge_context = build_knowledge_context(content)
    if should_refuse_ungrounded_chat(conversation, content, knowledge_context):
        reply_content = out_of_scope_reply()
    else:
        try:
            reply_content = generate_reply(conversation, content, knowledge_context=knowledge_context)
        except (LLMConfigurationError, LLMRequestError) as exc:
            reply_content = f"模型调用失败:{exc}"
    assistant_message = append_assistant_message(conversation, reply_content)
    # Backfill the title if it is still a placeholder starting with "新对话".
    if conversation.title.startswith("新对话"):
        conversation.title = build_conversation_title(content)
        conversation.save(update_fields=["title", "updated_at"])
    return user_message, assistant_message


def stream_message(conversation: Conversation, content: str):
    """Yield server-sent-event frames for one user turn, persisting both messages.

    Every path starts with a ``meta`` frame and ends with a ``done`` frame that
    carries the stored assistant message id. Out-of-scope questions get a canned
    refusal. Otherwise the intent returned by ``route_message_intent`` selects one
    of the following:

    - attachment reading;
    - the file-summary workflow;
    - the application-form-fill workflow;
    - the regulatory-review workflow;
    - a streamed LLM reply.

    Workflow paths also emit ``workflow_started`` frames.
    """
    user_message = append_user_message(conversation, content)
    knowledge_context = build_knowledge_context(content)

    if should_refuse_ungrounded_chat(conversation, content, knowledge_context):
        reply_content = out_of_scope_reply()
        # Unlike the routed paths below, the refusal is stored before ``meta`` is sent.
        assistant_message = append_assistant_message(conversation, reply_content)
        yield _meta_event(conversation, user_message, content)
        yield sse_event("chunk", {"delta": reply_content})
        yield _done_event(conversation, assistant_message)
        return

    route = route_message_intent(conversation, content)
    logger.info(
        "Stream message started",
        extra={
            "conversation_id": conversation.pk,
            "user_message_id": user_message.pk,
            "route_action": route.action,
            "route_source": route.source,
            "route_confidence": route.confidence,
            "route_reason": route.reason,
        },
    )
    yield _meta_event(conversation, user_message, content)

    if route.starts_file_summary and not _has_active_attachments(conversation):
        yield from _static_reply_events(
            conversation,
            "请先在当前对话右侧上传需要汇总的文件或压缩包,然后再发送自动汇总指令。",
        )
        return

    if route.uses_attachment_reader and not _has_active_attachments(conversation):
        yield from _static_reply_events(
            conversation,
            "请先在当前对话右侧上传需要阅读的附件,然后再发送解析或阅读附件指令。",
        )
        return

    if route.uses_attachment_reader:
        attachments = _select_attachments_for_reader(conversation, content)
        logger.info(
            "Attachment reader path selected",
            extra={
                "conversation_id": conversation.pk,
                "attachment_count": len(attachments),
                "attachment_ids": [attachment.pk for attachment in attachments],
            },
        )
        result = AttachmentReaderSkill().run_for_attachments(attachments)
        reply_content = _format_attachment_reader_reply(result.data.get("attachments", []), result.message)
        yield from _static_reply_events(conversation, reply_content)
        return

    if route.starts_file_summary:
        batch = create_file_summary_batch(
            conversation=conversation,
            user=conversation.user,
            trigger_message=user_message,
        )
        start_file_summary_workflow(
            batch,
            async_run=getattr(settings, "FILE_SUMMARY_ASYNC", True),
        )
        reply_content = f"已启动文件目录与页数自动汇总工作流,批次号:{batch.batch_no}。"
        yield from _static_reply_events(
            conversation,
            reply_content,
            preamble=(_workflow_started_event("file_summary", batch),),
        )
        return

    if route.starts_application_form_fill:
        source_summary_batch = find_latest_successful_form_fill_summary_batch(conversation)
        # A summary that misses newly uploaded attachments is stale; rebuild it.
        if source_summary_batch and not _summary_covers_active_attachments(conversation, source_summary_batch):
            source_summary_batch = None
        resolved = yield from _ensure_source_summary_batch(
            conversation,
            user_message,
            source_summary_batch,
            missing_attachments_reply="请先在当前对话右侧上传需要填表的产品资料或压缩包,我会先自动汇总再继续生成申报模板。",
            failed_next_step="申报文件自动填表",
            success_next_step="自动填表",
        )
        if resolved is None:
            return
        source_summary_batch, reply_prefix = resolved
        batch = create_application_form_fill_batch(
            conversation=conversation,
            user=conversation.user,
            trigger_message=user_message,
            source_summary_batch=source_summary_batch,
        )
        start_application_form_fill_workflow(
            batch,
            async_run=getattr(settings, "APPLICATION_FORM_FILL_ASYNC", True),
        )
        reply_content = f"{reply_prefix}已启动申报文件自动填表工作流,批次号:{batch.batch_no}。"
        yield from _static_reply_events(
            conversation,
            reply_content,
            preamble=(_workflow_started_event("application_form_fill", batch),),
        )
        return

    if route.starts_regulatory_review:
        source_summary_batch = find_latest_successful_summary_batch(conversation)
        # Same staleness rule as the form-fill path: the review must see every
        # currently active attachment, so a summary that misses some is rebuilt.
        if source_summary_batch and not _summary_covers_active_attachments(conversation, source_summary_batch):
            source_summary_batch = None
        resolved = yield from _ensure_source_summary_batch(
            conversation,
            user_message,
            source_summary_batch,
            missing_attachments_reply="请先在当前对话右侧上传需要核查的文件或压缩包,我会先自动汇总再继续法规核查。",
            failed_next_step="法规核查",
            success_next_step="法规核查",
        )
        if resolved is None:
            return
        source_summary_batch, reply_prefix = resolved
        batch = create_regulatory_review_batch(
            conversation=conversation,
            user=conversation.user,
            trigger_message=user_message,
            source_summary_batch=source_summary_batch,
        )
        start_regulatory_review_workflow(
            batch,
            async_run=getattr(settings, "REGULATORY_REVIEW_ASYNC", True),
        )
        reply_content = f"{reply_prefix}已启动 NMPA 注册资料法规核查工作流,批次号:{batch.batch_no}。"
        yield from _static_reply_events(
            conversation,
            reply_content,
            preamble=(_workflow_started_event("regulatory_review", batch),),
        )
        return

    yield from _stream_llm_reply(conversation, content, knowledge_context)


def _meta_event(conversation: Conversation, user_message: Message, content: str) -> str:
    """Build the ``meta`` frame that opens every streamed turn."""
    return sse_event(
        "meta",
        {
            "conversation_id": conversation.pk,
            "title": conversation.title or build_conversation_title(content),
            "user_message_id": user_message.pk,
            "user_message": user_message.content,
        },
    )


def _done_event(conversation: Conversation, assistant_message: Message) -> str:
    """Build the ``done`` frame that closes every streamed turn."""
    return sse_event(
        "done",
        {
            "assistant_message_id": assistant_message.pk,
            "conversation_id": conversation.pk,
            "title": conversation.title,
        },
    )


def _workflow_started_event(workflow_type: str, batch) -> str:
    """Build a ``workflow_started`` frame for a freshly created workflow batch."""
    return sse_event(
        "workflow_started",
        {
            "workflow_type": workflow_type,
            "batch_id": batch.pk,
            "batch_no": batch.batch_no,
        },
    )


def _static_reply_events(conversation: Conversation, reply_content: str, *, preamble: tuple[str, ...] = ()):
    """Persist a fixed assistant reply and yield its frames.

    The assistant message is stored first. Then the already formatted
    ``preamble`` frames are yielded, followed by one ``chunk`` holding the whole
    reply and the closing ``done`` frame.
    """
    assistant_message = append_assistant_message(conversation, reply_content)
    yield from preamble
    yield sse_event("chunk", {"delta": reply_content})
    yield _done_event(conversation, assistant_message)


def _ensure_source_summary_batch(
    conversation: Conversation,
    user_message: Message,
    source_summary_batch,
    *,
    missing_attachments_reply: str,
    failed_next_step: str,
    success_next_step: str,
):
    """Resolve the file-summary batch that a downstream workflow should consume.

    Use with ``yield from``: this generator yields SSE frames and returns either
    ``(summary_batch, reply_prefix)`` or ``None``. ``None`` means a terminal reply
    (including its ``done`` frame) was already emitted, and the caller must stop.

    An existing ``source_summary_batch`` is reused with an empty prefix.
    Otherwise, a new summary batch is run synchronously, provided the
    conversation has active attachments.
    """
    if source_summary_batch:
        return source_summary_batch, ""
    if not _has_active_attachments(conversation):
        yield from _static_reply_events(conversation, missing_attachments_reply)
        return None
    summary_batch = create_file_summary_batch(
        conversation=conversation,
        user=conversation.user,
        trigger_message=user_message,
    )
    yield _workflow_started_event("file_summary", summary_batch)
    # The downstream workflow needs the finished summary, so run it inline.
    start_file_summary_workflow(summary_batch, async_run=False)
    summary_batch.refresh_from_db()
    if summary_batch.status != FileSummaryBatch.Status.SUCCESS:
        reply_content = (
            f"已先启动文件目录与页数自动汇总工作流,批次号:{summary_batch.batch_no},"
            f"但汇总未成功:{summary_batch.error_message or '原因待查看'}。请处理后再启动{failed_next_step}。"
        )
        yield from _static_reply_events(conversation, reply_content)
        return None
    reply_prefix = (
        f"已先启动文件目录与页数自动汇总工作流,批次号:{summary_batch.batch_no},"
        f"汇总完成后继续{success_next_step}。\n"
    )
    return summary_batch, reply_prefix


def _stream_llm_reply(conversation: Conversation, content: str, knowledge_context: str):
    """Stream a model reply as ``chunk`` frames, then store it and emit ``done``.

    If streaming raises, a blocking ``generate_reply`` call is attempted. On
    success, a ``replace`` frame carries the full reply. On failure, an ``error``
    frame is emitted and the error text becomes (or is appended to) the stored
    reply.
    """
    assistant_parts: list[str] = []
    stream_failed = False
    stream_error = ""
    try:
        for chunk in stream_reply(conversation, content, knowledge_context=knowledge_context):
            assistant_parts.append(chunk)
            yield sse_event("chunk", {"delta": chunk})
    except (LLMConfigurationError, LLMRequestError) as exc:
        stream_failed = True
        stream_error = str(exc)
        logger.warning(
            "LLM stream failed",
            extra={"conversation_id": conversation.pk, "error": str(exc)},
        )
    except Exception as exc:
        # Deliberate best effort: any streaming crash falls through to the blocking retry.
        stream_failed = True
        stream_error = str(exc)
        logger.exception(
            "Unexpected stream failure",
            extra={"conversation_id": conversation.pk, "error": str(exc)},
        )

    if stream_failed:
        try:
            fallback_reply = generate_reply(conversation, content, knowledge_context=knowledge_context)
            assistant_parts = [fallback_reply]
            logger.info(
                "Non-stream fallback reply succeeded",
                extra={"conversation_id": conversation.pk, "content_length": len(fallback_reply)},
            )
            yield sse_event("replace", {"content": fallback_reply})
        except (LLMConfigurationError, LLMRequestError) as exc:
            fallback = f"模型调用失败:{exc}"
            assistant_parts = [fallback]
            logger.warning(
                "Non-stream fallback reply failed",
                extra={"conversation_id": conversation.pk, "error": str(exc), "stream_error": stream_error},
            )
            yield sse_event("error", {"message": fallback})
        except Exception as exc:
            # Keep whatever was streamed before the crash and append the error text.
            fallback = f"回复生成中断:{stream_error or exc}"
            assistant_parts.append("\n\n" + fallback)
            logger.exception(
                "Non-stream fallback crashed",
                extra={"conversation_id": conversation.pk, "error": str(exc), "stream_error": stream_error},
            )
            yield sse_event("error", {"message": fallback})

    assistant_message = append_assistant_message(conversation, "".join(assistant_parts).strip())
    if conversation.title.startswith("新对话"):
        conversation.title = build_conversation_title(content)
        conversation.save(update_fields=["title", "updated_at"])
    yield _done_event(conversation, assistant_message)


def build_conversation_title(content: str) -> str:
    """Return a concise title: whitespace-collapsed ``content``, at most 24 characters.

    Blank content yields the placeholder ``"新对话"``.
    """
    normalized = " ".join(content.strip().split())
    if not normalized:
        return "新对话"
    return normalized[:24]


def build_knowledge_context(content: str, *, n_results: int = 5) -> str:
    """Build the knowledge-base context string for a normal chat prompt.

    Full text of documents whose name matches the query takes precedence.
    Otherwise, the top ``n_results`` relevant search hits are formatted as
    consecutively numbered ``[i]`` citations. A failed or erroring search yields
    ``""``.
    """
    full_document_context = build_filename_matched_document_context(content)
    if full_document_context:
        return full_document_context
    try:
        payload = search_knowledge_base(content, n_results=n_results)
    except Exception as exc:
        logger.warning("Knowledge-base search failed", extra={"error": str(exc)})
        return ""
    if payload.get("error_message"):
        return ""
    ranked = _rank_knowledge_results(content, payload.get("results") or [])
    relevant = [item for item in ranked if _is_relevant_knowledge_result(content, item)]
    lines: list[str] = []
    for item in relevant[:n_results]:
        text = " ".join(str(item.get("text") or "").split())
        if not text:
            continue
        source = str(item.get("source") or "未知来源")
        score = item.get("score")
        score_label = f",score={score:.4f}" if isinstance(score, (int, float)) else ""
        # Number by emitted entries so skipped (empty) hits leave no gaps in citations.
        lines.append(f"[{len(lines) + 1}] 来源:{source}{score_label}\n{text[:1200]}")
    return "\n\n".join(lines)


def should_refuse_ungrounded_chat(
    conversation: Conversation,
    content: str,
    knowledge_context: str = "",
) -> bool:
    """Return ``True`` only when a chat turn has no grounding at all.

    A turn counts as ungrounded when there is no knowledge context, the question
    is not business-related, and the conversation has no active attachments.
    """
    if (knowledge_context or "").strip():
        return False
    if _is_business_related_question(content):
        return False
    if _has_active_attachments(conversation):
        return False
    return True


def out_of_scope_reply() -> str:
    """Return the canned refusal used for ungrounded, out-of-scope questions."""
    return (
        "没有在当前启用的知识库材料中找到可依据的内容,且这个问题与当前主营业务无关。"
        "为避免编造,我不能直接回答。请先上传或启用相关知识库材料,或改问体外诊断试剂注册资料审核、"
        "文件汇总、法规核查、申报填表等业务范围内的问题。"
    )


# Substrings (lower-case, whitespace-free) that mark a question as in scope.
_BUSINESS_KEYWORDS = (
    "审核智能体",
    "体外诊断",
    "ivd",
    "nmpa",
    "cmde",
    "医疗器械",
    "注册资料",
    "注册申报",
    "注册检验",
    "注册证",
    "申报资料",
    "申报文件",
    "法规",
    "核查",
    "审评",
    "审核",
    "整改",
    "风险",
    "说明书",
    "临床",
    "性能",
    "安全",
    "适用范围",
    "预期用途",
    "附件",
    "文件",
    "压缩包",
    "目录",
    "页数",
    "清单",
    "汇总",
    "模板",
    "填表",
    "知识库",
    "检索",
    "报告",
    "材料",
    "资料",
)


def _is_business_related_question(content: str) -> bool:
    """Return ``True`` if ``content`` contains any business keyword.

    Matching ignores case and whitespace. Empty content also counts as
    business-related, so it is never refused.
    """
    compact = "".join((content or "").lower().split())
    if not compact:
        return True
    return any(keyword in compact for keyword in _BUSINESS_KEYWORDS)


def build_filename_matched_document_context(query: str, *, max_chars: int = 12000) -> str:
    """Return the full text of up to three active documents whose names match ``query``.

    A document matches when any query term from ``_knowledge_query_terms`` occurs
    in its display or original name. Each section is truncated to ``max_chars``.
    Returns ``""`` when nothing matches or no matched text could be extracted.
    This avoids emitting a header that claims documents were read.
    """
    terms = _knowledge_query_terms(query)
    if not terms:
        return ""
    documents = KnowledgeBaseDocument.objects.filter(
        status=KnowledgeBaseDocument.Status.ACTIVE,
        is_active=True,
    ).order_by("-updated_at", "-id")
    matches = [
        document
        for document in documents
        if any(term and term in f"{document.display_name} {document.original_name}" for term in terms)
    ]
    sections: list[str] = []
    for document in matches[:3]:
        text = _extract_managed_document_text(document)
        if not text:
            continue
        sections.append(
            f"[全文材料 {len(sections) + 1}] 来源:用户知识库/{document.original_name}\n"
            f"{' '.join(text.split())[:max_chars]}"
        )
    if not sections:
        return ""
    header = "以下材料因用户问题中的关键词命中文档名称,已读取全文供回答前比对和总结。"
    return "\n\n".join([header, *sections]).strip()


def _extract_managed_document_text(document: KnowledgeBaseDocument) -> str:
    """Extract a knowledge-base document's text from ``storage_path``; ``""`` on any failure."""
    try:
        return extract_text_from_path(Path(document.storage_path))
    except Exception as exc:
        logger.warning(
            "Managed document full-text extraction failed",
            extra={"document_id": document.pk, "error": str(exc)},
        )
        return ""


def _rank_knowledge_results(query: str, results: list[dict[str, object]]) -> list[dict[str, object]]:
    """Return ``results`` sorted with literal query-term hits first, then by ascending score.

    Hits without a numeric score sort last within their group. Ascending order
    presumably means the score is a distance (lower is better); confirm against
    ``search_knowledge_base``.
    """
    terms = [term for term in _knowledge_query_terms(query) if term]

    def sort_key(item: dict[str, object]) -> tuple[int, float]:
        source = str(item.get("source") or "")
        text = str(item.get("text") or "")
        haystack = f"{source}\n{text}"
        direct_hit = any(term in haystack for term in terms)
        score = item.get("score")
        numeric_score = float(score) if isinstance(score, (int, float)) else 999999.0
        return (0 if direct_hit else 1, numeric_score)

    return sorted(results, key=sort_key)
def _is_relevant_knowledge_result(query: str, item: dict[str, object]) -> bool:
    """Decide whether a knowledge-base search hit is relevant to ``query``.

    A hit is relevant when a query term occurs literally in its source or text,
    or when its metadata marks it as a ``managed_document``. A query with no
    terms makes nothing relevant. Non-mapping metadata is ignored rather than
    raising.
    """
    from collections.abc import Mapping

    terms = _knowledge_query_terms(query)
    if not terms:
        return False
    source = str(item.get("source") or "")
    text = str(item.get("text") or "")
    haystack = f"{source}\n{text}"
    if any(term in haystack for term in terms):
        return True
    metadata = item.get("metadata")
    return isinstance(metadata, Mapping) and metadata.get("source_type") == "managed_document"


# Characters stripped from a query to build its "compact" search term.
_QUERY_STOP_CHARS = frozenset("是谁什么哪里如何怎么请问一下帮我你能告诉吗??,,。.")


def _knowledge_query_terms(query: str) -> list[str]:
    """Return the literal search terms derived from ``query``.

    The first term is the whitespace-free query with stop characters removed,
    when it is non-empty. The whitespace-free query itself is appended when it
    differs. Blank queries yield ``[]``.
    """
    normalized = "".join((query or "").split())
    if not normalized:
        return []
    compact = "".join(char for char in normalized if char not in _QUERY_STOP_CHARS)
    terms = [compact] if compact else []
    if normalized not in terms:
        terms.append(normalized)
    return terms


def _active_attachments(conversation: Conversation) -> QuerySet[FileAttachment]:
    """Return the conversation's active, non-deleted attachments (unordered queryset)."""
    return FileAttachment.objects.filter(conversation=conversation, is_active=True).exclude(
        upload_status=FileAttachment.UploadStatus.DELETED
    )


def _select_attachments_for_reader(conversation: Conversation, content: str):
    """Pick the attachments the reader skill should parse.

    Attachments whose original filename appears in ``content`` are preferred.
    Otherwise, every active attachment is returned, ordered by name and then by
    newest version.
    """
    attachments = list(_active_attachments(conversation).order_by("original_name", "-version_no"))
    matched = [attachment for attachment in attachments if attachment.original_name in content]
    return matched or attachments


def _has_active_attachments(conversation: Conversation) -> bool:
    """Return whether the conversation has at least one active, non-deleted attachment."""
    return _active_attachments(conversation).exists()


def _summary_covers_active_attachments(conversation: Conversation, batch: FileSummaryBatch) -> bool:
    """Return whether ``batch`` is bound to every currently active attachment.

    Vacuously ``True`` when the conversation has no active attachments.
    """
    active_ids = set(_active_attachments(conversation).values_list("id", flat=True))
    if not active_ids:
        return True
    bound_ids = set(
        FileSummaryBatchAttachment.objects.filter(batch=batch).values_list("attachment_id", flat=True)
    )
    return active_ids.issubset(bound_ids)


def _format_attachment_reader_reply(attachments: list[dict[str, object]], message: str) -> str:
    """Render the attachment-reader result as Markdown.

    Each attachment gets a heading with its type and status. Failed entries show
    only their error. Successful ones show the preview text in a ``text`` code
    fence and up to eight structural sections with optional row/column counts.
    With no attachments, ``message`` (or a default notice) is returned.
    """
    if not attachments:
        return message or "当前对话没有可读取的附件。"
    lines = ["## 附件解析结果"]
    for item in attachments:
        status = item.get("status", "")
        filename = item.get("filename", "")
        file_type = item.get("file_type", "")
        lines.extend(
            [
                "",
                f"### {filename}",
                f"- 类型:{file_type or '未知'}",
                f"- 状态:{status}",
            ]
        )
        if item.get("error_message"):
            lines.append(f"- 错误:{item['error_message']}")
            continue
        preview = str(item.get("preview_text") or "").strip()
        if preview:
            lines.extend(["", "摘要预览:", "```text", preview, "```"])
        sections = item.get("sections") or []
        if sections:
            lines.append("")
            lines.append("结构详情:")
            for section in sections[:8]:
                if not isinstance(section, dict):
                    continue
                section_type = section.get("type", "section")
                name = section.get("name", "")
                extra = ""
                if "row_count" in section:
                    extra = f",{section['row_count']} 行"
                if "column_count" in section:
                    extra += f",{section['column_count']} 列"
                lines.append(f"- {name}({section_type}{extra})")
    return "\n".join(lines).strip()


def sse_event(event_name: str, payload: dict[str, object]) -> str:
    """Format one server-sent-event frame with a JSON ``data`` line (non-ASCII kept as-is)."""
    return f"event: {event_name}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"