import re

from agent_core.results import AgentResult
from apps.chat.models import Conversation
from apps.documents.models import SubmissionBatch

from .models import AgentAuditLog, NotificationRecord

# Notification reasons accepted by create_notification_record.
SUPPORTED_NOTIFY_REASONS = {"task_completed", "task_failed"}

# Markers whose trailing token is masked by mask_sensitive_text.
_SENSITIVE_MARKERS = ("LLM_API_KEY=", "EMBEDDING_API_KEY=")

# Raw risk level -> text shown in the history list.
RISK_STATUS_DISPLAY = {
    "high": "已阻断",
    "medium": "待复核",
    "low": "已完成",
    "failed": "失败",
    "processing": "处理中",
    "pending": "处理中",
}

# Raw export status -> text shown on the detail page.
EXPORT_STATUS_DISPLAY = {
    "completed": "已完成",
    "draft_only": "待复核",
    "review_required": "待复核",
    "manual_review": "待复核",
    "blocked": "已阻断",
    "failed": "失败",
    "processing": "处理中",
    "pending": "处理中",
}

# Raw conversation task status -> text shown in the history list.
CONVERSATION_STATUS_DISPLAY = {
    "success": "已完成",
    "completed": "已完成",
    "review_required": "待复核",
    "blocked": "已阻断",
    "failed": "失败",
    "processing": "处理中",
    "pending": "处理中",
}


def create_audit_log(
    scenario_id: str,
    scenario_name: str,
    user_input: str,
    agent_result: AgentResult,
    batch_id: str = "",
    conversation_id: str = "",
    product_name: str = "",
) -> AgentAuditLog:
    """
    Persist one agent execution result as an audit log row.

    Design notes:
    - Both successful and failed runs are recorded so the whole execution
      chain can be reviewed afterwards.
    - The error text is masked before it is written, so API keys are not
      stored by accident. Only ``error_message`` goes through masking here;
      the other fields are stored as provided.
    - A negative or missing ``latency_ms`` is stored as 0.

    Returns:
        The newly created ``AgentAuditLog`` instance.
    """
    return AgentAuditLog.objects.create(
        scenario_id=scenario_id,
        scenario_name=scenario_name,
        batch_id=batch_id,
        conversation_id=conversation_id,
        product_name=product_name,
        user_input=user_input,
        retrieved_chunks=agent_result.references,
        tool_calls=agent_result.tool_calls,
        structured_output=agent_result.structured_output,
        final_answer=agent_result.answer,
        raw_output=agent_result.raw_output,
        model_name=agent_result.model_name,
        # `or 0` keeps max() from raising when latency_ms is None.
        latency_ms=max(agent_result.latency_ms or 0, 0),
        status=agent_result.status,
        error_message=mask_sensitive_text(agent_result.error),
    )


def mask_sensitive_text(value: str | None) -> str:
    """
    Mask sensitive configuration values that appear in an error text.

    Currently handles at least:
    - ``LLM_API_KEY=...``
    - ``EMBEDDING_API_KEY=...``

    Returns:
        The masked text; an empty string when ``value`` is empty or None.
    """
    if not value:
        return ""
    masked = value
    for marker in _SENSITIVE_MARKERS:
        masked = _mask_token_after_marker(masked, marker)
    return masked


def _mask_token_after_marker(value: str, marker: str) -> str:
    """
    Replace the token that directly follows every ``marker`` with a placeholder.

    The token is the run of non-whitespace characters after the marker.
    Tokens starting with ``sk-`` become ``sk-***``; anything else becomes
    ``***``. Every occurrence of the marker is handled, not only the first.
    """
    if marker not in value:
        return value

    def _placeholder(match: re.Match) -> str:
        secret = match.group(1)
        masked_secret = "sk-***" if secret.startswith("sk-") else "***"
        return f"{marker}{masked_secret}"

    return re.sub(re.escape(marker) + r"(\S*)", _placeholder, value)


def create_notification_record(
    *,
    batch_id: str,
    conversation_id: str,
    product_name: str,
    trigger_source: str,
    notify_reason: str,
    owner_role: str,
    feishu_user_id: str,
    message_status: str,
    web_detail_url: str,
    receipt: dict,
) -> NotificationRecord:
    """
    Persist a notification trace record.

    V1 only stores the notification payload and its result status; real
    Feishu delivery can be wired in at a later stage.

    Raises:
        ValueError: if ``notify_reason`` is not in ``SUPPORTED_NOTIFY_REASONS``.
    """
    if notify_reason not in SUPPORTED_NOTIFY_REASONS:
        raise ValueError(f"notify_reason 不受支持:{notify_reason}")
    return NotificationRecord.objects.create(
        batch_id=batch_id,
        conversation_id=conversation_id,
        product_name=product_name,
        trigger_source=trigger_source,
        notify_reason=notify_reason,
        owner_role=owner_role,
        feishu_user_id=feishu_user_id,
        message_status=message_status,
        web_detail_url=web_detail_url,
        receipt=receipt,
    )


