feat(studio): 补齐剩余工作台聚合接口与真实对接

This commit is contained in:
2026-06-01 05:28:11 +08:00
parent 8f7ffd6cc9
commit ebe0fc5a12
35 changed files with 2092 additions and 123 deletions

View File

@@ -0,0 +1,40 @@
package com.bruce.rag.controller;
import com.bruce.common.domain.model.RequestResult;
import com.bruce.rag.service.IIngestionRunService;
import com.bruce.rag.vo.IngestionRunVO;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST entry point for the aggregated file-ingestion pipeline view.
 */
@Tag(name = "文件解析管道")
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/knowledge/ingestion-runs")
public class IngestionRunController {

    private final IIngestionRunService ingestionRunService;

    /**
     * Returns the aggregated pipeline view of one document.
     * <p>
     * The view is looked up by {@code storeId} and {@code documentId} only; {@code runId} is not used for
     * the lookup and is simply echoed back on the returned view.
     *
     * @param runId      run identifier taken from the path, copied onto the response
     * @param storeId    ID of the knowledge store
     * @param documentId ID of the document
     * @return the aggregated view wrapped in a success result
     */
    @Operation(summary = "查询文件解析管道聚合视图")
    @GetMapping("/{runId}")
    public RequestResult<IngestionRunVO> detail(@PathVariable("runId") String runId,
                                                @RequestParam("storeId") Long storeId,
                                                @RequestParam("documentId") Long documentId) {
        // A delimiter after the phase label keeps "开始"/"结束" from fusing with the first key in the log line.
        log.info("文件解析管道查询开始，runId={}, storeId={}, documentId={}", runId, storeId, documentId);
        IngestionRunVO view = ingestionRunService.getRun(storeId, documentId);
        view.setRunId(runId);
        log.info("文件解析管道查询结束，runId={}, storeId={}, documentId={}, stepCount={}, logCount={}",
                runId, storeId, documentId, view.getSteps().size(), view.getLogs().size());
        return RequestResult.success(view);
    }
}

View File

@@ -0,0 +1,14 @@
package com.bruce.rag.service;
import com.bruce.rag.vo.IngestionRunVO;
/**
 * Aggregation service for the file-ingestion pipeline view.
 */
public interface IIngestionRunService {
/**
 * Builds the aggregated ingestion-pipeline view for one document of one knowledge store.
 *
 * @param storeId    ID of the knowledge store
 * @param documentId ID of the document
 * @return the aggregated pipeline view
 */
IngestionRunVO getRun(Long storeId, Long documentId);
}

View File

