Files
DEMO-AGENT/review_agent/services.py

816 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import logging
from pathlib import Path
from django.db.models import Q, QuerySet
from django.conf import settings
from django.utils import timezone
from .file_summary.skills.attachment_reader import AttachmentReaderSkill
from .file_summary.workflow import create_file_summary_batch, start_file_summary_workflow
from .knowledge_base import search_knowledge_base
from .llm import LLMConfigurationError, LLMRequestError, generate_reply, stream_reply
from .models import Conversation, FileAttachment, FileSummaryBatch, FileSummaryBatchAttachment, KnowledgeBaseDocument, Message
from .regulatory_review.services.rag_index import extract_text_from_path
from .application_form_fill.workflow import (
create_application_form_fill_batch,
find_latest_successful_summary_batch as find_latest_successful_form_fill_summary_batch,
start_application_form_fill_workflow,
)
from .regulatory_info_package.constants import WORKFLOW_TYPE as REGULATORY_INFO_PACKAGE_WORKFLOW_TYPE
from .regulatory_info_package.services.input_select import select_instruction_input
from .regulatory_info_package.workflow import (
create_regulatory_info_package_batch,
start_regulatory_info_package_workflow,
)
from .regulatory_review.workflow import (
create_regulatory_review_batch,
find_latest_successful_summary_batch,
start_regulatory_review_workflow,
)
from .skill_router import route_message_intent
# Module-level logger; the service functions below attach structured fields via ``extra=``.
logger = logging.getLogger(__name__)
def list_conversations(user, search: str = "") -> QuerySet[Conversation]:
    """Return the conversations owned by ``user``.

    When ``search`` is non-empty, only conversations whose title or any message
    body contains it (case-insensitively) are kept. ``distinct()`` collapses the
    duplicate rows the join over messages can produce.
    """
    owned = Conversation.objects.filter(user=user)
    if search:
        mentions_search = Q(title__icontains=search) | Q(messages__content__icontains=search)
        owned = owned.filter(mentions_search).distinct()
    return owned
def get_conversation_for_user(user, conversation_id: int | None) -> Conversation | None:
    """Fetch conversation ``conversation_id`` scoped to ``user``.

    Returns ``None`` for a falsy id (``None`` or ``0``), or when no conversation
    with that primary key belongs to ``user``. Ownership is enforced in the query.
    """
    if conversation_id:
        return Conversation.objects.filter(user=user, pk=conversation_id).first()
    return None
def create_conversation(user) -> Conversation:
    """Create and persist an empty conversation for ``user``.

    The placeholder title is "新对话" followed by the local ``MM-DD HH:MM``
    timestamp, so the conversation can accept messages immediately.
    """
    stamp = timezone.localtime().strftime("%m-%d %H:%M")
    return Conversation.objects.create(user=user, title=f"新对话 {stamp}")
def append_user_message(conversation: Conversation, content: str) -> Message:
    """Persist a user turn (whitespace-trimmed) in ``conversation``.

    If this is now the conversation's only user message, the title is rebuilt
    from ``content`` with ``build_conversation_title`` and saved together with
    ``updated_at``.
    """
    stored = Message.objects.create(
        conversation=conversation,
        role=Message.Role.USER,
        content=content.strip(),
    )
    log_fields = {
        "conversation_id": conversation.pk,
        "message_id": stored.pk,
        "content_length": len(stored.content),
    }
    logger.info("User message appended", extra=log_fields)

    user_turns = conversation.messages.filter(role=Message.Role.USER).count()
    if user_turns == 1:
        conversation.title = build_conversation_title(content)
        conversation.save(update_fields=["title", "updated_at"])
    return stored
def append_assistant_message(conversation: Conversation, content: str) -> Message:
    """Persist an assistant turn in ``conversation`` exactly as given.

    Unlike user messages, the content is stored without stripping. The logged
    length treats a falsy ``content`` as empty.
    """
    reply = Message.objects.create(
        conversation=conversation,
        role=Message.Role.ASSISTANT,
        content=content,
    )
    logger.info(
        "Assistant message appended",
        extra={
            "conversation_id": conversation.pk,
            "message_id": reply.pk,
            "content_length": len(content or ""),
        },
    )
    return reply
