import logging
from collections.abc import Callable

from django.utils import timezone

from agent_core.orchestrator import run_agent
from agent_core.results import AgentResult
from apps.audit.services import create_audit_log, create_notification_record
from apps.scenarios.services import get_scenario

from .models import Conversation

logger = logging.getLogger(__name__)


def create_conversation_for_batch(batch_id: str, product_name: str) -> Conversation:
    """
    Create the main conversation for a document package (batch).

    The title prefers the parsed product name and falls back to a label built
    from the batch ID when the product name is empty, so the front end always
    has a stable title to display.
    """
    conversation = Conversation.objects.create(
        conversation_id=_generate_conversation_id(),
        title=product_name or f"未命名资料包-{batch_id}",
        product_name=product_name,
        batch_id=batch_id,
        task_status=Conversation.STATUS_PENDING,
        node_results=_build_initial_node_results(),
    )
    return conversation


def create_knowledge_conversation() -> Conversation:
    """
    Create a knowledge-base Q&A conversation that is not bound to any package.

    This conversation lets a user query the RAG knowledge base before any
    documents have been uploaded, so ``batch_id`` and ``product_name`` are
    stored as empty strings.

    NOTE(review): the original author states that Agent Core performs a global
    retrieval when the scope is empty; that behavior is not visible from this
    module -- confirm against agent_core.
    """
    return Conversation.objects.create(
        conversation_id=_generate_conversation_id(),
        title="知识库问答会话",
        product_name="",
        batch_id="",
        task_status=Conversation.STATUS_PENDING,
        node_results=_build_knowledge_node_results(),
    )


def execute_conversation_agent(
    *,
    conversation: Conversation,
    message: str,
    document_ids: list[int],
    detail_url_builder: Callable[[int], str] | None = None,
) -> tuple[AgentResult, object]:
    """
    Run the agent for a conversation, then record the audit log and notifications.

    The view layer only collects request parameters and renders the result;
    the Agent Core orchestration lives here in the service layer.

    Args:
        conversation: Conversation whose IDs and product name scope the run.
        message: User message forwarded to the agent.
        document_ids: Document IDs passed through to the agent options.
        detail_url_builder: Optional callable mapping the audit log ID to a
            detail URL used in notification records; when omitted the URL
            falls back to an empty string.

    Returns:
        A ``(result, audit_log)`` tuple. If ``run_agent`` raises, ``result``
        is a synthetic ``AgentResult`` with ``status="failed"`` and the
        exception text in ``error``.
    """
    scenario = get_scenario("document_review")
    try:
        result = run_agent(
            scenario,
            message,
            options={
                "conversation_id": conversation.conversation_id,
                "batch_id": conversation.batch_id,
                "product_name": conversation.product_name,
                "document_ids": document_ids,
            },
        )
    except Exception as exc:
        # Deliberate best-effort boundary: a failed run must still be audited
        # and reflected on the conversation. Log the traceback so the root
        # cause is not reduced to str(exc) only.
        logger.exception(
            "Agent execution failed for conversation %s",
            conversation.conversation_id,
        )
        result = AgentResult(status="failed", error=str(exc), answer="")

    audit_log = create_audit_log(
        "document_review",
        "注册审核智能体",
        message,
        result,
        batch_id=conversation.batch_id,
        conversation_id=conversation.conversation_id,
        product_name=conversation.product_name,
    )
    _apply_agent_result_to_conversation(conversation, result)

    # The detail URL depends on the audit log ID, so it can only be built
    # after the audit record exists.
    detail_url = detail_url_builder(audit_log.id) if detail_url_builder else ""
    _persist_notification_records(result, web_detail_url=detail_url)
    return result, audit_log


def execute_conversation_export(*, batch, conversation: Conversation) -> dict:
    """
    Generate the Word export, update the conversation, and write an audit log.

    The view layer only reports success or failure; the export orchestration
    lives here in the service layer.

    Args:
        batch: Batch object forwarded unchanged to the export generator.
        conversation: Conversation whose latest summary feeds the export and
            which is updated with the resulting report.

    Returns:
        A dict with keys ``"export_report"`` and ``"audit_log"``.
    """
    # Imported at call time rather than module load, as in the original
    # (presumably to avoid a circular import -- confirm before hoisting).
    from .export_service import (
        generate_registration_export,
        update_conversation_with_export_report,
    )

    latest_summary = conversation.latest_summary or {}
    # Prefer the upstream structured output, then the plain structured output,
    # and finally an empty dict.
    upstream_summary = (
        latest_summary.get("upstream_structured_output")
        or latest_summary.get("structured_output")
        or {}
    )
    export_report = generate_registration_export(
        batch=batch,
        conversation=conversation,
        upstream_summary=upstream_summary,
    )
    update_conversation_with_export_report(conversation, export_report)
    audit_log = create_audit_log(
        "document_review",
        "Word 回填导出",
        "生成 Word 导出文件",
        AgentResult(
            answer=export_report.get("summary", ""),
            structured_output=export_report,
            status="success",
            conversation_id=conversation.conversation_id,
            batch_id=conversation.batch_id,
            product_name=conversation.product_name,
            node_results=conversation.node_results,
        ),
        batch_id=conversation.batch_id,
        conversation_id=conversation.conversation_id,
        product_name=conversation.product_name,
    )
    return {
        "export_report": export_report,
        "audit_log": audit_log,
    }