@@ -0,0 +1,281 @@
package com.bruce.rag.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.bruce.modelprovider.dto.response.RagStoreModelConfigResponse;
import com.bruce.modelprovider.service.IRagStoreModelConfigService;
import com.bruce.rag.dto.response.RagDocumentResponse;
import com.bruce.rag.dto.response.RagStoreResponse;
import com.bruce.rag.entity.RagChunk;
import com.bruce.rag.entity.RagChunkEmbedding;
import com.bruce.rag.entity.RagDocumentParseResult;
import com.bruce.rag.service.IIngestionRunService;
import com.bruce.rag.service.IRagChunkEmbeddingService;
import com.bruce.rag.service.IRagChunkService;
import com.bruce.rag.service.IRagDocumentParseResultService;
import com.bruce.rag.service.IRagDocumentService;
import com.bruce.rag.service.IRagStoreService;
import com.bruce.rag.vo.IngestionRunFileVO;
import com.bruce.rag.vo.IngestionRunLogVO;
import com.bruce.rag.vo.IngestionRunStepVO;
import com.bruce.rag.vo.IngestionRunVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
/**
* 文件解析管道聚合实现。
* <p>
* 首轮实现保持“主数据优先”,直接复用文档、解析快照、切片和向量结果生成前端可消费的聚合视图。
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class IngestionRunServiceImpl implements IIngestionRunService {
private static final String PARSE_STATUS_PARSED = "PARSED";
private static final String PARSE_STATUS_FAILED = "FAILED";
private static final String INDEX_STATUS_INDEXED = "INDEXED";
private static final String STEP_DONE = "done";
private static final String STEP_RUNNING = "running";
private static final String STEP_BLOCKED = "blocked";
private static final String STEP_IDLE = "idle";
private final IRagStoreService ragStoreService;
private final IRagDocumentService ragDocumentService;
private final IRagDocumentParseResultService ragDocumentParseResultService;
private final IRagChunkService ragChunkService;
private final IRagChunkEmbeddingService ragChunkEmbeddingService;
private final IRagStoreModelConfigService ragStoreModelConfigService;
@Override
public IngestionRunVO getRun(Long storeId, Long documentId) {
log.info("文件解析管道聚合开始storeId={}, documentId={}", storeId, documentId);
if (storeId == null || documentId == null) {
throw new IllegalArgumentException("知识库ID和文档ID不能为空");
}
RagStoreResponse store = ragStoreService.getResponseById(storeId);
if (store == null) {
throw new IllegalArgumentException("知识库不存在ID: " + storeId);
}
RagDocumentResponse document = ragDocumentService.getResponseById(documentId);
if (document == null || !storeId.equals(document.getStoreId())) {
throw new IllegalArgumentException("文档不存在或不属于当前知识库documentId: " + documentId);
}
RagDocumentParseResult parseResult = ragDocumentParseResultService.getByDocumentId(documentId);
List<RagChunk> chunks = ragChunkService.list(new LambdaQueryWrapper<RagChunk>()
.eq(RagChunk::getDocumentId, documentId)
.eq(RagChunk::getEnabled, true)
.orderByAsc(RagChunk::getChunkIndex));
List<RagChunkEmbedding> embeddings = ragChunkEmbeddingService.list(new LambdaQueryWrapper<RagChunkEmbedding>()
.eq(RagChunkEmbedding::getDocumentId, documentId)
.eq(RagChunkEmbedding::getEnabled, true)
.orderByDesc(RagChunkEmbedding::getCreateTime));
RagStoreModelConfigResponse config = ragStoreModelConfigService.getByStoreId(storeId);
IngestionRunVO view = new IngestionRunVO();
view.setStoreId(storeId);
view.setDocumentId(documentId);
view.setStoreCode(store.getStoreCode());
view.setStoreName(store.getStoreName());
view.setFiles(List.of(toFileVO(document)));
view.setSteps(buildSteps(document, parseResult, chunks, embeddings, config));
view.setParsedTextPreview(buildParsedPreview(parseResult));
view.setChunkPreview(buildChunkPreview(chunks, config));
view.setLogs(buildLogs(document, parseResult, chunks, embeddings));
if (config != null) {
view.setChunkStrategy(config.getChunkStrategy());
view.setChunkSize(config.getChunkSize());
view.setChunkOverlap(config.getChunkOverlap());
view.setEmbeddingModelId(config.getEmbeddingModelId());
view.setEmbeddingDimension(config.getEmbeddingDimension());
}
log.info("文件解析管道聚合结束storeId={}, documentId={}, chunkCount={}, embeddingCount={}",
storeId, documentId, chunks.size(), embeddings.size());
return view;
}
private IngestionRunFileVO toFileVO(RagDocumentResponse document) {
IngestionRunFileVO file = new IngestionRunFileVO();
file.setDocumentId(document.getId());
file.setAttachmentId(document.getAttachmentId());
file.setFileName(document.getDocumentTitle());
file.setParseStatus(document.getParseStatus());
file.setIndexStatus(document.getIndexStatus());
file.setErrorMessage(document.getErrorMessage());
return file;
}
private List<IngestionRunStepVO> buildSteps(RagDocumentResponse document,
RagDocumentParseResult parseResult,
List<RagChunk> chunks,
List<RagChunkEmbedding> embeddings,
RagStoreModelConfigResponse config) {
List<IngestionRunStepVO> steps = new ArrayList<>();
steps.add(step("上传", "文件已入库并创建 rag_document", STEP_DONE));
steps.add(step("解析", parseDescription(parseResult), parseStepStatus(document)));
steps.add(step("切片", chunkDescription(chunks, config), chunkStepStatus(document, chunks)));
steps.add(step("向量化", embeddingDescription(embeddings, config), embeddingStepStatus(document, chunks, embeddings)));
steps.add(step("可检索", retrievableDescription(document), retrievableStepStatus(document)));
return steps;
}
private List<IngestionRunLogVO> buildLogs(RagDocumentResponse document,
RagDocumentParseResult parseResult,
List<RagChunk> chunks,
List<RagChunkEmbedding> embeddings) {
List<IngestionRunLogVO> logs = new ArrayList<>();
logs.add(log(document.getCreateTime(), "INFO", "上传文件并创建 rag_document 记录"));
if (parseResult != null) {
logs.add(log(parseResult.getCreateTime(), "INFO",
"解析完成,文本长度 " + safeNumber(parseResult.getTextLength()) + ",页数 " + safeNumber(parseResult.getPageCount())));
} else if (PARSE_STATUS_FAILED.equals(document.getParseStatus())) {
logs.add(log(document.getUpdateTime(), "WARN", defaultText(document.getErrorMessage(), "解析失败,等待重试")));
} else {
logs.add(log(document.getUpdateTime(), "INFO", "解析尚未产出快照,等待执行"));
}
if (!chunks.isEmpty()) {
logs.add(log(chunks.get(chunks.size() - 1).getCreateTime(), "INFO", "切片完成,共生成 " + chunks.size() + " 个 rag_chunk"));
} else {
logs.add(log(document.getUpdateTime(), "INFO", "切片尚未执行或等待解析成功"));
}
if (!embeddings.isEmpty()) {
RagChunkEmbedding latestEmbedding = embeddings.get(0);
logs.add(log(latestEmbedding.getCreateTime(), "INFO",
"向量化完成,已写入 " + embeddings.size() + " 条 rag_chunk_embedding"));
} else {
logs.add(log(document.getUpdateTime(), "INFO", "向量化尚未执行或等待切片完成"));
}
return logs;
}
private IngestionRunStepVO step(String name, String description, String status) {
IngestionRunStepVO step = new IngestionRunStepVO();
step.setName(name);
step.setDescription(description);
step.setStatus(status);
return step;
}
private IngestionRunLogVO log(Date time, String level, String message) {
IngestionRunLogVO log = new IngestionRunLogVO();
log.setTime(formatTime(time));
log.setLevel(level);
log.setMessage(message);
return log;
}
private String parseDescription(RagDocumentParseResult parseResult) {
if (parseResult == null) {
return "等待解析快照写入 rag_document_parse_result";
}
return "解析完成,文本长度 " + safeNumber(parseResult.getTextLength()) + ",页数 " + safeNumber(parseResult.getPageCount());
}
private String chunkDescription(List<RagChunk> chunks, RagStoreModelConfigResponse config) {
if (chunks.isEmpty()) {
return "等待根据策略生成 rag_chunk";
}
return "已生成 " + chunks.size() + " 个切片chunk_size=" + safeNumber(config == null ? null : config.getChunkSize())
+ "overlap=" + safeNumber(config == null ? null : config.getChunkOverlap());
}
private String embeddingDescription(List<RagChunkEmbedding> embeddings, RagStoreModelConfigResponse config) {
if (embeddings.isEmpty()) {
return "等待向量化并写入 rag_chunk_embedding";
}
String modelText = config == null || config.getEmbeddingModelId() == null ? "-" : String.valueOf(config.getEmbeddingModelId());
return "已生成 " + embeddings.size() + " 条向量,模型 " + modelText + ",维度 "
+ safeNumber(config == null ? null : config.getEmbeddingDimension());
}
private String retrievableDescription(RagDocumentResponse document) {
if (INDEX_STATUS_INDEXED.equals(document.getIndexStatus())) {
return "索引已完成,当前文档可参与检索召回";
}
return "等待索引完成后进入可检索状态";
}
private String parseStepStatus(RagDocumentResponse document) {
if (PARSE_STATUS_PARSED.equals(document.getParseStatus())) {
return STEP_DONE;
}
if (PARSE_STATUS_FAILED.equals(document.getParseStatus())) {
return STEP_BLOCKED;
}
return STEP_RUNNING;
}
private String chunkStepStatus(RagDocumentResponse document, List<RagChunk> chunks) {
if (!chunks.isEmpty()) {
return STEP_DONE;
}
if (PARSE_STATUS_FAILED.equals(document.getParseStatus())) {
return STEP_IDLE;
}
return PARSE_STATUS_PARSED.equals(document.getParseStatus()) ? STEP_RUNNING : STEP_IDLE;
}
private String embeddingStepStatus(RagDocumentResponse document, List<RagChunk> chunks, List<RagChunkEmbedding> embeddings) {
if (!embeddings.isEmpty()) {
return STEP_DONE;
}
if (PARSE_STATUS_FAILED.equals(document.getParseStatus())) {
return STEP_IDLE;
}
return chunks.isEmpty() ? STEP_IDLE : STEP_RUNNING;
}
private String retrievableStepStatus(RagDocumentResponse document) {
return INDEX_STATUS_INDEXED.equals(document.getIndexStatus()) ? STEP_DONE : STEP_IDLE;
}
private String buildParsedPreview(RagDocumentParseResult parseResult) {
if (parseResult == null || parseResult.getParsedText() == null) {
return "暂无解析文本预览";
}
return truncate(parseResult.getParsedText(), 180);
}
private String buildChunkPreview(List<RagChunk> chunks, RagStoreModelConfigResponse config) {
if (chunks.isEmpty()) {
return "暂无切片预览";
}
RagChunk firstChunk = chunks.get(0);
return "chunk_size=" + safeNumber(config == null ? null : config.getChunkSize())
+ ", overlap=" + safeNumber(config == null ? null : config.getChunkOverlap())
+ ", strategy=" + safeNumber(config == null ? null : config.getChunkStrategy())
+ "。预览:" + truncate(defaultText(firstChunk.getChunkSummary(), firstChunk.getChunkContent()), 140);
}
private String truncate(String text, int limit) {
if (text == null || text.length() <= limit) {
return text;
}
return text.substring(0, limit) + "...";
}
private String defaultText(String preferred, String fallback) {
return preferred == null || preferred.isBlank() ? fallback : preferred;
}
private String safeNumber(Number value) {
return value == null ? "-" : String.valueOf(value);
}
private String formatTime(Date time) {
Date value = time == null ? new Date() : time;
return new SimpleDateFormat("HH:mm:ss", Locale.CHINA).format(value);
}
}

View File

@@ -0,0 +1,34 @@
package com.bruce.rag.vo;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
 * File summary entry of the ingestion pipeline view.
 */