def send_message(conversation: Conversation, content: str) -> tuple[Message, Message]:
    """Run one non-streaming chat turn and return ``(user_message, assistant_message)``.

    If the question is judged ungrounded, the reply is the fixed out-of-scope
    refusal. Otherwise it comes from ``generate_reply``; LLM configuration or
    request errors become a "模型调用失败" reply instead of propagating. A
    placeholder "新对话…" title is replaced from ``content`` afterwards.
    """
    user_message = append_user_message(conversation, content)
    knowledge_context = build_knowledge_context(content)

    if not should_refuse_ungrounded_chat(conversation, content, knowledge_context):
        try:
            reply_content = generate_reply(conversation, content, knowledge_context=knowledge_context)
        except (LLMConfigurationError, LLMRequestError) as exc:
            reply_content = f"模型调用失败:{exc}"
    else:
        reply_content = out_of_scope_reply()

    assistant_message = append_assistant_message(conversation, reply_content)
    if conversation.title.startswith("新对话"):
        conversation.title = build_conversation_title(content)
        conversation.save(update_fields=["title", "updated_at"])
    return user_message, assistant_message
def stream_message(conversation: Conversation, content: str):
    """Handle one chat turn as a generator of Server-Sent Event frames.

    The user message is persisted first and knowledge-base context is built.

    If the question is refused as ungrounded, the refusal is persisted, then
    ``meta``, ``chunk`` and ``done`` frames are emitted.

    Otherwise the intent route is logged, a ``meta`` frame is emitted, and the
    route picks exactly one path:

    - a "missing attachments" notice;
    - the attachment reader;
    - launching the file-summary, application-form-fill, regulatory-info-package
      or regulatory-review workflow (form fill and review first run a blocking
      file summary when no usable summary exists);
    - plain LLM streaming, with a non-streaming fallback.

    Every path ends with a ``done`` frame that carries the assistant message id.
    """
    user_message = append_user_message(conversation, content)
    knowledge_context = build_knowledge_context(content)

    if should_refuse_ungrounded_chat(conversation, content, knowledge_context):
        reply_content = out_of_scope_reply()
        # The refusal is persisted before any frame is emitted.
        assistant_message = append_assistant_message(conversation, reply_content)
        yield _stream_meta_event(conversation, content, user_message)
        yield sse_event("chunk", {"delta": reply_content})
        yield _stream_done_event(conversation, assistant_message)
        return

    route = route_message_intent(conversation, content)
    logger.info(
        "Stream message started",
        extra={
            "conversation_id": conversation.pk,
            "user_message_id": user_message.pk,
            "route_action": route.action,
            "route_source": route.source,
            "route_confidence": route.confidence,
            "route_reason": route.reason,
        },
    )
    yield _stream_meta_event(conversation, content, user_message)

    if route.starts_file_summary and not _has_active_attachments(conversation):
        yield from _stream_fixed_reply(
            conversation,
            "请先在当前对话右侧上传需要汇总的文件或压缩包,然后再发送自动汇总指令。",
        )
        return
    if route.uses_attachment_reader and not _has_active_attachments(conversation):
        yield from _stream_fixed_reply(
            conversation,
            "请先在当前对话右侧上传需要阅读的附件,然后再发送解析或阅读附件指令。",
        )
        return
    if route.uses_attachment_reader:
        yield from _stream_attachment_reader(conversation, content)
        return
    if route.starts_file_summary:
        yield from _stream_file_summary(conversation, user_message)
        return
    if route.starts_application_form_fill:
        yield from _stream_application_form_fill(conversation, user_message)
        return
    if route.starts_regulatory_info_package:
        yield from _stream_regulatory_info_package(conversation, content, user_message)
        return
    if route.starts_regulatory_review:
        yield from _stream_regulatory_review(conversation, user_message)
        return
    yield from _stream_llm_reply(conversation, content, knowledge_context)


def _stream_meta_event(conversation: Conversation, content: str, user_message: Message) -> str:
    """Build the ``meta`` frame that opens a streamed turn."""
    return sse_event(
        "meta",
        {
            "conversation_id": conversation.pk,
            "title": conversation.title or build_conversation_title(content),
            "user_message_id": user_message.pk,
            "user_message": user_message.content,
        },
    )


def _stream_done_event(conversation: Conversation, assistant_message: Message) -> str:
    """Build the ``done`` frame that closes a streamed turn (title read at call time)."""
    return sse_event(
        "done",
        {
            "assistant_message_id": assistant_message.pk,
            "conversation_id": conversation.pk,
            "title": conversation.title,
        },
    )


