124 lines
3.8 KiB
Python
124 lines
3.8 KiB
Python
from agent_core.results import AgentResult
|
|
|
|
from .models import AgentAuditLog, NotificationRecord
|
|
|
|
|
|
def create_audit_log(
|
|
scenario_id: str,
|
|
scenario_name: str,
|
|
user_input: str,
|
|
agent_result: AgentResult,
|
|
batch_id: str = "",
|
|
conversation_id: str = "",
|
|
product_name: str = "",
|
|
) -> AgentAuditLog:
|
|
"""
|
|
将一次 Agent 执行结果落库为审计日志。
|
|
|
|
设计原则:
|
|
- 成功与失败都必须记录,方便复盘整条执行链路
|
|
- 敏感信息在写库前先脱敏,避免误存 API Key
|
|
- 对前端和 Django Model 统一输出稳定字段
|
|
"""
|
|
return AgentAuditLog.objects.create(
|
|
scenario_id=scenario_id,
|
|
scenario_name=scenario_name,
|
|
batch_id=batch_id,
|
|
conversation_id=conversation_id,
|
|
product_name=product_name,
|
|
user_input=user_input,
|
|
retrieved_chunks=agent_result.references,
|
|
tool_calls=agent_result.tool_calls,
|
|
structured_output=agent_result.structured_output,
|
|
final_answer=agent_result.answer,
|
|
raw_output=agent_result.raw_output,
|
|
model_name=agent_result.model_name,
|
|
latency_ms=max(agent_result.latency_ms, 0),
|
|
status=agent_result.status,
|
|
error_message=mask_sensitive_text(agent_result.error),
|
|
)
|
|
|
|
|
|
def mask_sensitive_text(value: str) -> str:
|
|
"""
|
|
对错误文本中的敏感配置进行脱敏。
|
|
|
|
当前至少处理:
|
|
- `LLM_API_KEY=...`
|
|
- `EMBEDDING_API_KEY=...`
|
|
"""
|
|
masked = value
|
|
for marker in ("LLM_API_KEY=", "EMBEDDING_API_KEY="):
|
|
masked = _mask_token_after_marker(masked, marker)
|
|
return masked
|
|
|
|
|
|
def _mask_token_after_marker(value: str, marker: str) -> str:
|
|
"""将 marker 后紧跟的 token 替换为脱敏占位符。"""
|
|
if marker not in value:
|
|
return value
|
|
prefix, _, suffix = value.partition(marker)
|
|
secret, separator, rest = suffix.partition(" ")
|
|
masked_secret = "sk-***" if secret.startswith("sk-") else "***"
|
|
return f"{prefix}{marker}{masked_secret}{separator}{rest}"
|
|
|
|
|
|
def create_notification_record(
|
|
*,
|
|
batch_id: str,
|
|
conversation_id: str,
|
|
product_name: str,
|
|
trigger_source: str,
|
|
notify_reason: str,
|
|
owner_role: str,
|
|
feishu_user_id: str,
|
|
message_status: str,
|
|
web_detail_url: str,
|
|
receipt: dict,
|
|
) -> NotificationRecord:
|
|
"""
|
|
保存通知留痕。
|
|
|
|
V1 先把通知载荷和结果状态稳定落库,
|
|
真实飞书发送可在后续阶段接入。
|
|
"""
|
|
return NotificationRecord.objects.create(
|
|
batch_id=batch_id,
|
|
conversation_id=conversation_id,
|
|
product_name=product_name,
|
|
trigger_source=trigger_source,
|
|
notify_reason=notify_reason,
|
|
owner_role=owner_role,
|
|
feishu_user_id=feishu_user_id,
|
|
message_status=message_status,
|
|
web_detail_url=web_detail_url,
|
|
receipt=receipt,
|
|
)
|
|
|
|
|
|
def build_history_rows(logs) -> list[dict]:
|
|
"""
|
|
为处理历史列表补齐风险状态和通知状态。
|
|
|
|
View 只负责收集筛选条件,列表展示所需的聚合字段统一在服务层完成。
|
|
"""
|
|
notification_map = {
|
|
(item.batch_id, item.conversation_id): item
|
|
for item in NotificationRecord.objects.order_by("-created_at")
|
|
}
|
|
rows = []
|
|
for log in logs:
|
|
notification = notification_map.get((log.batch_id, log.conversation_id))
|
|
structured_output = log.structured_output or {}
|
|
rows.append(
|
|
{
|
|
"log": log,
|
|
"risk_status": structured_output.get("highest_risk_level")
|
|
or structured_output.get("risk_level")
|
|
or "-",
|
|
"notify_status": notification.message_status if notification else "-",
|
|
"notify_reason": notification.notify_reason if notification else "-",
|
|
}
|
|
)
|
|
return rows
|