@Data
@Schema(description = "摄取流水线文件摘要")
public class IngestionRunFileVO {
// Document ID; written to JSON through ToStringSerializer, i.e. as a string.
@Schema(description = "文档ID")
@JsonSerialize(using = ToStringSerializer.class)
private Long documentId;
// Attachment ID; written to JSON through ToStringSerializer, i.e. as a string.
@Schema(description = "附件ID")
@JsonSerialize(using = ToStringSerializer.class)
private Long attachmentId;
// File name.
@Schema(description = "文件名称")
private String fileName;
// Parse status.
@Schema(description = "解析状态")
private String parseStatus;
// Index status.
@Schema(description = "索引状态")
private String indexStatus;
// Error message.
@Schema(description = "错误信息")
private String errorMessage;
}

View File

@@ -0,0 +1,21 @@
package com.bruce.rag.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
 * Log entry of the ingestion pipeline view.
 */
@Data
@Schema(description = "摄取流水线日志视图")
public class IngestionRunLogVO {
// Log time, carried as an already formatted string.
@Schema(description = "日志时间")
private String time;
// Log level.
@Schema(description = "日志级别")
private String level;
// Log message.
@Schema(description = "日志内容")
private String message;
}

View File

@@ -0,0 +1,21 @@
package com.bruce.rag.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
 * Stage (step) entry of the ingestion pipeline view.
 */