def _stream_workflow_started_event(workflow_type: str, batch) -> str:
    """Build the ``workflow_started`` frame announcing ``batch``."""
    return sse_event(
        "workflow_started",
        {
            "workflow_type": workflow_type,
            "batch_id": batch.pk,
            "batch_no": batch.batch_no,
        },
    )


def _stream_fixed_reply(conversation: Conversation, reply_content: str):
    """Persist ``reply_content`` as the assistant turn, then emit it and ``done``."""
    assistant_message = append_assistant_message(conversation, reply_content)
    yield sse_event("chunk", {"delta": reply_content})
    yield _stream_done_event(conversation, assistant_message)


def _stream_workflow_reply(conversation: Conversation, workflow_type: str, batch, reply_content: str):
    """Persist a workflow-launch reply, then emit ``workflow_started``, the text and ``done``."""
    assistant_message = append_assistant_message(conversation, reply_content)
    yield _stream_workflow_started_event(workflow_type, batch)
    yield sse_event("chunk", {"delta": reply_content})
    yield _stream_done_event(conversation, assistant_message)


def _stream_attachment_reader(conversation: Conversation, content: str):
    """Read the selected attachments and stream the formatted result."""
    attachments = _select_attachments_for_reader(conversation, content)
    logger.info(
        "Attachment reader path selected",
        extra={
            "conversation_id": conversation.pk,
            "attachment_count": len(attachments),
            "attachment_ids": [attachment.pk for attachment in attachments],
        },
    )
    result = AttachmentReaderSkill().run_for_attachments(attachments)
    reply_content = _format_attachment_reader_reply(result.data.get("attachments", []), result.message)
    yield from _stream_fixed_reply(conversation, reply_content)


def _stream_file_summary(conversation: Conversation, user_message: Message):
    """Launch the file-summary workflow and stream the launch notice."""
    batch = create_file_summary_batch(
        conversation=conversation,
        user=conversation.user,
        trigger_message=user_message,
    )
    start_file_summary_workflow(
        batch,
        async_run=getattr(settings, "FILE_SUMMARY_ASYNC", True),
    )
    reply_content = f"已启动文件目录与页数自动汇总工作流,批次号:{batch.batch_no}"
    yield from _stream_workflow_reply(conversation, "file_summary", batch, reply_content)


def _stream_prerequisite_summary(conversation: Conversation, user_message: Message, next_step_label: str):
    """Run a blocking file summary needed by a downstream workflow.

    Emits ``workflow_started`` for the summary batch before running it. Returns
    the refreshed batch on success. On failure it streams the failure reply and
    ``done``, then returns ``None`` so the caller stops.
    """
    summary_batch = create_file_summary_batch(
        conversation=conversation,
        user=conversation.user,
        trigger_message=user_message,
    )
    yield _stream_workflow_started_event("file_summary", summary_batch)
    # Run synchronously: the downstream workflow needs the finished summary.
    start_file_summary_workflow(summary_batch, async_run=False)
    summary_batch.refresh_from_db()
    if summary_batch.status != FileSummaryBatch.Status.SUCCESS:
        reply_content = (
            f"已先启动文件目录与页数自动汇总工作流,批次号:{summary_batch.batch_no},"
            f"但汇总未成功:{summary_batch.error_message or '原因待查看'}。请处理后再启动{next_step_label}。"
        )
        yield from _stream_fixed_reply(conversation, reply_content)
        return None
    return summary_batch


def _stream_application_form_fill(conversation: Conversation, user_message: Message):
    """Launch application form fill, building a fresh summary first when needed."""
    source_summary_batch = find_latest_successful_form_fill_summary_batch(conversation)
    # A summary that misses newly uploaded attachments is stale; rebuild it.
    if source_summary_batch and not _summary_covers_active_attachments(conversation, source_summary_batch):
        source_summary_batch = None
    reply_prefix = ""
    if not source_summary_batch:
        if not _has_active_attachments(conversation):
            yield from _stream_fixed_reply(
                conversation,
                "请先在当前对话右侧上传需要填表的产品资料或压缩包,我会先自动汇总再继续生成申报模板。",
            )
            return
        source_summary_batch = yield from _stream_prerequisite_summary(
            conversation, user_message, "申报文件自动填表"
        )
        if source_summary_batch is None:
            return
        reply_prefix = f"已先启动文件目录与页数自动汇总工作流,批次号:{source_summary_batch.batch_no},汇总完成后继续自动填表。\n"
    batch = create_application_form_fill_batch(
        conversation=conversation,
        user=conversation.user,
        trigger_message=user_message,
        source_summary_batch=source_summary_batch,
    )
    start_application_form_fill_workflow(
        batch,
        async_run=getattr(settings, "APPLICATION_FORM_FILL_ASYNC", True),
    )
    reply_content = f"{reply_prefix}已启动申报文件自动填表工作流,批次号:{batch.batch_no}"
    yield from _stream_workflow_reply(conversation, "application_form_fill", batch, reply_content)