def _generate_conversation_id() -> str:
    """
    Return an unused conversation ID of the form ``conv-NNN``.

    The sequence starts at the current row count plus one and advances while
    the candidate is already taken, so deleting rows no longer causes an
    existing ID to be handed out again.

    NOTE: two concurrent callers can still compute the same candidate; this
    function does not lock or reserve the ID.
    """
    sequence = Conversation.objects.count() + 1
    candidate = f"conv-{sequence:03d}"
    while Conversation.objects.filter(conversation_id=candidate).exists():
        sequence += 1
        candidate = f"conv-{sequence:03d}"
    return candidate


def _build_initial_node_results() -> list[dict]:
    """Return the initial pipeline node list for a package-bound conversation."""
    return [
        {"code": "package_import", "label": "资料包导入", "status": "已完成"},
        {"code": "overview", "label": "目录汇总", "status": "处理中"},
        {"code": "completeness", "label": "法规完整性检查", "status": "待处理"},
        {"code": "field_extraction", "label": "字段抽取", "status": "待处理"},
        {"code": "consistency", "label": "一致性核查", "status": "待处理"},
        {"code": "risk", "label": "风险预警", "status": "待处理"},
        {"code": "word_export", "label": "Word 回填导出", "status": "待处理"},
        {"code": "feishu_notify", "label": "飞书通知", "status": "待处理"},
    ]


def _build_knowledge_node_results() -> list[dict]:
    """Return the initial pipeline node list for a knowledge-base conversation."""
    return [
        {"code": "knowledge_retrieval", "label": "知识库检索", "status": "待处理"},
        {"code": "answer_generation", "label": "问答生成", "status": "待处理"},
        {"code": "risk", "label": "风险预警", "status": "待处理"},
    ]


def _persist_notification_records(result: AgentResult, *, web_detail_url: str = "") -> None:
    """
    Create one notification record per owner listed in the result's payload.

    Does nothing when the payload is missing or lists no owners. Values in the
    payload take precedence; ``web_detail_url`` and the result status are used
    only as fallbacks.
    """
    payload = result.notification_payload or {}
    owners = payload.get("owners") or []
    if not owners:
        return

    resolved_detail_url = payload.get("web_detail_url") or web_detail_url
    # Without an explicit message status, derive it from the run status.
    resolved_message_status = payload.get("message_status") or (
        "sent" if result.status == "success" else "failed"
    )
    resolved_receipt = payload.get("receipt") or {"status": result.status}

    for owner in owners:
        create_notification_record(
            batch_id=payload.get("batch_id", ""),
            conversation_id=payload.get("conversation_id", ""),
            product_name=payload.get("product_name", ""),
            trigger_source="agent_execution",
            notify_reason=payload.get("notify_reason", "task_completed"),
            owner_role=owner.get("owner_role", ""),
            feishu_user_id=owner.get("feishu_user_id", ""),
            message_status=resolved_message_status,
            web_detail_url=resolved_detail_url,
            receipt=resolved_receipt,
        )


def _apply_agent_result_to_conversation(conversation: Conversation, result: AgentResult) -> None:
    """
    Copy the agent result onto the conversation and save the changed fields.

    ``node_results`` is overwritten only when the result carries a non-empty
    value, so a failed run keeps the previously stored node list.
    """
    conversation.task_status = result.status
    if result.node_results:
        conversation.node_results = result.node_results
    conversation.latest_summary = {
        "answer": result.answer,
        "status": result.status,
        "error": result.error,
        "structured_output": result.structured_output,
        "notification_payload": result.notification_payload,
    }
    conversation.last_run_at = timezone.now()
    # "updated_at" is not assigned here; presumably the model sets it
    # automatically on save (e.g. auto_now) -- confirm against the model.
    conversation.save(
        update_fields=[
            "task_status",
            "node_results",
            "latest_summary",
            "last_run_at",
            "updated_at",
        ]
    )