@Data
@Schema(description = "摄取流水线阶段视图")
public class IngestionRunStepVO {
// Stage name.
@Schema(description = "阶段名称")
private String name;
// Stage description.
@Schema(description = "阶段说明")
private String description;
// Stage status.
@Schema(description = "阶段状态")
private String status;
}

View File

@@ -0,0 +1,65 @@
package com.bruce.rag.vo;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
 * Aggregated view of the file-ingestion pipeline.
 */
@Data
@Schema(description = "文件解析管道聚合视图")
public class IngestionRunVO {
// Run identifier.
@Schema(description = "运行标识")
private String runId;
// Knowledge store ID; written to JSON through ToStringSerializer, i.e. as a string.
@Schema(description = "知识库ID")
@JsonSerialize(using = ToStringSerializer.class)
private Long storeId;
// Document ID; written to JSON through ToStringSerializer, i.e. as a string.
@Schema(description = "文档ID")
@JsonSerialize(using = ToStringSerializer.class)
private Long documentId;
// Knowledge store code.
@Schema(description = "知识库编码")
private String storeCode;
// Knowledge store name.
@Schema(description = "知识库名称")
private String storeName;
// File summaries; starts out as an empty list.
@Schema(description = "文件摘要列表")
private List<IngestionRunFileVO> files = new ArrayList<>();
// Pipeline stages; starts out as an empty list.
@Schema(description = "流水线阶段")
private List<IngestionRunStepVO> steps = new ArrayList<>();
// Preview of the parsed text.
@Schema(description = "解析文本预览")
private String parsedTextPreview;
// Preview of the chunking result.
@Schema(description = "切片预览")
private String chunkPreview;
// Chunking strategy.
@Schema(description = "切片策略")
private Integer chunkStrategy;
// Chunk size.
@Schema(description = "切片长度")
private Integer chunkSize;
// Chunk overlap.
@Schema(description = "切片重叠")
private Integer chunkOverlap;
// Embedding model ID; written to JSON through ToStringSerializer, i.e. as a string.
@Schema(description = "Embedding 模型ID")
@JsonSerialize(using = ToStringSerializer.class)
private Long embeddingModelId;
// Embedding dimension.
@Schema(description = "Embedding 维度")
private Integer embeddingDimension;
// Task log entries; starts out as an empty list.
@Schema(description = "任务日志")
private List<IngestionRunLogVO> logs = new ArrayList<>();
}

