# Listing metadata: 323 lines, 11 KiB, Python
from agent_core.results import AgentResult
|
||
from apps.chat.models import Conversation
|
||
from apps.documents.models import SubmissionBatch
|
||
|
||
from .models import AgentAuditLog, NotificationRecord
|
||
|
||
|
||
# Notification reasons that create_notification_record accepts; any other
# value is rejected with a ValueError.
SUPPORTED_NOTIFY_REASONS = {
    "task_completed",
    "task_failed",
}
|
||
|
||
# Raw risk level / task state -> label shown in the history list.
RISK_STATUS_DISPLAY = {
    # Risk levels.
    "high": "已阻断",
    "medium": "待复核",
    "low": "已完成",
    # Task states that may appear in place of a risk level.
    "failed": "失败",
    "processing": "处理中",
    "pending": "处理中",
}
|
||
|
||
# Raw export status -> label shown on the history detail page.
EXPORT_STATUS_DISPLAY = {
    "completed": "已完成",
    # Every "needs a human look" variant shares one label.
    "draft_only": "待复核",
    "review_required": "待复核",
    "manual_review": "待复核",
    "blocked": "已阻断",
    "failed": "失败",
    # In-flight states share one label.
    "processing": "处理中",
    "pending": "处理中",
}
|
||
|
||
# Raw conversation task status -> label shown in the history list.
CONVERSATION_STATUS_DISPLAY = {
    # Both spellings of a finished task map to the same label.
    "success": "已完成",
    "completed": "已完成",
    "review_required": "待复核",
    "blocked": "已阻断",
    "failed": "失败",
    # In-flight states share one label.
    "processing": "处理中",
    "pending": "处理中",
}
|
||
|
||
|
||
def create_audit_log(
    scenario_id: str,
    scenario_name: str,
    user_input: str,
    agent_result: AgentResult,
    batch_id: str = "",
    conversation_id: str = "",
    product_name: str = "",
) -> AgentAuditLog:
    """
    Persist the outcome of one agent run as an audit log row.

    Design principles:
    - Successes and failures are both recorded so the whole execution
      chain can be reviewed afterwards.
    - The error text is masked before it is written, so an API key is
      not stored by accident.
    - The stored fields form a stable shape for the front end and the
      Django model.

    Returns the newly created ``AgentAuditLog`` instance.
    """
    record_fields = {
        # Identification of the run.
        "scenario_id": scenario_id,
        "scenario_name": scenario_name,
        "batch_id": batch_id,
        "conversation_id": conversation_id,
        "product_name": product_name,
        "user_input": user_input,
        # Snapshot of what the agent produced.
        "retrieved_chunks": agent_result.references,
        "tool_calls": agent_result.tool_calls,
        "structured_output": agent_result.structured_output,
        "final_answer": agent_result.answer,
        "raw_output": agent_result.raw_output,
        "model_name": agent_result.model_name,
        # A negative latency is clamped to zero.
        "latency_ms": max(agent_result.latency_ms, 0),
        "status": agent_result.status,
        "error_message": mask_sensitive_text(agent_result.error),
    }
    return AgentAuditLog.objects.create(**record_fields)
|
||
|
||
|
||
def mask_sensitive_text(value: str) -> str:
    """
    Mask sensitive configuration values that appear in an error text.

    Currently handles at least:
    - `LLM_API_KEY=...`
    - `EMBEDDING_API_KEY=...`

    A missing or empty text (``None`` or ``""``) yields ``""`` instead of
    raising, so a run that has no error can still be logged.
    """
    if not value:
        return ""
    masked = value
    for marker in ("LLM_API_KEY=", "EMBEDDING_API_KEY="):
        masked = _mask_token_after_marker(masked, marker)
    return masked
|
||
|
||
|
||
def _mask_token_after_marker(value: str, marker: str) -> str:
    """
    Replace every token that directly follows ``marker`` with a placeholder.

    The token runs from the end of ``marker`` to the next whitespace
    character (or the end of the text). A token starting with ``sk-``
    becomes ``sk-***``; any other token becomes ``***``. Text without the
    marker is returned unchanged.
    """
    if marker not in value:
        return value
    # Splitting on the marker visits every occurrence, so a second key in
    # the same message is masked as well.
    head, *tails = value.split(marker)
    pieces = [head]
    for tail in tails:
        # Stop at any whitespace, not only a space, so text following a
        # newline or tab is preserved instead of being swallowed.
        end = next(
            (index for index, char in enumerate(tail) if char.isspace()),
            len(tail),
        )
        placeholder = "sk-***" if tail[:end].startswith("sk-") else "***"
        pieces.append(f"{marker}{placeholder}{tail[end:]}")
    return "".join(pieces)
