Files
DEMO-AGENT/apps/audit/services.py

323 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from agent_core.results import AgentResult
from apps.chat.models import Conversation
from apps.documents.models import SubmissionBatch
from .models import AgentAuditLog, NotificationRecord
SUPPORTED_NOTIFY_REASONS = {"task_completed", "task_failed"}
RISK_STATUS_DISPLAY = {
"high": "已阻断",
"medium": "待复核",
"low": "已完成",
"failed": "失败",
"processing": "处理中",
"pending": "处理中",
}
EXPORT_STATUS_DISPLAY = {
"completed": "已完成",
"draft_only": "待复核",
"review_required": "待复核",
"manual_review": "待复核",
"blocked": "已阻断",
"failed": "失败",
"processing": "处理中",
"pending": "处理中",
}
CONVERSATION_STATUS_DISPLAY = {
"success": "已完成",
"completed": "已完成",
"review_required": "待复核",
"blocked": "已阻断",
"failed": "失败",
"processing": "处理中",
"pending": "处理中",
}
def create_audit_log(
scenario_id: str,
scenario_name: str,
user_input: str,
agent_result: AgentResult,
batch_id: str = "",
conversation_id: str = "",
product_name: str = "",
) -> AgentAuditLog:
"""
将一次 Agent 执行结果落库为审计日志。
设计原则:
- 成功与失败都必须记录,方便复盘整条执行链路
- 敏感信息在写库前先脱敏,避免误存 API Key
- 对前端和 Django Model 统一输出稳定字段
"""
return AgentAuditLog.objects.create(
scenario_id=scenario_id,
scenario_name=scenario_name,
batch_id=batch_id,
conversation_id=conversation_id,
product_name=product_name,
user_input=user_input,
retrieved_chunks=agent_result.references,
tool_calls=agent_result.tool_calls,
structured_output=agent_result.structured_output,
final_answer=agent_result.answer,
raw_output=agent_result.raw_output,
model_name=agent_result.model_name,
latency_ms=max(agent_result.latency_ms, 0),
status=agent_result.status,
error_message=mask_sensitive_text(agent_result.error),
)
def mask_sensitive_text(value: str) -> str:
"""
对错误文本中的敏感配置进行脱敏。
当前至少处理:
- `LLM_API_KEY=...`
- `EMBEDDING_API_KEY=...`
"""
masked = value
for marker in ("LLM_API_KEY=", "EMBEDDING_API_KEY="):
masked = _mask_token_after_marker(masked, marker)
return masked
def _mask_token_after_marker(value: str, marker: str) -> str:
"""将 marker 后紧跟的 token 替换为脱敏占位符。"""
if marker not in value:
return value
prefix, _, suffix = value.partition(marker)
secret, separator, rest = suffix.partition(" ")
masked_secret = "sk-***" if secret.startswith("sk-") else "***"
return f"{prefix}{marker}{masked_secret}{separator}{rest}"
def create_notification_record(
*,
batch_id: str,
conversation_id: str,
product_name: str,
trigger_source: str,
notify_reason: str,
owner_role: str,
feishu_user_id: str,
message_status: str,
web_detail_url: str,
receipt: dict,
) -> NotificationRecord:
"""
保存通知留痕。
V1 先把通知载荷和结果状态稳定落库,
真实飞书发送可在后续阶段接入。
"""
if notify_reason not in SUPPORTED_NOTIFY_REASONS:
raise ValueError(f"notify_reason 不受支持:{notify_reason}")
return NotificationRecord.objects.create(
batch_id=batch_id,
conversation_id=conversation_id,
product_name=product_name,
trigger_source=trigger_source,
notify_reason=notify_reason,
owner_role=owner_role,
feishu_user_id=feishu_user_id,
message_status=message_status,
web_detail_url=web_detail_url,
receipt=receipt,
)
def build_history_list_context(
*,
scenario_id: str = "",
keyword: str = "",
notify_status: str = "",
risk_status: str = "",
) -> dict:
"""
组装处理历史列表页所需的筛选结果与展示上下文。
View 只负责读取 query params筛选逻辑和列表聚合统一在服务层完成。
"""
logs = AgentAuditLog.objects.all()
if scenario_id:
logs = logs.filter(scenario_id=scenario_id)
if keyword:
logs = logs.filter(product_name__icontains=keyword) | logs.filter(batch_id__icontains=keyword)
if notify_status:
matched_pairs = list(
NotificationRecord.objects.filter(message_status=notify_status).values_list(
"batch_id",
"conversation_id",
)
)
logs = [
log
for log in logs
if (log.batch_id, log.conversation_id) in matched_pairs
]
if risk_status:
logs = [
log
for log in logs
if (log.structured_output or {}).get("highest_risk_level") == risk_status
or (log.structured_output or {}).get("risk_level") == risk_status
]
history_rows = build_history_rows(logs)
return {
"history_rows": history_rows,
"history_metrics": build_history_metrics(history_rows),
"selected_scenario_id": scenario_id,
"keyword": keyword,
"notify_status": notify_status,
"risk_status": risk_status,
}
def build_history_rows(logs) -> list[dict]:
"""
为处理历史列表补齐风险状态和通知状态。
View 只负责收集筛选条件,列表展示所需的聚合字段统一在服务层完成。
"""
notification_map = {
(item.batch_id, item.conversation_id): item
for item in NotificationRecord.objects.order_by("-created_at")
}
batch_map = {
item.batch_id: item
for item in SubmissionBatch.objects.filter(
batch_id__in=[log.batch_id for log in logs if log.batch_id]
)
}
conversation_map = {
item.conversation_id: item
for item in Conversation.objects.filter(
conversation_id__in=[log.conversation_id for log in logs if log.conversation_id]
)
}
rows = []
for log in logs:
notification = notification_map.get((log.batch_id, log.conversation_id))
batch = batch_map.get(log.batch_id)
conversation = conversation_map.get(log.conversation_id)
structured_output = log.structured_output or {}
rows.append(
{
"log": log,
"batch": batch,
"conversation": conversation,
"batch_scale": f"{batch.file_count} 份 / {batch.page_count}" if batch else "-",
"batch_status": batch.get_import_status_display_text() if batch else "-",
"conversation_status": _get_conversation_status_display_text(
conversation.task_status if conversation else "-"
),
"risk_status": _get_risk_status_display_text(
structured_output.get("highest_risk_level")
or structured_output.get("risk_level")
or "-"
),
"notify_status": notification.get_message_status_display_text() if notification else "-",
"notify_reason": notification.notify_reason if notification else "-",
}
)
return rows
def build_history_metrics(history_rows: list[dict]) -> list[dict]:
"""
为处理历史页生成顶部指标卡。
口径保持前台可讲解:
- 处理任务数:当前筛选结果中的执行记录数
- 成功执行:状态为 success 的记录数
- 通知已发送:通知状态为 sent 的记录数
- 高风险阻断:风险等级为 high 的记录数
"""
total_count = len(history_rows)
success_count = sum(1 for row in history_rows if row["log"].status == "success")
notify_sent_count = sum(1 for row in history_rows if row.get("notify_status") == "已发送")
blocked_count = sum(1 for row in history_rows if row.get("risk_status") == "已阻断")
return [
{"label": "处理任务数", "value": total_count, "note": "按当前筛选条件回看执行留痕。"},
{"label": "成功执行", "value": success_count, "note": "执行完成并写入审计快照。"},
{"label": "通知已发送", "value": notify_sent_count, "note": "已生成已发送状态的通知留痕。"},
{"label": "高风险阻断", "value": blocked_count, "note": "当前风险状态为已阻断的处理记录。"},
]
def build_detail_summary(log: AgentAuditLog, conversation, notifications) -> dict:
"""
组装处理历史详情页的导出摘要与通知回执信息。
详情页模板只负责展示,字段拼装与优先级判断统一放在服务层。
"""
structured_output = log.structured_output or {}
output_file = structured_output.get("output_file") or {}
export_node = None
if conversation and conversation.node_results:
export_node = next(
(node for node in conversation.node_results if node.get("label") == "Word 回填导出"),
None,
)
latest_notification = notifications.first() if hasattr(notifications, "first") else None
return {
"export_status": _get_export_status_display_text(
structured_output.get("export_status") or (export_node or {}).get("status", "-")
),
"download_url": structured_output.get("download_url", ""),
"output_file_name": output_file.get("file_name", ""),
"output_file_relative_path": output_file.get("relative_path", ""),
"export_mode": output_file.get("export_mode", ""),
"template_name": structured_output.get("template_name", ""),
"template_version": structured_output.get("template_version", ""),
"draft_export_status": _get_export_status_display_text(
structured_output.get("draft_export_status", "")
),
"formal_export_status": _get_export_status_display_text(
structured_output.get("formal_export_status", "")
),
"blocked_items": structured_output.get("blocked_items") or [],
"notification_receipt": latest_notification.receipt if latest_notification else {},
}
def normalize_conversation_node_results(node_results: list[dict] | None) -> list[dict]:
"""
统一处理历史详情页的节点状态展示口径。
兼容历史上遗留的“飞书通知 / 已完成”节点状态,
页面展示时统一映射为“已发送”。
"""
normalized = []
for node in node_results or []:
item = dict(node)
if item.get("label") == "飞书通知":
status = item.get("status", "")
if status in {"已完成", "success", "sent"}:
item["status"] = "已发送"
elif status in {"failed", "error"}:
item["status"] = "失败"
elif status in {"pending", "processing"}:
item["status"] = "待处理"
normalized.append(item)
return normalized
def _get_risk_status_display_text(status: str) -> str:
return RISK_STATUS_DISPLAY.get(status, status or "-")
def _get_export_status_display_text(status: str) -> str:
return EXPORT_STATUS_DISPLAY.get(status, status or "-")
def _get_conversation_status_display_text(status: str) -> str:
return CONVERSATION_STATUS_DISPLAY.get(status, status or "-")