View File

@@ -0,0 +1,129 @@
package com.bruce.rag.ingestion;
import com.bruce.modelprovider.dto.response.RagStoreModelConfigResponse;
import com.bruce.modelprovider.service.IRagStoreModelConfigService;
import com.bruce.rag.dto.response.RagDocumentResponse;
import com.bruce.rag.dto.response.RagStoreResponse;
import com.bruce.rag.entity.RagChunk;
import com.bruce.rag.entity.RagChunkEmbedding;
import com.bruce.rag.entity.RagDocumentParseResult;
import com.bruce.rag.service.IRagChunkEmbeddingService;
import com.bruce.rag.service.IRagChunkService;
import com.bruce.rag.service.IRagDocumentParseResultService;
import com.bruce.rag.service.IRagDocumentService;
import com.bruce.rag.service.IRagStoreService;
import com.bruce.rag.service.impl.IngestionRunServiceImpl;
import com.bruce.rag.vo.IngestionRunVO;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Date;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.when;
/**
 * Unit test for {@link IngestionRunServiceImpl} with every collaborating service mocked.
 */
@ExtendWith(MockitoExtension.class)
class IngestionRunServiceTests {

    private static final long STORE_ID = 1001L;
    private static final long DOCUMENT_ID = 11L;

    @Mock
    private IRagStoreService ragStoreService;
    @Mock
    private IRagDocumentService ragDocumentService;
    @Mock
    private IRagDocumentParseResultService ragDocumentParseResultService;
    @Mock
    private IRagChunkService ragChunkService;
    @Mock
    private IRagChunkEmbeddingService ragChunkEmbeddingService;
    @Mock
    private IRagStoreModelConfigService ragStoreModelConfigService;
    @InjectMocks
    private IngestionRunServiceImpl ingestionRunService;