|
||
|
||
|
||
def create_notification_record(
    *,
    batch_id: str,
    conversation_id: str,
    product_name: str,
    trigger_source: str,
    notify_reason: str,
    owner_role: str,
    feishu_user_id: str,
    message_status: str,
    web_detail_url: str,
    receipt: dict,
) -> NotificationRecord:
    """
    Persist a notification trail record.

    V1 only stores the notification payload and its result status in a
    stable shape; real Feishu delivery can be wired in at a later stage.

    Raises:
        ValueError: if ``notify_reason`` is not one of
            ``SUPPORTED_NOTIFY_REASONS``.
    """
    reason_is_supported = notify_reason in SUPPORTED_NOTIFY_REASONS
    if not reason_is_supported:
        raise ValueError(f"notify_reason 不受支持:{notify_reason}")

    record_fields = dict(
        batch_id=batch_id,
        conversation_id=conversation_id,
        product_name=product_name,
        trigger_source=trigger_source,
        notify_reason=notify_reason,
        owner_role=owner_role,
        feishu_user_id=feishu_user_id,
        message_status=message_status,
        web_detail_url=web_detail_url,
        receipt=receipt,
    )
    return NotificationRecord.objects.create(**record_fields)
|
||
|
||
|
||
def build_history_list_context(
    *,
    scenario_id: str = "",
    keyword: str = "",
    notify_status: str = "",
    risk_status: str = "",
) -> dict:
    """
    Assemble the filtered rows and display context for the history list page.

    The view only reads query params; filtering and list aggregation are
    done here in the service layer. Every filter is optional and an empty
    string disables it.

    Args:
        scenario_id: exact match on the log's ``scenario_id``.
        keyword: case-insensitive substring matched against
            ``product_name`` or ``batch_id``.
        notify_status: keep only logs whose ``(batch_id, conversation_id)``
            pair has a notification record with this ``message_status``.
        risk_status: keep only logs whose structured output has this value
            under ``highest_risk_level`` or ``risk_level``.

    Returns:
        A dict with the history rows, the metric cards and the filter
        values echoed back for the template.
    """
    logs = AgentAuditLog.objects.all()
    if scenario_id:
        logs = logs.filter(scenario_id=scenario_id)
    if keyword:
        logs = logs.filter(product_name__icontains=keyword) | logs.filter(batch_id__icontains=keyword)
    if notify_status:
        # A set keeps the per-log membership test O(1); a list would be
        # scanned once for every log.
        matched_pairs = set(
            NotificationRecord.objects.filter(message_status=notify_status).values_list(
                "batch_id",
                "conversation_id",
            )
        )
        logs = [
            log
            for log in logs
            if (log.batch_id, log.conversation_id) in matched_pairs
        ]
    if risk_status:
        # The risk level lives inside the structured output, so this filter
        # runs in Python rather than in the query.
        logs = [
            log
            for log in logs
            if (log.structured_output or {}).get("highest_risk_level") == risk_status
            or (log.structured_output or {}).get("risk_level") == risk_status
        ]
    history_rows = build_history_rows(logs)
    return {
        "history_rows": history_rows,
        "history_metrics": build_history_metrics(history_rows),
        "selected_scenario_id": scenario_id,
        "keyword": keyword,
        "notify_status": notify_status,
        "risk_status": risk_status,
    }
|
||
|
||
|
||
def build_history_rows(logs) -> list[dict]:
    """
    Enrich audit logs with the risk and notification status for the list page.

    The view only collects filter conditions; the aggregated fields the
    list needs are assembled here in the service layer.

    Args:
        logs: an iterable of ``AgentAuditLog`` objects (queryset, list or
            any other iterable).

    Returns:
        One dict per log holding the log itself, its related batch and
        conversation (or ``None``), and display-ready status texts. Missing
        related objects are rendered as "-".
    """
    # Materialize once: the logs are walked several times below, which
    # would silently exhaust a one-shot iterable.
    logs = list(logs)
    if not logs:
        return []

    # Records arrive newest first, so keeping the first one seen per key
    # makes the map hold the latest notification for each pair.
    notification_map = {}
    for item in NotificationRecord.objects.order_by("-created_at"):
        notification_map.setdefault((item.batch_id, item.conversation_id), item)

    batch_map = {
        item.batch_id: item
        for item in SubmissionBatch.objects.filter(
            batch_id__in=[log.batch_id for log in logs if log.batch_id]
        )
    }
    conversation_map = {
        item.conversation_id: item
        for item in Conversation.objects.filter(
            conversation_id__in=[log.conversation_id for log in logs if log.conversation_id]
        )
    }

    rows = []
    for log in logs:
        notification = notification_map.get((log.batch_id, log.conversation_id))
        batch = batch_map.get(log.batch_id)
        conversation = conversation_map.get(log.conversation_id)
        structured_output = log.structured_output or {}
        rows.append(
            {
                "log": log,
                "batch": batch,
                "conversation": conversation,
                "batch_scale": f"{batch.file_count} 份 / {batch.page_count} 页" if batch else "-",
                "batch_status": batch.get_import_status_display_text() if batch else "-",
                "conversation_status": _get_conversation_status_display_text(
                    conversation.task_status if conversation else "-"
                ),
                # "highest_risk_level" takes precedence over "risk_level".
                "risk_status": _get_risk_status_display_text(
                    structured_output.get("highest_risk_level")
                    or structured_output.get("risk_level")
                    or "-"
                ),
                "notify_status": notification.get_message_status_display_text() if notification else "-",
                "notify_reason": notification.notify_reason if notification else "-",
            }
        )
    return rows