def _stream_regulatory_info_package(conversation: Conversation, content: str, user_message: Message):
    """Launch the chapter-1 regulatory info package workflow for the selected input."""
    selection = select_instruction_input(conversation, content)
    if selection.status != "selected":
        reply_content = selection.message or "请先在当前对话右侧上传产品说明书 docx 文件然后再发送第1章监管信息生成指令。"
        yield from _stream_fixed_reply(conversation, reply_content)
        return
    batch = create_regulatory_info_package_batch(
        conversation=conversation,
        user=conversation.user,
        trigger_message=user_message,
        source_attachment=selection.attachment,
        source_summary_batch=selection.source_summary_batch,
        source_summary_item_id=selection.source_summary_item_id,
        source_file_name=selection.file_name,
        source_storage_path=selection.storage_path,
    )
    start_regulatory_info_package_workflow(
        batch,
        async_run=getattr(settings, "REGULATORY_INFO_PACKAGE_ASYNC", True),
    )
    reply_content = f"已启动第1章监管信息材料包生成工作流批次号{batch.batch_no}"
    yield from _stream_workflow_reply(conversation, REGULATORY_INFO_PACKAGE_WORKFLOW_TYPE, batch, reply_content)


def _stream_regulatory_review(conversation: Conversation, user_message: Message):
    """Launch the NMPA regulatory review, building a summary first when none exists."""
    source_summary_batch = find_latest_successful_summary_batch(conversation)
    reply_prefix = ""
    if not source_summary_batch:
        if not _has_active_attachments(conversation):
            yield from _stream_fixed_reply(
                conversation,
                "请先在当前对话右侧上传需要核查的文件或压缩包,我会先自动汇总再继续法规核查。",
            )
            return
        source_summary_batch = yield from _stream_prerequisite_summary(
            conversation, user_message, "法规核查"
        )
        if source_summary_batch is None:
            return
        reply_prefix = f"已先启动文件目录与页数自动汇总工作流,批次号:{source_summary_batch.batch_no},汇总完成后继续法规核查。\n"
    batch = create_regulatory_review_batch(
        conversation=conversation,
        user=conversation.user,
        trigger_message=user_message,
        source_summary_batch=source_summary_batch,
    )
    start_regulatory_review_workflow(
        batch,
        async_run=getattr(settings, "REGULATORY_REVIEW_ASYNC", True),
    )
    reply_content = f"{reply_prefix}已启动 NMPA 注册资料法规核查工作流,批次号:{batch.batch_no}"
    yield from _stream_workflow_reply(conversation, "regulatory_review", batch, reply_content)