def build_history_list_context(
    *,
    scenario_id: str = "",
    keyword: str = "",
    notify_status: str = "",
    risk_status: str = "",
) -> dict:
    """
    Build the filtered rows and display context for the history list page.

    The view only reads query params; filtering and list aggregation are
    done here in the service layer. Empty filter values are ignored.

    Returns:
        A dict with ``history_rows``, ``history_metrics`` and the filter
        values echoed back for the template.
    """
    logs = AgentAuditLog.objects.all()
    if scenario_id:
        logs = logs.filter(scenario_id=scenario_id)
    if keyword:
        # Keyword matches either the product name or the batch id.
        logs = logs.filter(product_name__icontains=keyword) | logs.filter(
            batch_id__icontains=keyword
        )
    if notify_status:
        # A set gives O(1) membership tests; values_list yields tuples.
        matched_pairs = set(
            NotificationRecord.objects.filter(
                message_status=notify_status
            ).values_list("batch_id", "conversation_id")
        )
        logs = [
            log
            for log in logs
            if (log.batch_id, log.conversation_id) in matched_pairs
        ]
    if risk_status:
        logs = [
            log
            for log in logs
            if (log.structured_output or {}).get("highest_risk_level") == risk_status
            or (log.structured_output or {}).get("risk_level") == risk_status
        ]

    history_rows = build_history_rows(logs)
    return {
        "history_rows": history_rows,
        "history_metrics": build_history_metrics(history_rows),
        "selected_scenario_id": scenario_id,
        "keyword": keyword,
        "notify_status": notify_status,
        "risk_status": risk_status,
    }


def build_history_rows(logs) -> list[dict]:
    """
    Enrich audit logs with batch, conversation, risk and notification info.

    The view only collects filter conditions; the aggregated fields needed
    by the list are assembled here.

    Args:
        logs: an iterable of ``AgentAuditLog`` (queryset or list).

    Returns:
        One dict per log, in the same order as ``logs``.
    """
    # Materialise once: the logs are walked several times below.
    logs = list(logs)

    # Records come newest first, so setdefault keeps the most recent
    # notification for each (batch_id, conversation_id) pair.
    notification_map: dict = {}
    for item in NotificationRecord.objects.order_by("-created_at"):
        notification_map.setdefault((item.batch_id, item.conversation_id), item)

    batch_map = {
        item.batch_id: item
        for item in SubmissionBatch.objects.filter(
            batch_id__in=[log.batch_id for log in logs if log.batch_id]
        )
    }
    conversation_map = {
        item.conversation_id: item
        for item in Conversation.objects.filter(
            conversation_id__in=[
                log.conversation_id for log in logs if log.conversation_id
            ]
        )
    }

    rows = []
    for log in logs:
        notification = notification_map.get((log.batch_id, log.conversation_id))
        batch = batch_map.get(log.batch_id)
        conversation = conversation_map.get(log.conversation_id)
        structured_output = log.structured_output or {}
        rows.append(
            {
                "log": log,
                "batch": batch,
                "conversation": conversation,
                "batch_scale": (
                    f"{batch.file_count} 份 / {batch.page_count} 页" if batch else "-"
                ),
                "batch_status": (
                    batch.get_import_status_display_text() if batch else "-"
                ),
                "conversation_status": _get_conversation_status_display_text(
                    conversation.task_status if conversation else "-"
                ),
                # highest_risk_level takes priority over risk_level.
                "risk_status": _get_risk_status_display_text(
                    structured_output.get("highest_risk_level")
                    or structured_output.get("risk_level")
                    or "-"
                ),
                "notify_status": (
                    notification.get_message_status_display_text()
                    if notification
                    else "-"
                ),
                "notify_reason": notification.notify_reason if notification else "-",
            }
        )
    return rows