|
||
|
||
|
||
def build_history_metrics(history_rows: list[dict]) -> list[dict]:
    """
    Build the metric cards shown at the top of the history page.

    The definitions are kept simple enough to explain to users:
    - Task count: number of execution records in the current result set.
    - Successful runs: records whose log status is ``success``.
    - Notifications sent: rows whose notify status text is "已发送".
    - High-risk blocked: rows whose risk status text is "已阻断".

    Returns a list of four ``{"label", "value", "note"}`` dicts in that order.
    """
    total_count = len(history_rows)

    # One pass over the rows, tallying all three conditional counters.
    success_count = 0
    notify_sent_count = 0
    blocked_count = 0
    for entry in history_rows:
        if entry["log"].status == "success":
            success_count += 1
        if entry.get("notify_status") == "已发送":
            notify_sent_count += 1
        if entry.get("risk_status") == "已阻断":
            blocked_count += 1

    cards = (
        ("处理任务数", total_count, "按当前筛选条件回看执行留痕。"),
        ("成功执行", success_count, "执行完成并写入审计快照。"),
        ("通知已发送", notify_sent_count, "已生成已发送状态的通知留痕。"),
        ("高风险阻断", blocked_count, "当前风险状态为已阻断的处理记录。"),
    )
    return [
        {"label": label, "value": value, "note": note}
        for label, value, note in cards
    ]
|
||
|
||
|
||
def build_detail_summary(log: AgentAuditLog, conversation, notifications) -> dict:
    """
    Assemble the export summary and notification receipt for the detail page.

    The template only renders; field assembly and precedence decisions
    are made here in the service layer.

    Args:
        log: the audit log whose ``structured_output`` is summarized.
        conversation: related conversation or a falsy value; its
            ``node_results`` provide a fallback export status.
        notifications: a collection exposing ``.first()``; anything
            without that method yields an empty receipt.
    """
    structured_output = log.structured_output or {}
    output_file = structured_output.get("output_file") or {}

    # Find the first "Word export" node, if the conversation has any nodes.
    export_node = None
    if conversation and conversation.node_results:
        for node in conversation.node_results:
            if node.get("label") == "Word 回填导出":
                export_node = node
                break

    if hasattr(notifications, "first"):
        latest_notification = notifications.first()
    else:
        latest_notification = None

    # The status in the structured output wins; the node status is the fallback.
    raw_export_status = structured_output.get("export_status") or (export_node or {}).get("status", "-")
    receipt = latest_notification.receipt if latest_notification else {}

    summary = {
        "export_status": _get_export_status_display_text(raw_export_status),
        "download_url": structured_output.get("download_url", ""),
        "output_file_name": output_file.get("file_name", ""),
        "output_file_relative_path": output_file.get("relative_path", ""),
        "export_mode": output_file.get("export_mode", ""),
        "template_name": structured_output.get("template_name", ""),
        "template_version": structured_output.get("template_version", ""),
        "draft_export_status": _get_export_status_display_text(
            structured_output.get("draft_export_status", "")
        ),
        "formal_export_status": _get_export_status_display_text(
            structured_output.get("formal_export_status", "")
        ),
        "blocked_items": structured_output.get("blocked_items") or [],
        "notification_receipt": receipt,
    }
    return summary
|
||
|
||
|
||
def normalize_conversation_node_results(node_results: list[dict] | None) -> list[dict]:
|
||
"""
|
||
统一处理历史详情页的节点状态展示口径。
|
||
|
||
兼容历史上遗留的“飞书通知 / 已完成”节点状态,
|
||
页面展示时统一映射为“已发送”。
|
||
"""
|
||
normalized = []
|
||
for node in node_results or []:
|
||
item = dict(node)
|
||
if item.get("label") == "飞书通知":
|
||
status = item.get("status", "")
|
||
if status in {"已完成", "success", "sent"}:
|
||
item["status"] = "已发送"
|
||
elif status in {"failed", "error"}:
|
||
item["status"] = "失败"
|
||
elif status in {"pending", "processing"}:
|
||
item["status"] = "待处理"
|
||
normalized.append(item)
|
||
return normalized
|
||
|
||
|
||
def _get_risk_status_display_text(status: str) -> str:
    """Return the display label for a risk status; unknown values pass through and empty ones become "-"."""
    fallback = status or "-"
    return RISK_STATUS_DISPLAY.get(status, fallback)
|
||
|
||
|
||
def _get_export_status_display_text(status: str) -> str:
    """Return the display label for an export status; unknown values pass through and empty ones become "-"."""
    fallback = status or "-"
    return EXPORT_STATUS_DISPLAY.get(status, fallback)
|
||
|
||
|
||
def _get_conversation_status_display_text(status: str) -> str:
    """Return the display label for a conversation status; unknown values pass through and empty ones become "-"."""
    fallback = status or "-"
    return CONVERSATION_STATUS_DISPLAY.get(status, fallback)
|