    /** A fully parsed, chunked, embedded and indexed document yields an all-done pipeline view. */
    @Test
    void getRunShouldAggregatePipelinePreviewAndLogs() {
        when(ragStoreService.getResponseById(STORE_ID)).thenReturn(newStore());
        when(ragDocumentService.getResponseById(DOCUMENT_ID)).thenReturn(newDocument());
        when(ragDocumentParseResultService.getByDocumentId(DOCUMENT_ID)).thenReturn(newParseResult());
        when(ragChunkService.list(org.mockito.ArgumentMatchers.<com.baomidou.mybatisplus.core.conditions.Wrapper<RagChunk>>any()))
                .thenReturn(List.of(newChunk()));
        when(ragChunkEmbeddingService.list(org.mockito.ArgumentMatchers.<com.baomidou.mybatisplus.core.conditions.Wrapper<RagChunkEmbedding>>any()))
                .thenReturn(List.of(newEmbedding()));
        when(ragStoreModelConfigService.getByStoreId(STORE_ID)).thenReturn(newConfig());

        IngestionRunVO result = ingestionRunService.getRun(STORE_ID, DOCUMENT_ID);

        assertNotNull(result);
        assertEquals(STORE_ID, result.getStoreId());
        assertEquals(DOCUMENT_ID, result.getDocumentId());
        assertEquals("PROD_DOC", result.getStoreCode());
        assertEquals("产品制度库", result.getStoreName());
        assertEquals(1, result.getFiles().size());
        assertEquals(5, result.getSteps().size());
        // Steps 1..4 are parse, chunk, embed and retrievable.
        for (int index = 1; index <= 4; index++) {
            assertEquals("done", result.getSteps().get(index).getStatus());
        }
        assertEquals(88L, result.getEmbeddingModelId());
        assertEquals(1024, result.getEmbeddingDimension());
        assertEquals(800, result.getChunkSize());
        assertEquals(120, result.getChunkOverlap());
        assertEquals(4, result.getLogs().size());
        assertEquals("售前方案模板.pdf", result.getFiles().get(0).getFileName());
        assertEquals(true, result.getParsedTextPreview().contains("私有化部署章节"));
        assertEquals(true, result.getChunkPreview().contains("chunk_size=800"));
    }

    /** Knowledge store fixture. */
    private static RagStoreResponse newStore() {
        RagStoreResponse fixture = new RagStoreResponse();
        fixture.setId(STORE_ID);
        fixture.setStoreCode("PROD_DOC");
        fixture.setStoreName("产品制度库");
        return fixture;
    }

    /** Document fixture in PARSED / INDEXED state, owned by the store fixture. */
    private static RagDocumentResponse newDocument() {
        RagDocumentResponse fixture = new RagDocumentResponse();
        fixture.setId(DOCUMENT_ID);
        fixture.setStoreId(STORE_ID);
        fixture.setAttachmentId(901L);
        fixture.setDocumentTitle("售前方案模板.pdf");
        fixture.setParseStatus("PARSED");
        fixture.setIndexStatus("INDEXED");
        fixture.setCreateTime(new Date(1748780000000L));
        fixture.setUpdateTime(new Date(1748780300000L));
        return fixture;
    }

    /** Parse snapshot fixture. */
    private static RagDocumentParseResult newParseResult() {
        RagDocumentParseResult fixture = new RagDocumentParseResult();
        fixture.setDocumentId(DOCUMENT_ID);
        fixture.setParsedText("私有化部署章节应覆盖基础设施、网络、安全与运维边界。平台需说明模型服务商、知识库索引策略与日志留存周期。");
        fixture.setTextLength(1280);
        fixture.setPageCount(12);
        fixture.setCreateTime(new Date(1748780100000L));
        return fixture;
    }

    /** Single chunk fixture carrying both a summary and content. */
    private static RagChunk newChunk() {
        RagChunk fixture = new RagChunk();
        fixture.setDocumentId(DOCUMENT_ID);
        fixture.setChunkIndex(24);
        fixture.setChunkSummary("chunk_size=800, overlap=120, strategy=FIXED_LENGTH");
        fixture.setChunkContent("该切片将进入 rag_chunk 并在向量化后写入 rag_chunk_embedding。");
        fixture.setCreateTime(new Date(1748780200000L));
        return fixture;
    }

    /** Single embedding fixture. */
    private static RagChunkEmbedding newEmbedding() {
        RagChunkEmbedding fixture = new RagChunkEmbedding();
        fixture.setDocumentId(DOCUMENT_ID);
        fixture.setEmbeddingDimension(1024);
        fixture.setCreateTime(new Date(1748780250000L));
        return fixture;
    }

    /** Store model configuration fixture. */
    private static RagStoreModelConfigResponse newConfig() {
        RagStoreModelConfigResponse fixture = new RagStoreModelConfigResponse();
        fixture.setStoreId(STORE_ID);
        fixture.setEmbeddingModelId(88L);
        fixture.setEmbeddingDimension(1024);
        fixture.setChunkStrategy(1);
        fixture.setChunkSize(800);
        fixture.setChunkOverlap(120);
        return fixture;
    }
}