def build_history_metrics(history_rows: list[dict]) -> list[dict]:
    """
    Build the metric cards shown at the top of the history page.

    Definitions:
    - tasks processed: number of rows in the current filter result
    - successful runs: rows whose log status is ``success``
    - notifications sent: rows whose ``notify_status`` text is "已发送"
    - high-risk blocked: rows whose ``risk_status`` text is "已阻断"

    Returns:
        A list of ``{"label", "value", "note"}`` dicts, one per card.
    """
    total_count = len(history_rows)
    success_count = sum(1 for row in history_rows if row["log"].status == "success")
    notify_sent_count = sum(
        1 for row in history_rows if row.get("notify_status") == "已发送"
    )
    blocked_count = sum(
        1 for row in history_rows if row.get("risk_status") == "已阻断"
    )
    return [
        {"label": "处理任务数", "value": total_count, "note": "按当前筛选条件回看执行留痕。"},
        {"label": "成功执行", "value": success_count, "note": "执行完成并写入审计快照。"},
        {"label": "通知已发送", "value": notify_sent_count, "note": "已生成已发送状态的通知留痕。"},
        {"label": "高风险阻断", "value": blocked_count, "note": "当前风险状态为已阻断的处理记录。"},
    ]


def _first_notification(notifications):
    """Return the first notification of a queryset or plain iterable, or None."""
    if notifications is None:
        return None
    if hasattr(notifications, "first"):
        return notifications.first()
    try:
        return next(iter(notifications), None)
    except TypeError:
        # Not iterable: behave as "no notification".
        return None


def build_detail_summary(log: AgentAuditLog, conversation, notifications) -> dict:
    """
    Build the export summary and notification receipt for the detail page.

    The template only renders; field assembly and priority rules live here.

    Args:
        log: the audit log being displayed.
        conversation: the related conversation, or a falsy value if none.
        notifications: a queryset (``.first()`` is used) or any iterable;
            its first element is taken as the latest notification.

    Returns:
        A dict of export fields, blocked items and the notification receipt.
    """
    structured_output = log.structured_output or {}
    output_file = structured_output.get("output_file") or {}

    export_node = None
    if conversation and conversation.node_results:
        export_node = next(
            (
                node
                for node in conversation.node_results
                if node.get("label") == "Word 回填导出"
            ),
            None,
        )

    latest_notification = _first_notification(notifications)

    return {
        # The structured output wins; the export node status is the fallback.
        "export_status": _get_export_status_display_text(
            structured_output.get("export_status")
            or (export_node or {}).get("status", "-")
        ),
        "download_url": structured_output.get("download_url", ""),
        "output_file_name": output_file.get("file_name", ""),
        "output_file_relative_path": output_file.get("relative_path", ""),
        "export_mode": output_file.get("export_mode", ""),
        "template_name": structured_output.get("template_name", ""),
        "template_version": structured_output.get("template_version", ""),
        "draft_export_status": _get_export_status_display_text(
            structured_output.get("draft_export_status", "")
        ),
        "formal_export_status": _get_export_status_display_text(
            structured_output.get("formal_export_status", "")
        ),
        "blocked_items": structured_output.get("blocked_items") or [],
        "notification_receipt": (
            latest_notification.receipt if latest_notification else {}
        ),
    }


def normalize_conversation_node_results(node_results: list[dict] | None) -> list[dict]:
    """
    Normalise node statuses for display on the history detail page.

    Only nodes labelled "飞书通知" are touched: legacy "已完成" (as well as
    ``success`` / ``sent``) is shown as "已发送", ``failed`` / ``error`` as
    "失败", and ``pending`` / ``processing`` as "待处理". Other statuses and
    other nodes pass through unchanged.

    Returns:
        A new list of shallow-copied node dicts; the input is not mutated.
    """
    normalized = []
    for node in node_results or []:
        item = dict(node)
        if item.get("label") == "飞书通知":
            status = item.get("status", "")
            if status in {"已完成", "success", "sent"}:
                item["status"] = "已发送"
            elif status in {"failed", "error"}:
                item["status"] = "失败"
            elif status in {"pending", "processing"}:
                item["status"] = "待处理"
        normalized.append(item)
    return normalized


def _get_risk_status_display_text(status: str) -> str:
    """Map a raw risk level to display text; unknown values pass through, empty becomes "-"."""
    return RISK_STATUS_DISPLAY.get(status, status or "-")


def _get_export_status_display_text(status: str) -> str:
    """Map a raw export status to display text; unknown values pass through, empty becomes "-"."""
    return EXPORT_STATUS_DISPLAY.get(status, status or "-")


def _get_conversation_status_display_text(status: str) -> str:
    """Map a raw conversation status to display text; unknown values pass through, empty becomes "-"."""
    return CONVERSATION_STATUS_DISPLAY.get(status, status or "-")