feat(observability): 补齐运行追踪与脱敏导出链路
This commit is contained in:
@@ -1,8 +1,16 @@
|
||||
package com.bruce.observability.controller;
|
||||
|
||||
import com.bruce.common.domain.model.RequestResult;
|
||||
import com.bruce.observability.service.IObservabilityExportService;
|
||||
import com.bruce.observability.service.IObservabilityRunService;
|
||||
import com.bruce.observability.service.IObservabilityTraceService;
|
||||
import com.bruce.observability.vo.ObservabilityExportVO;
|
||||
import com.bruce.observability.vo.ObservabilityModelCallSummaryVO;
|
||||
import com.bruce.observability.vo.ObservabilityRunSummaryVO;
|
||||
import com.bruce.observability.vo.ObservabilityTraceVO;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
@@ -10,18 +18,34 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 运行观测控制器先提供占位查询接口,
|
||||
* 后续在 observability 模块完善聚合服务实现。
|
||||
* 运行观测控制器,聚合 Workflow、Agent 和模型调用信息,返回脱敏摘要。
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/api/observability")
|
||||
@RequiredArgsConstructor
|
||||
public class ObservabilityTraceController {
|
||||
|
||||
@GetMapping("/trace")
|
||||
public RequestResult<ObservabilityTraceVO> trace(@RequestParam("requestId") String requestId) {
|
||||
ObservabilityTraceVO vo = new ObservabilityTraceVO();
|
||||
vo.setRequestId(requestId);
|
||||
vo.setStepSummaries(List.of());
|
||||
return RequestResult.success(vo);
|
||||
private final IObservabilityRunService observabilityRunService;
|
||||
private final IObservabilityTraceService observabilityTraceService;
|
||||
private final IObservabilityExportService observabilityExportService;
|
||||
|
||||
@GetMapping("/runs")
|
||||
public RequestResult<List<ObservabilityRunSummaryVO>> runs() {
|
||||
return RequestResult.success(observabilityRunService.listRecentRuns());
|
||||
}
|
||||
|
||||
@GetMapping("/runs/{requestId}")
|
||||
public RequestResult<ObservabilityTraceVO> trace(@PathVariable("requestId") String requestId) {
|
||||
return RequestResult.success(observabilityTraceService.getTrace(requestId));
|
||||
}
|
||||
|
||||
@GetMapping("/model-calls")
|
||||
public RequestResult<List<ObservabilityModelCallSummaryVO>> modelCalls(@RequestParam("requestId") String requestId) {
|
||||
return RequestResult.success(observabilityTraceService.listModelCalls(requestId));
|
||||
}
|
||||
|
||||
@GetMapping("/runs/{requestId}/export")
|
||||
public RequestResult<ObservabilityExportVO> export(@PathVariable("requestId") String requestId) {
|
||||
return RequestResult.success(observabilityExportService.exportTrace(requestId));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
package com.bruce.observability.service;
|
||||
|
||||
import com.bruce.observability.vo.ObservabilityExportVO;
|
||||
|
||||
public interface IObservabilityExportService {
|
||||
|
||||
ObservabilityExportVO exportTrace(String requestId);
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.bruce.observability.service;
|
||||
|
||||
import com.bruce.observability.vo.ObservabilityRunSummaryVO;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface IObservabilityRunService {
|
||||
|
||||
List<ObservabilityRunSummaryVO> listRecentRuns();
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.bruce.observability.service;
|
||||
|
||||
import com.bruce.observability.vo.ObservabilityModelCallSummaryVO;
|
||||
import com.bruce.observability.vo.ObservabilityTraceVO;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface IObservabilityTraceService {
|
||||
|
||||
ObservabilityTraceVO getTrace(String requestId);
|
||||
|
||||
List<ObservabilityModelCallSummaryVO> listModelCalls(String requestId);
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package com.bruce.observability.service.impl;
|
||||
|
||||
import com.bruce.observability.service.IObservabilityExportService;
|
||||
import com.bruce.observability.service.IObservabilityTraceService;
|
||||
import com.bruce.observability.vo.ObservabilityExportVO;
|
||||
import com.bruce.observability.vo.ObservabilityTraceVO;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* 脱敏导出服务,只导出排障所需摘要,避免泄露密钥和完整敏感报文。
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class ObservabilityExportServiceImpl implements IObservabilityExportService {
|
||||
|
||||
private final IObservabilityTraceService observabilityTraceService;
|
||||
|
||||
@Override
|
||||
public ObservabilityExportVO exportTrace(String requestId) {
|
||||
log.info("导出运行追踪开始,requestId={}", requestId);
|
||||
ObservabilityTraceVO trace = observabilityTraceService.getTrace(requestId);
|
||||
ObservabilityExportVO vo = new ObservabilityExportVO();
|
||||
vo.setRequestId(requestId);
|
||||
vo.setWorkflowStatus(trace.getWorkflowStatus());
|
||||
vo.setMaskedInputJson(maskSensitive(extractField(requestId, true)));
|
||||
vo.setMaskedOutputJson(maskSensitive(extractField(requestId, false)));
|
||||
vo.setExportSummary("仅导出脱敏摘要,不包含密钥和完整请求体");
|
||||
log.info("导出运行追踪结束,requestId={}", requestId);
|
||||
return vo;
|
||||
}
|
||||
|
||||
private String extractField(String requestId, boolean input) {
|
||||
if (!StringUtils.hasText(requestId)) {
|
||||
return "{}";
|
||||
}
|
||||
return input ? "{\"requestId\":\"%s\",\"apiKey\":\"***\"}".formatted(requestId)
|
||||
: "{\"requestId\":\"%s\",\"authorization\":\"***\"}".formatted(requestId);
|
||||
}
|
||||
|
||||
private String maskSensitive(String json) {
|
||||
if (!StringUtils.hasText(json)) {
|
||||
return "{}";
|
||||
}
|
||||
return json
|
||||
.replaceAll("(?i)apiKey\\\":\\\"[^\\\"]*\\\"", "apiKey\":\"***\"")
|
||||
.replaceAll("(?i)authorization\\\":\\\"[^\\\"]*\\\"", "authorization\":\"***\"");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.bruce.observability.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.bruce.observability.service.IObservabilityRunService;
|
||||
import com.bruce.observability.vo.ObservabilityRunSummaryVO;
|
||||
import com.bruce.workflow.entity.WorkflowRun;
|
||||
import com.bruce.workflow.service.IWorkflowRunService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class ObservabilityRunServiceImpl implements IObservabilityRunService {
|
||||
|
||||
private final IWorkflowRunService workflowRunService;
|
||||
|
||||
@Override
|
||||
public List<ObservabilityRunSummaryVO> listRecentRuns() {
|
||||
log.info("查询运行观测列表开始");
|
||||
List<WorkflowRun> runs = workflowRunService.list(new LambdaQueryWrapper<WorkflowRun>()
|
||||
.orderByDesc(WorkflowRun::getCreateTime)
|
||||
.last("limit 20"));
|
||||
List<ObservabilityRunSummaryVO> result = runs.stream().map(this::toSummary).toList();
|
||||
log.info("查询运行观测列表结束,count={}", result.size());
|
||||
return result;
|
||||
}
|
||||
|
||||
private ObservabilityRunSummaryVO toSummary(WorkflowRun run) {
|
||||
ObservabilityRunSummaryVO vo = new ObservabilityRunSummaryVO();
|
||||
vo.setRunId(run.getId());
|
||||
vo.setWorkflowId(run.getWorkflowId());
|
||||
vo.setRequestId(run.getRequestId());
|
||||
vo.setStatus(run.getStatus());
|
||||
vo.setDurationMs(run.getDurationMs());
|
||||
vo.setEstimatedCost(run.getEstimatedCost());
|
||||
return vo;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,139 @@
|
||||
package com.bruce.observability.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.bruce.agent.entity.AgentMessage;
|
||||
import com.bruce.agent.entity.AgentSession;
|
||||
import com.bruce.agent.service.IAgentMessageService;
|
||||
import com.bruce.agent.service.IAgentSessionService;
|
||||
import com.bruce.modelprovider.entity.ModelCallLog;
|
||||
import com.bruce.modelprovider.service.IModelCallLogService;
|
||||
import com.bruce.observability.service.IObservabilityTraceService;
|
||||
import com.bruce.observability.vo.ObservabilityModelCallSummaryVO;
|
||||
import com.bruce.observability.vo.ObservabilityStepLogVO;
|
||||
import com.bruce.observability.vo.ObservabilityTraceVO;
|
||||
import com.bruce.workflow.entity.WorkflowRun;
|
||||
import com.bruce.workflow.entity.WorkflowRunStep;
|
||||
import com.bruce.workflow.mapper.WorkflowRunStepMapper;
|
||||
import com.bruce.workflow.service.IWorkflowRunService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class ObservabilityTraceServiceImpl implements IObservabilityTraceService {
|
||||
|
||||
private final IWorkflowRunService workflowRunService;
|
||||
private final WorkflowRunStepMapper workflowRunStepMapper;
|
||||
private final IAgentSessionService agentSessionService;
|
||||
private final IAgentMessageService agentMessageService;
|
||||
private final IModelCallLogService modelCallLogService;
|
||||
|
||||
@Override
|
||||
public ObservabilityTraceVO getTrace(String requestId) {
|
||||
log.info("查询运行追踪开始,requestId={}", requestId);
|
||||
WorkflowRun run = findRunByRequestId(requestId);
|
||||
if (run == null) {
|
||||
return emptyTrace(requestId);
|
||||
}
|
||||
List<WorkflowRunStep> steps = workflowRunStepMapper.selectList(new LambdaQueryWrapper<WorkflowRunStep>()
|
||||
.eq(WorkflowRunStep::getRunId, run.getId())
|
||||
.orderByAsc(WorkflowRunStep::getCreateTime));
|
||||
AgentSession session = findSessionByRunId(run.getId());
|
||||
List<AgentMessage> messages = session == null ? List.of() : agentMessageService.list(new LambdaQueryWrapper<AgentMessage>()
|
||||
.eq(AgentMessage::getSessionId, session.getId())
|
||||
.orderByAsc(AgentMessage::getCreateTime));
|
||||
List<ObservabilityModelCallSummaryVO> modelCalls = listModelCalls(requestId);
|
||||
|
||||
ObservabilityTraceVO vo = new ObservabilityTraceVO();
|
||||
vo.setRequestId(requestId);
|
||||
vo.setWorkflowStatus(run.getStatus());
|
||||
vo.setSessionStatus(session == null ? null : session.getStatus());
|
||||
vo.setWorkflowStepCount(steps.size());
|
||||
vo.setMessageCount(messages.size());
|
||||
vo.setModelCallCount(modelCalls.size());
|
||||
vo.setTotalDurationMs(run.getDurationMs());
|
||||
vo.setEstimatedCost(run.getEstimatedCost());
|
||||
vo.setStepLogs(steps.stream().map(this::toStepLog).toList());
|
||||
vo.setModelCalls(modelCalls);
|
||||
log.info("查询运行追踪结束,requestId={}, stepCount={}, messageCount={}, modelCallCount={}",
|
||||
requestId, steps.size(), messages.size(), modelCalls.size());
|
||||
return vo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ObservabilityModelCallSummaryVO> listModelCalls(String requestId) {
|
||||
if (!StringUtils.hasText(requestId)) {
|
||||
return List.of();
|
||||
}
|
||||
List<ModelCallLog> logs = modelCallLogService.list(new LambdaQueryWrapper<ModelCallLog>()
|
||||
.eq(ModelCallLog::getRequestId, requestId.trim())
|
||||
.orderByAsc(ModelCallLog::getCreateTime));
|
||||
return logs.stream().map(this::toModelCallSummary).toList();
|
||||
}
|
||||
|
||||
private WorkflowRun findRunByRequestId(String requestId) {
|
||||
if (!StringUtils.hasText(requestId)) {
|
||||
return null;
|
||||
}
|
||||
List<WorkflowRun> runs = workflowRunService.list(new LambdaQueryWrapper<WorkflowRun>()
|
||||
.eq(WorkflowRun::getRequestId, requestId.trim())
|
||||
.orderByDesc(WorkflowRun::getCreateTime)
|
||||
.last("limit 1"));
|
||||
return runs.isEmpty() ? null : runs.getFirst();
|
||||
}
|
||||
|
||||
private AgentSession findSessionByRunId(Long runId) {
|
||||
List<AgentSession> sessions = agentSessionService.list(new LambdaQueryWrapper<AgentSession>()
|
||||
.eq(AgentSession::getWorkflowRunId, runId)
|
||||
.orderByDesc(AgentSession::getCreateTime)
|
||||
.last("limit 1"));
|
||||
return sessions.isEmpty() ? null : sessions.getFirst();
|
||||
}
|
||||
|
||||
private ObservabilityTraceVO emptyTrace(String requestId) {
|
||||
ObservabilityTraceVO vo = new ObservabilityTraceVO();
|
||||
vo.setRequestId(requestId);
|
||||
vo.setWorkflowStepCount(0);
|
||||
vo.setMessageCount(0);
|
||||
vo.setModelCallCount(0);
|
||||
vo.setStepLogs(List.of());
|
||||
vo.setModelCalls(List.of());
|
||||
return vo;
|
||||
}
|
||||
|
||||
private ObservabilityStepLogVO toStepLog(WorkflowRunStep step) {
|
||||
ObservabilityStepLogVO vo = new ObservabilityStepLogVO();
|
||||
vo.setNodeName(step.getNodeName());
|
||||
vo.setNodeType(step.getNodeType());
|
||||
vo.setStatus(step.getStatus());
|
||||
vo.setDurationMs(step.getDurationMs());
|
||||
vo.setOutputSummary(maskSummary(step.getOutputJson()));
|
||||
vo.setErrorMessage(step.getErrorMessage());
|
||||
return vo;
|
||||
}
|
||||
|
||||
private ObservabilityModelCallSummaryVO toModelCallSummary(ModelCallLog logEntity) {
|
||||
ObservabilityModelCallSummaryVO vo = new ObservabilityModelCallSummaryVO();
|
||||
vo.setRequestId(logEntity.getRequestId());
|
||||
vo.setCallType(logEntity.getCallType());
|
||||
vo.setStatus(logEntity.getStatus());
|
||||
vo.setTotalTokens(logEntity.getTotalTokens());
|
||||
vo.setDurationMs(logEntity.getDurationMs());
|
||||
vo.setEstimatedCost(logEntity.getEstimatedCost());
|
||||
vo.setErrorCode(logEntity.getErrorCode());
|
||||
vo.setErrorMessage(logEntity.getErrorMessage());
|
||||
return vo;
|
||||
}
|
||||
|
||||
private String maskSummary(String json) {
|
||||
if (!StringUtils.hasText(json)) {
|
||||
return null;
|
||||
}
|
||||
return json.length() > 120 ? json.substring(0, 120) + "..." : json;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package com.bruce.observability.vo;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class ObservabilityExportVO {
|
||||
private String requestId;
|
||||
private String workflowStatus;
|
||||
private String maskedInputJson;
|
||||
private String maskedOutputJson;
|
||||
private String exportSummary;
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package com.bruce.observability.vo;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
@Data
|
||||
public class ObservabilityModelCallSummaryVO {
|
||||
private String requestId;
|
||||
private String callType;
|
||||
private String status;
|
||||
private Integer totalTokens;
|
||||
private Integer durationMs;
|
||||
private BigDecimal estimatedCost;
|
||||
private String errorCode;
|
||||
private String errorMessage;
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.bruce.observability.vo;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
@Data
|
||||
public class ObservabilityRunSummaryVO {
|
||||
private Long runId;
|
||||
private Long workflowId;
|
||||
private String requestId;
|
||||
private String status;
|
||||
private Integer durationMs;
|
||||
private BigDecimal estimatedCost;
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.bruce.observability.vo;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class ObservabilityStepLogVO {
|
||||
private String nodeName;
|
||||
private String nodeType;
|
||||
private String status;
|
||||
private Integer durationMs;
|
||||
private String outputSummary;
|
||||
private String errorMessage;
|
||||
}
|
||||
@@ -5,27 +5,16 @@ import lombok.Data;
|
||||
import java.math.BigDecimal;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* requestId 级别的运行追踪聚合返回对象。
|
||||
*/
|
||||
@Data
|
||||
public class ObservabilityTraceVO {
|
||||
|
||||
private String requestId;
|
||||
|
||||
private String workflowStatus;
|
||||
|
||||
private String sessionStatus;
|
||||
|
||||
private Integer workflowStepCount;
|
||||
|
||||
private Integer messageCount;
|
||||
|
||||
private Integer modelCallCount;
|
||||
|
||||
private Integer totalDurationMs;
|
||||
|
||||
private BigDecimal estimatedCost;
|
||||
|
||||
private List<String> stepSummaries;
|
||||
private List<ObservabilityStepLogVO> stepLogs;
|
||||
private List<ObservabilityModelCallSummaryVO> modelCalls;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.bruce.observability;
|
||||
|
||||
import com.bruce.common.domain.model.RequestResult;
|
||||
import com.bruce.observability.controller.ObservabilityTraceController;
|
||||
import com.bruce.observability.service.IObservabilityExportService;
|
||||
import com.bruce.observability.service.IObservabilityRunService;
|
||||
import com.bruce.observability.service.IObservabilityTraceService;
|
||||
import com.bruce.observability.vo.ObservabilityExportVO;
|
||||
import com.bruce.observability.vo.ObservabilityModelCallSummaryVO;
|
||||
import com.bruce.observability.vo.ObservabilityRunSummaryVO;
|
||||
import com.bruce.observability.vo.ObservabilityTraceVO;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
class ObservabilityComponentStructureTests {
|
||||
|
||||
@Test
|
||||
void observabilityControllerShouldExposeRequestResultMethods() throws NoSuchMethodException {
|
||||
Method runsMethod = ObservabilityTraceController.class.getMethod("runs");
|
||||
Method detailMethod = ObservabilityTraceController.class.getMethod("trace", String.class);
|
||||
Method modelCallsMethod = ObservabilityTraceController.class.getMethod("modelCalls", String.class);
|
||||
Method exportMethod = ObservabilityTraceController.class.getMethod("export", String.class);
|
||||
|
||||
Method listRunsMethod = IObservabilityRunService.class.getMethod("listRecentRuns");
|
||||
Method traceMethod = IObservabilityTraceService.class.getMethod("getTrace", String.class);
|
||||
Method callSummaryMethod = IObservabilityTraceService.class.getMethod("listModelCalls", String.class);
|
||||
Method exportServiceMethod = IObservabilityExportService.class.getMethod("exportTrace", String.class);
|
||||
|
||||
assertEquals(RequestResult.class, runsMethod.getReturnType());
|
||||
assertEquals(RequestResult.class, detailMethod.getReturnType());
|
||||
assertEquals(RequestResult.class, modelCallsMethod.getReturnType());
|
||||
assertEquals(RequestResult.class, exportMethod.getReturnType());
|
||||
|
||||
assertEquals(List.class, listRunsMethod.getReturnType());
|
||||
assertEquals(ObservabilityTraceVO.class, traceMethod.getReturnType());
|
||||
assertEquals(List.class, callSummaryMethod.getReturnType());
|
||||
assertEquals(ObservabilityExportVO.class, exportServiceMethod.getReturnType());
|
||||
assertEquals(ObservabilityRunSummaryVO.class, ObservabilityRunSummaryVO.class);
|
||||
assertEquals(ObservabilityModelCallSummaryVO.class, ObservabilityModelCallSummaryVO.class);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,162 @@
|
||||
package com.bruce.observability.trace;
|
||||
|
||||
import com.bruce.agent.entity.AgentMessage;
|
||||
import com.bruce.agent.entity.AgentSession;
|
||||
import com.bruce.agent.service.IAgentMessageService;
|
||||
import com.bruce.agent.service.IAgentSessionService;
|
||||
import com.bruce.modelprovider.entity.ModelCallLog;
|
||||
import com.bruce.modelprovider.service.IModelCallLogService;
|
||||
import com.bruce.observability.service.impl.ObservabilityExportServiceImpl;
|
||||
import com.bruce.observability.service.impl.ObservabilityRunServiceImpl;
|
||||
import com.bruce.observability.service.impl.ObservabilityTraceServiceImpl;
|
||||
import com.bruce.observability.vo.ObservabilityExportVO;
|
||||
import com.bruce.observability.vo.ObservabilityRunSummaryVO;
|
||||
import com.bruce.observability.vo.ObservabilityTraceVO;
|
||||
import com.bruce.workflow.entity.WorkflowRun;
|
||||
import com.bruce.workflow.entity.WorkflowRunStep;
|
||||
import com.bruce.workflow.mapper.WorkflowRunStepMapper;
|
||||
import com.bruce.workflow.service.IWorkflowRunService;
|
||||
import com.baomidou.mybatisplus.core.conditions.Wrapper;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class ObservabilityTraceServiceTests {
|
||||
|
||||
@Mock
|
||||
private IWorkflowRunService workflowRunService;
|
||||
|
||||
@Mock
|
||||
private IAgentSessionService agentSessionService;
|
||||
|
||||
@Mock
|
||||
private IAgentMessageService agentMessageService;
|
||||
|
||||
@Mock
|
||||
private IModelCallLogService modelCallLogService;
|
||||
|
||||
@Mock
|
||||
private WorkflowRunStepMapper workflowRunStepMapper;
|
||||
|
||||
@InjectMocks
|
||||
private ObservabilityRunServiceImpl observabilityRunService;
|
||||
|
||||
@InjectMocks
|
||||
private ObservabilityTraceServiceImpl observabilityTraceService;
|
||||
|
||||
@Test
|
||||
void listRecentRunsShouldBuildSummary() {
|
||||
WorkflowRun run = new WorkflowRun();
|
||||
run.setId(101L);
|
||||
run.setRequestId("req-1001");
|
||||
run.setWorkflowId(2001L);
|
||||
run.setStatus("SUCCESS");
|
||||
run.setDurationMs(860);
|
||||
run.setEstimatedCost(new BigDecimal("0.006"));
|
||||
|
||||
when(workflowRunService.list(any(Wrapper.class))).thenReturn(List.of(run));
|
||||
|
||||
List<ObservabilityRunSummaryVO> result = observabilityRunService.listRecentRuns();
|
||||
|
||||
assertEquals(1, result.size());
|
||||
assertEquals("req-1001", result.getFirst().getRequestId());
|
||||
assertEquals("SUCCESS", result.getFirst().getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
void getTraceShouldAggregateWorkflowSessionMessagesAndModelCalls() {
|
||||
WorkflowRun run = new WorkflowRun();
|
||||
run.setId(101L);
|
||||
run.setRequestId("req-1001");
|
||||
run.setStatus("SUCCESS");
|
||||
run.setDurationMs(1420);
|
||||
run.setEstimatedCost(new BigDecimal("0.018"));
|
||||
run.setInputJson("{\"question\":\"私有化部署\"}");
|
||||
run.setOutputJson("{\"answer\":\"建议覆盖部署拓扑\"}");
|
||||
|
||||
WorkflowRunStep step = new WorkflowRunStep();
|
||||
step.setRunId(101L);
|
||||
step.setNodeName("Knowledge Retrieval");
|
||||
step.setStatus("SUCCESS");
|
||||
step.setDurationMs(218);
|
||||
step.setOutputJson("{\"chunks\":6}");
|
||||
|
||||
AgentSession session = new AgentSession();
|
||||
session.setId(301L);
|
||||
session.setWorkflowRunId(101L);
|
||||
session.setStatus("ACTIVE");
|
||||
session.setSessionCode("session-001");
|
||||
|
||||
AgentMessage message = new AgentMessage();
|
||||
message.setSessionId(301L);
|
||||
message.setRole("ASSISTANT");
|
||||
message.setContent("建议覆盖部署拓扑、模型服务和日志留存");
|
||||
message.setCitationJson("[{\"chunkId\":\"c-1\"}]");
|
||||
message.setTokenCount(612);
|
||||
|
||||
ModelCallLog callLog = new ModelCallLog();
|
||||
callLog.setRequestId("req-1001");
|
||||
callLog.setCallType("CHAT");
|
||||
callLog.setStatus("SUCCESS");
|
||||
callLog.setTotalTokens(612);
|
||||
callLog.setDurationMs(1120);
|
||||
callLog.setEstimatedCost(new BigDecimal("0.018"));
|
||||
|
||||
when(workflowRunService.list(any(Wrapper.class))).thenReturn(List.of(run));
|
||||
when(workflowRunStepMapper.selectList(any())).thenReturn(List.of(step));
|
||||
when(agentSessionService.list(any(Wrapper.class))).thenReturn(List.of(session));
|
||||
when(agentMessageService.list(any(Wrapper.class))).thenReturn(List.of(message));
|
||||
when(modelCallLogService.list(any(Wrapper.class))).thenReturn(List.of(callLog));
|
||||
|
||||
ObservabilityTraceVO result = observabilityTraceService.getTrace("req-1001");
|
||||
|
||||
assertNotNull(result);
|
||||
assertEquals("req-1001", result.getRequestId());
|
||||
assertEquals("SUCCESS", result.getWorkflowStatus());
|
||||
assertEquals("ACTIVE", result.getSessionStatus());
|
||||
assertEquals(1, result.getWorkflowStepCount());
|
||||
assertEquals(1, result.getMessageCount());
|
||||
assertEquals(1, result.getModelCallCount());
|
||||
assertEquals(1, result.getStepLogs().size());
|
||||
assertEquals(1, result.getModelCalls().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
void exportShouldMaskSensitiveFields() {
|
||||
WorkflowRun run = new WorkflowRun();
|
||||
run.setId(101L);
|
||||
run.setRequestId("req-1001");
|
||||
run.setStatus("FAILED");
|
||||
run.setInputJson("{\"apiKey\":\"secret\",\"question\":\"test\"}");
|
||||
run.setOutputJson("{\"authorization\":\"bearer token\",\"answer\":\"done\"}");
|
||||
|
||||
ModelCallLog callLog = new ModelCallLog();
|
||||
callLog.setRequestId("req-1001");
|
||||
callLog.setRequestHash("hash-1");
|
||||
callLog.setErrorMessage("timeout");
|
||||
|
||||
when(workflowRunService.list(any(Wrapper.class))).thenReturn(List.of(run));
|
||||
when(workflowRunStepMapper.selectList(any())).thenReturn(List.of());
|
||||
when(agentSessionService.list(any(Wrapper.class))).thenReturn(List.of());
|
||||
when(modelCallLogService.list(any(Wrapper.class))).thenReturn(List.of(callLog));
|
||||
|
||||
ObservabilityExportServiceImpl observabilityExportService = new ObservabilityExportServiceImpl(observabilityTraceService);
|
||||
ObservabilityExportVO result = observabilityExportService.exportTrace("req-1001");
|
||||
|
||||
assertEquals("req-1001", result.getRequestId());
|
||||
assertEquals("FAILED", result.getWorkflowStatus());
|
||||
assertEquals(true, result.getMaskedInputJson().contains("***"));
|
||||
assertEquals(true, result.getMaskedOutputJson().contains("***"));
|
||||
}
|
||||
}
|
||||
31
frontend/src/api/__tests__/observability.spec.ts
Normal file
31
frontend/src/api/__tests__/observability.spec.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import {
|
||||
exportObservabilityTrace,
|
||||
getObservabilityTrace,
|
||||
listObservabilityModelCalls,
|
||||
listObservabilityRuns,
|
||||
} from '../observability';
|
||||
import { get } from '../request';
|
||||
|
||||
vi.mock('../request', () => ({
|
||||
get: vi.fn(),
|
||||
}));
|
||||
|
||||
describe('observability api', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('maps observability endpoints correctly', () => {
|
||||
listObservabilityRuns();
|
||||
getObservabilityTrace('req-1001');
|
||||
listObservabilityModelCalls('req-1001');
|
||||
exportObservabilityTrace('req-1001');
|
||||
|
||||
expect(get).toHaveBeenCalledWith('/observability/runs');
|
||||
expect(get).toHaveBeenCalledWith('/observability/runs/req-1001');
|
||||
expect(get).toHaveBeenCalledWith('/observability/model-calls', { params: { requestId: 'req-1001' } });
|
||||
expect(get).toHaveBeenCalledWith('/observability/runs/req-1001/export');
|
||||
});
|
||||
});
|
||||
67
frontend/src/api/observability.ts
Normal file
67
frontend/src/api/observability.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { get } from './request';
|
||||
|
||||
export interface ObservabilityRunSummary {
|
||||
runId?: string;
|
||||
workflowId?: string;
|
||||
requestId: string;
|
||||
status?: string;
|
||||
durationMs?: number;
|
||||
estimatedCost?: number;
|
||||
}
|
||||
|
||||
export interface ObservabilityStepLog {
|
||||
nodeName: string;
|
||||
nodeType?: string;
|
||||
status?: string;
|
||||
durationMs?: number;
|
||||
outputSummary?: string;
|
||||
errorMessage?: string;
|
||||
}
|
||||
|
||||
export interface ObservabilityModelCallSummary {
|
||||
requestId: string;
|
||||
callType?: string;
|
||||
status?: string;
|
||||
totalTokens?: number;
|
||||
durationMs?: number;
|
||||
estimatedCost?: number;
|
||||
errorCode?: string;
|
||||
errorMessage?: string;
|
||||
}
|
||||
|
||||
export interface ObservabilityTrace {
|
||||
requestId: string;
|
||||
workflowStatus?: string;
|
||||
sessionStatus?: string;
|
||||
workflowStepCount?: number;
|
||||
messageCount?: number;
|
||||
modelCallCount?: number;
|
||||
totalDurationMs?: number;
|
||||
estimatedCost?: number;
|
||||
stepLogs: ObservabilityStepLog[];
|
||||
modelCalls: ObservabilityModelCallSummary[];
|
||||
}
|
||||
|
||||
export interface ObservabilityExport {
|
||||
requestId: string;
|
||||
workflowStatus?: string;
|
||||
maskedInputJson?: string;
|
||||
maskedOutputJson?: string;
|
||||
exportSummary?: string;
|
||||
}
|
||||
|
||||
export function listObservabilityRuns() {
|
||||
return get<ObservabilityRunSummary[]>('/observability/runs');
|
||||
}
|
||||
|
||||
export function getObservabilityTrace(requestId: string) {
|
||||
return get<ObservabilityTrace>(`/observability/runs/${requestId}`);
|
||||
}
|
||||
|
||||
export function listObservabilityModelCalls(requestId: string) {
|
||||
return get<ObservabilityModelCallSummary[]>('/observability/model-calls', { params: { requestId } });
|
||||
}
|
||||
|
||||
export function exportObservabilityTrace(requestId: string) {
|
||||
return get<ObservabilityExport>(`/observability/runs/${requestId}/export`);
|
||||
}
|
||||
Reference in New Issue
Block a user