def _stream_llm_reply(conversation: Conversation, content: str, knowledge_context: str):
    """Stream an LLM reply, falling back to one non-streaming call on failure.

    On fallback success a ``replace`` frame carries the full reply. On fallback
    failure an ``error`` frame is emitted. The collected text is persisted
    (trimmed) before ``done``.
    """
    assistant_parts: list[str] = []
    stream_failed = False
    stream_error = ""
    try:
        for chunk in stream_reply(conversation, content, knowledge_context=knowledge_context):
            assistant_parts.append(chunk)
            yield sse_event("chunk", {"delta": chunk})
    except (LLMConfigurationError, LLMRequestError) as exc:
        stream_failed = True
        stream_error = str(exc)
        logger.warning(
            "LLM stream failed",
            extra={"conversation_id": conversation.pk, "error": str(exc)},
        )
    except Exception as exc:
        stream_failed = True
        stream_error = str(exc)
        logger.exception(
            "Unexpected stream failure",
            extra={"conversation_id": conversation.pk, "error": str(exc)},
        )
    if stream_failed:
        try:
            fallback_reply = generate_reply(conversation, content, knowledge_context=knowledge_context)
            # The full reply replaces whatever partial text was already streamed.
            assistant_parts = [fallback_reply]
            logger.info(
                "Non-stream fallback reply succeeded",
                extra={"conversation_id": conversation.pk, "content_length": len(fallback_reply)},
            )
            yield sse_event("replace", {"content": fallback_reply})
        except (LLMConfigurationError, LLMRequestError) as exc:
            fallback = f"模型调用失败:{exc}"
            assistant_parts = [fallback]
            logger.warning(
                "Non-stream fallback reply failed",
                extra={"conversation_id": conversation.pk, "error": str(exc), "stream_error": stream_error},
            )
            yield sse_event("error", {"message": fallback})
        except Exception as exc:
            fallback = f"回复生成中断:{stream_error or exc}"
            # Keep the partial streamed text and append the interruption notice.
            assistant_parts.append("\n\n" + fallback)
            logger.exception(
                "Non-stream fallback crashed",
                extra={"conversation_id": conversation.pk, "error": str(exc), "stream_error": stream_error},
            )
            yield sse_event("error", {"message": fallback})
    assistant_message = append_assistant_message(conversation, "".join(assistant_parts).strip())
    if conversation.title.startswith("新对话"):
        conversation.title = build_conversation_title(content)
        conversation.save(update_fields=["title", "updated_at"])
    yield _stream_done_event(conversation, assistant_message)
def build_conversation_title(content: str) -> str:
    """Derive a short conversation title from a user prompt.

    Runs of whitespace collapse to single spaces and the result is cut to 24
    characters. A blank prompt yields the placeholder "新对话".
    """
    collapsed = " ".join(content.split())
    return collapsed[:24] if collapsed else "新对话"
def build_knowledge_context(content: str, *, n_results: int = 5) -> str:
    """Build the knowledge-base context string injected into normal chat prompts.

    Full text of filename-matched documents takes priority. Otherwise the
    knowledge base is searched; a search exception or an ``error_message``
    payload yields "". Hits are ranked with ``_rank_knowledge_results``,
    filtered by ``_is_relevant_knowledge_result``, and up to ``n_results``
    non-empty snippets are rendered. Each snippet has its whitespace collapsed
    and is cut to 1200 characters. Labels are numbered consecutively from 1.

    Args:
        content: The user's question.
        n_results: Number of hits requested from the search and the maximum
            number of snippets rendered.

    Returns:
        The formatted context, or "" when nothing usable was found.
    """
    full_document_context = build_filename_matched_document_context(content)
    if full_document_context:
        return full_document_context
    try:
        payload = search_knowledge_base(content, n_results=n_results)
    except Exception as exc:
        # Best effort: a broken search backend must not break chat.
        logger.warning("Knowledge-base search failed", extra={"error": str(exc)})
        return ""
    if payload.get("error_message"):
        return ""
    lines: list[str] = []
    for item in _rank_knowledge_results(content, payload.get("results") or []):
        if len(lines) >= n_results:
            break
        if not _is_relevant_knowledge_result(content, item):
            continue
        text = " ".join(str(item.get("text") or "").split())
        # Empty snippets are skipped without consuming a slot or a label number.
        if not text:
            continue
        source = str(item.get("source") or "未知来源")
        score = item.get("score")
        score_label = f"score={score:.4f}" if isinstance(score, (int, float)) else ""
        lines.append(f"[{len(lines) + 1}] 来源:{source}{score_label}\n{text[:1200]}")
    return "\n\n".join(lines)
def should_refuse_ungrounded_chat(
    conversation: Conversation,
    content: str,
    knowledge_context: str = "",
) -> bool:
    """Decide whether a plain chat question must get the out-of-scope refusal.

    Refuses only when all three hold: the knowledge context is blank, the text
    matches no business keyword, and the conversation has no active attachments.
    The checks run in that order and stop at the first one that allows an answer.
    """
    if (knowledge_context or "").strip():
        return False
    return not (
        _is_business_related_question(content)
        or _has_active_attachments(conversation)
    )
# Fixed refusal text for questions with no knowledge-base grounding and no
# relation to the supported business scope.
_OUT_OF_SCOPE_REPLY = (
    "没有在当前启用的知识库材料中找到可依据的内容,且这个问题与当前主营业务无关。"
    "为避免编造,我不能直接回答。请先上传或启用相关知识库材料,或改问体外诊断试剂注册资料审核、"
    "文件汇总、法规核查、申报填表等业务范围内的问题。"
)


def out_of_scope_reply() -> str:
    """Return the fixed refusal used for ungrounded, off-topic questions."""
    return _OUT_OF_SCOPE_REPLY
def _is_business_related_question(content: str) -> bool:
normalized = (content or "").lower()
compact = "".join(normalized.split())
if not compact:
return True
business_keywords = [
"审核智能体",
"体外诊断",
"ivd",
"nmpa",
"cmde",
"医疗器械",
"注册资料",
"注册申报",
"注册检验",
"注册证",
"申报资料",
"申报文件",
"法规",
"核查",
"审评",
"审核",
"整改",
"风险",
"说明书",
"临床",
"性能",
"安全",
"适用范围",
"预期用途",
"附件",
"文件",
"压缩包",
"目录",
"页数",
"清单",
"汇总",
"模板",
"填表",
"知识库",
"检索",
"报告",
"材料",
"资料",
]
return any(keyword in compact for keyword in business_keywords)
def build_filename_matched_document_context(
    query: str,
    *,
    max_chars: int = 12000,
    max_documents: int = 3,
) -> str:
    """Return full text of active knowledge-base documents whose name matches ``query``.

    A document matches when any term from ``_knowledge_query_terms(query)`` is a
    substring of ``"<display_name> <original_name>"``. Candidates are scanned
    newest first (``-updated_at``, ``-id``), and at most ``max_documents``
    matches are read. Each document's text is whitespace-collapsed and cut to
    ``max_chars``.

    Returns "" when nothing matches, or when no matched document yields any text.
    Returning "" in that case avoids a header-only context, which would
    otherwise suppress the vector-search fallback and make the question look
    grounded.
    """
    terms = [term for term in _knowledge_query_terms(query) if term]
    if not terms:
        return ""
    documents = KnowledgeBaseDocument.objects.filter(
        status=KnowledgeBaseDocument.Status.ACTIVE,
        is_active=True,
    ).order_by("-updated_at", "-id")
    matches: list[KnowledgeBaseDocument] = []
    for document in documents:
        if len(matches) >= max_documents:
            break
        filename = f"{document.display_name} {document.original_name}"
        if any(term in filename for term in terms):
            matches.append(document)
    sections: list[str] = []
    for document in matches:
        collapsed = " ".join(_extract_managed_document_text(document).split())
        if not collapsed:
            # Extraction failed or produced only whitespace; nothing to cite.
            continue
        sections.append(
            f"[全文材料 {len(sections) + 1}] 来源:用户知识库/{document.original_name}\n"
            f"{collapsed[:max_chars]}"
        )
    if not sections:
        return ""
    header = "以下材料因用户问题中的关键词命中文档名称,已读取全文供回答前比对和总结。"
    return "\n\n".join([header, *sections]).strip()
def _extract_managed_document_text(document: KnowledgeBaseDocument) -> str:
    """Best-effort full-text extraction for a stored knowledge-base document.

    Any error (including building the path) is logged as a warning with the
    document id and reported as "", so one unreadable file cannot break
    context building.
    """
    try:
        extracted = extract_text_from_path(Path(document.storage_path))
    except Exception as exc:
        logger.warning(
            "Managed document full-text extraction failed",
            extra={"document_id": document.pk, "error": str(exc)},
        )
        extracted = ""
    return extracted
def _rank_knowledge_results(query: str, results: list[dict[str, object]]) -> list[dict[str, object]]:
    """Return search hits reordered so direct term hits come first.

    A hit whose source or text contains a query term sorts ahead of one that
    does not. Ties break on ascending ``score`` (presumably a distance where
    lower is better; TODO confirm). Non-numeric scores are treated as 999999.0.
    The sort is stable and returns a new list.
    """
    terms = list(filter(None, _knowledge_query_terms(query)))

    def _ranking(hit: dict[str, object]) -> tuple[int, float]:
        haystack = str(hit.get("source") or "") + "\n" + str(hit.get("text") or "")
        missed = 0 if any(term in haystack for term in terms) else 1
        raw_score = hit.get("score")
        return missed, (float(raw_score) if isinstance(raw_score, (int, float)) else 999999.0)

    return sorted(results, key=_ranking)
def _is_relevant_knowledge_result(query: str, item: dict[str, object]) -> bool:
    """Keep a search hit if it mentions a query term or comes from a managed document.

    Returns False outright when the query yields no terms. Otherwise the hit is
    relevant when its source or text contains any term, or when its metadata
    ``source_type`` is ``"managed_document"``.
    """
    terms = _knowledge_query_terms(query)
    if not terms:
        return False
    source = str(item.get("source") or "")
    text = str(item.get("text") or "")
    if any(term in f"{source}\n{text}" for term in terms):
        return True
    metadata = item.get("metadata") or {}
    return metadata.get("source_type") == "managed_document"
def _knowledge_query_terms(query: str) -> list[str]:
normalized = "".join((query or "").split())
if not normalized:
return []
stop_chars = set("是谁什么哪里如何怎么请问一下帮我你能告诉吗??,。.")
compact = "".join(char for char in normalized if char not in stop_chars)
terms = [compact] if compact else []
if normalized not in terms:
terms.append(normalized)
return terms
def _select_attachments_for_reader(conversation: Conversation, content: str):
    """Pick the attachments the attachment reader should process.

    Considers active, non-deleted attachments of ``conversation``, ordered by
    name and then newest version first. If the message text mentions some of
    them by ``original_name``, only those are returned; otherwise all are.
    """
    candidates = list(
        FileAttachment.objects.filter(conversation=conversation, is_active=True)
        .exclude(upload_status=FileAttachment.UploadStatus.DELETED)
        .order_by("original_name", "-version_no")
    )
    named_in_message = [item for item in candidates if item.original_name in content]
    if named_in_message:
        return named_in_message
    return candidates
def _has_active_attachments(conversation: Conversation) -> bool:
    """Return True when ``conversation`` has at least one active, non-deleted attachment."""
    live_attachments = FileAttachment.objects.filter(
        conversation=conversation,
        is_active=True,
    ).exclude(upload_status=FileAttachment.UploadStatus.DELETED)
    return live_attachments.exists()
def _summary_covers_active_attachments(conversation: Conversation, batch: FileSummaryBatch) -> bool:
    """Tell whether ``batch`` was built from every currently active attachment.

    Returns True when the conversation has no active, non-deleted attachments.
    Otherwise every such attachment id must be bound to ``batch`` through
    ``FileSummaryBatchAttachment``.
    """
    live_ids = set(
        FileAttachment.objects.filter(conversation=conversation, is_active=True)
        .exclude(upload_status=FileAttachment.UploadStatus.DELETED)
        .values_list("id", flat=True)
    )
    if live_ids:
        covered_ids = set(
            FileSummaryBatchAttachment.objects.filter(batch=batch).values_list("attachment_id", flat=True)
        )
        return live_ids <= covered_ids
    return True
def _format_attachment_reader_reply(attachments: list[dict[str, object]], message: str) -> str:
if not attachments:
return message or "当前对话没有可读取的附件。"
lines = ["## 附件解析结果"]
for item in attachments:
status = item.get("status", "")
filename = item.get("filename", "")
file_type = item.get("file_type", "")
lines.extend(
[
"",
f"### {filename}",
f"- 类型:{file_type or '未知'}",
f"- 状态:{status}",
]
)
if item.get("error_message"):
lines.append(f"- 错误:{item['error_message']}")
continue
preview = str(item.get("preview_text") or "").strip()
if preview:
lines.extend(["", "摘要预览:", "```text", preview, "```"])
sections = item.get("sections") or []
if sections:
lines.append("")
lines.append("结构详情:")
for section in sections[:8]:
if not isinstance(section, dict):
continue
section_type = section.get("type", "section")
name = section.get("name", "")
extra = ""
if "row_count" in section:
extra = f"{section['row_count']}"
if "column_count" in section:
extra += f"{section['column_count']}"
lines.append(f"- {name}{section_type}{extra}")
return "\n".join(lines).strip()
def sse_event(event_name: str, payload: dict[str, object]) -> str:
    """Serialize one Server-Sent Events frame.

    Emits an ``event:`` line and a ``data:`` line (the payload as JSON with
    non-ASCII characters kept verbatim), terminated by the blank line that ends
    an SSE frame.
    """
    data = json.dumps(payload, ensure_ascii=False)
    return "".join((f"event: {event_name}\n", f"data: {data}\n", "\n"))