feat(rag): 打通自动解析与手动异步切片链路
This commit is contained in:
@@ -2,8 +2,10 @@ package com.bruce;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableAsync
|
||||
public class CommonAgentApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@@ -2,12 +2,14 @@ package com.bruce.rag.controller;
|
||||
|
||||
import com.bruce.common.domain.model.RequestResult;
|
||||
import com.bruce.rag.dto.request.RagDocumentBatchUploadRequest;
|
||||
import com.bruce.rag.dto.request.RagDocumentChunkRequest;
|
||||
import com.bruce.rag.dto.request.RagDocumentParseRequest;
|
||||
import com.bruce.rag.dto.request.RagDocumentQueryRequest;
|
||||
import com.bruce.rag.dto.request.RagDocumentSaveRequest;
|
||||
import com.bruce.rag.dto.response.RagDocumentParseResponse;
|
||||
import com.bruce.rag.dto.response.RagDocumentResponse;
|
||||
import com.bruce.rag.service.IRagDocumentParseService;
|
||||
import com.bruce.rag.service.IRagDocumentChunkService;
|
||||
import com.bruce.rag.service.IRagDocumentService;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
@@ -35,6 +37,9 @@ public class RagDocumentController {
|
||||
@Autowired
|
||||
private IRagDocumentParseService ragDocumentParseService;
|
||||
|
||||
@Autowired
|
||||
private IRagDocumentChunkService ragDocumentChunkService;
|
||||
|
||||
@Operation(summary = "查询全部知识库文档")
|
||||
@PostMapping("/list")
|
||||
public RequestResult<List<RagDocumentResponse>> list() {
|
||||
@@ -100,4 +105,13 @@ public class RagDocumentController {
|
||||
log.info("RagDocumentController.parse success, count={}", responses.size());
|
||||
return RequestResult.success(responses);
|
||||
}
|
||||
|
||||
@Operation(summary = "按策略异步切片")
|
||||
@PostMapping("/chunk")
|
||||
public RequestResult<Boolean> chunk(@RequestBody RagDocumentChunkRequest request) {
|
||||
log.info("RagDocumentController.chunk start, request={}", request);
|
||||
ragDocumentChunkService.submitChunkTask(request);
|
||||
log.info("RagDocumentController.chunk submitted");
|
||||
return RequestResult.success(Boolean.TRUE);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.bruce.rag.dto.request;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
@Schema(description = "RAG知识库文档切片请求")
|
||||
public class RagDocumentChunkRequest {
|
||||
|
||||
@Schema(description = "文档ID列表")
|
||||
private List<Long> documentIds;
|
||||
|
||||
@Schema(description = "切片方式枚举值")
|
||||
private Integer chunkStrategy;
|
||||
|
||||
@Schema(description = "切片长度")
|
||||
private Integer chunkSize;
|
||||
|
||||
@Schema(description = "重叠长度")
|
||||
private Integer chunkOverlap;
|
||||
|
||||
@Schema(description = "分隔符")
|
||||
private String delimiter;
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
package com.bruce.rag.event;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 文档上传完成事件,用于异步触发自动解析。
|
||||
*/
|
||||
public record RagDocumentUploadedEvent(List<Long> documentIds) {
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.bruce.rag.event;
|
||||
|
||||
import com.bruce.rag.service.IRagDocumentParseService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class RagDocumentUploadedEventListener {
|
||||
|
||||
private final IRagDocumentParseService ragDocumentParseService;
|
||||
|
||||
@Async
|
||||
@EventListener
|
||||
public void onUploaded(RagDocumentUploadedEvent event) {
|
||||
if (event == null || event.documentIds() == null) {
|
||||
return;
|
||||
}
|
||||
for (Long documentId : event.documentIds()) {
|
||||
try {
|
||||
ragDocumentParseService.parse(documentId);
|
||||
} catch (RuntimeException e) {
|
||||
log.warn("RagDocumentUploadedEventListener.onUploaded parse failed, documentId={}, message={}",
|
||||
documentId, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package com.bruce.rag.service;
|
||||
|
||||
import com.bruce.rag.dto.request.RagDocumentChunkRequest;
|
||||
|
||||
public interface IRagDocumentChunkService {
|
||||
|
||||
void submitChunkTask(RagDocumentChunkRequest request);
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.bruce.rag.service;
|
||||
|
||||
import com.bruce.common.document.parse.DocumentParseResult;
|
||||
import com.bruce.rag.dto.response.RagDocumentParseResponse;
|
||||
import com.bruce.rag.dto.request.RagDocumentParseRequest;
|
||||
|
||||
@@ -7,6 +8,8 @@ import java.util.List;
|
||||
|
||||
public interface IRagDocumentParseService {
|
||||
|
||||
DocumentParseResult parseDocumentResult(Long documentId);
|
||||
|
||||
RagDocumentParseResponse parse(Long documentId);
|
||||
|
||||
List<RagDocumentParseResponse> parse(RagDocumentParseRequest request);
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
package com.bruce.rag.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import com.bruce.common.document.parse.DocumentParseResult;
|
||||
import com.bruce.rag.dto.request.RagDocumentChunkRequest;
|
||||
import com.bruce.rag.entity.RagChunk;
|
||||
import com.bruce.rag.entity.RagDocument;
|
||||
import com.bruce.rag.enums.RagChunkStrategyEnum;
|
||||
import com.bruce.rag.parse.Chunker;
|
||||
import com.bruce.rag.parse.ChunkerFactory;
|
||||
import com.bruce.rag.parse.RagChunkCommand;
|
||||
import com.bruce.rag.service.IRagChunkService;
|
||||
import com.bruce.rag.service.IRagDocumentChunkService;
|
||||
import com.bruce.rag.service.IRagDocumentParseService;
|
||||
import com.bruce.rag.service.IRagDocumentService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class RagDocumentChunkServiceImpl implements IRagDocumentChunkService {
|
||||
|
||||
private final IRagDocumentService ragDocumentService;
|
||||
|
||||
private final IRagDocumentParseService ragDocumentParseService;
|
||||
|
||||
private final ChunkerFactory chunkerFactory;
|
||||
|
||||
private final IRagChunkService ragChunkService;
|
||||
|
||||
@Override
|
||||
@Async
|
||||
public void submitChunkTask(RagDocumentChunkRequest request) {
|
||||
validateRequest(request);
|
||||
RagChunkStrategyEnum strategy = RagChunkStrategyEnum.fromValue(request.getChunkStrategy());
|
||||
Chunker chunker = chunkerFactory.resolve(strategy);
|
||||
for (Long documentId : request.getDocumentIds()) {
|
||||
try {
|
||||
RagDocument document = ragDocumentService.getById(documentId);
|
||||
if (document == null) {
|
||||
log.warn("RagDocumentChunkServiceImpl.chunkAsync document not found, documentId={}", documentId);
|
||||
continue;
|
||||
}
|
||||
DocumentParseResult parseResult = ragDocumentParseService.parseDocumentResult(documentId);
|
||||
RagChunkCommand command = new RagChunkCommand();
|
||||
command.setDocument(document);
|
||||
command.setParseResult(parseResult);
|
||||
command.setChunkStrategy(request.getChunkStrategy());
|
||||
command.setChunkSize(request.getChunkSize());
|
||||
command.setChunkOverlap(request.getChunkOverlap());
|
||||
command.setDelimiter(request.getDelimiter());
|
||||
List<RagChunk> chunks = chunker.chunk(command);
|
||||
|
||||
ragChunkService.remove(Wrappers.<RagChunk>lambdaQuery()
|
||||
.eq(RagChunk::getDocumentId, documentId));
|
||||
if (!chunks.isEmpty()) {
|
||||
ragChunkService.saveBatch(chunks);
|
||||
}
|
||||
log.info("RagDocumentChunkServiceImpl.chunkAsync success, documentId={}, chunkCount={}",
|
||||
documentId, chunks.size());
|
||||
} catch (RuntimeException e) {
|
||||
log.warn("RagDocumentChunkServiceImpl.chunkAsync failed, documentId={}, message={}",
|
||||
documentId, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void validateRequest(RagDocumentChunkRequest request) {
|
||||
if (request == null) {
|
||||
throw new IllegalArgumentException("切片请求不能为空");
|
||||
}
|
||||
if (request.getDocumentIds() == null || request.getDocumentIds().isEmpty()) {
|
||||
throw new IllegalArgumentException("文档ID列表不能为空");
|
||||
}
|
||||
RagChunkStrategyEnum.fromValue(request.getChunkStrategy());
|
||||
}
|
||||
}
|
||||
@@ -11,7 +11,6 @@ import com.bruce.common.service.ISysAttachmentService;
|
||||
import com.bruce.rag.dto.request.RagDocumentParseRequest;
|
||||
import com.bruce.rag.dto.response.RagDocumentParseResponse;
|
||||
import com.bruce.rag.entity.RagDocument;
|
||||
import com.bruce.rag.enums.RagChunkStrategyEnum;
|
||||
import com.bruce.rag.enums.RagParseStatusEnum;
|
||||
import com.bruce.rag.service.IRagDocumentParseService;
|
||||
import com.bruce.rag.service.IRagDocumentService;
|
||||
@@ -48,8 +47,21 @@ public class RagDocumentParseServiceImpl implements IRagDocumentParseService {
|
||||
return responses;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocumentParseResult parseDocumentResult(Long documentId) {
|
||||
return doParse(documentId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RagDocumentParseResponse parse(Long documentId) {
|
||||
DocumentParseResult result = doParse(documentId);
|
||||
RagDocumentParseResponse response = toResponse(documentId, result);
|
||||
log.info("RagDocumentParseServiceImpl.parse success, documentId={}, textLength={}",
|
||||
documentId, response.getTextLength());
|
||||
return response;
|
||||
}
|
||||
|
||||
private DocumentParseResult doParse(Long documentId) {
|
||||
log.info("RagDocumentParseServiceImpl.parse start, documentId={}", documentId);
|
||||
if (documentId == null) {
|
||||
throw new IllegalArgumentException("文档ID不能为空");
|
||||
@@ -74,10 +86,7 @@ public class RagDocumentParseServiceImpl implements IRagDocumentParseService {
|
||||
DocumentParser parser = documentParserFactory.resolve(context);
|
||||
DocumentParseResult result = parser.parse(context);
|
||||
updateParseStatus(documentId, RagParseStatusEnum.PARSED, null);
|
||||
RagDocumentParseResponse response = toResponse(documentId, result);
|
||||
log.info("RagDocumentParseServiceImpl.parse success, documentId={}, textLength={}",
|
||||
documentId, response.getTextLength());
|
||||
return response;
|
||||
return result;
|
||||
} catch (RuntimeException e) {
|
||||
updateParseStatus(documentId, RagParseStatusEnum.FAILED, e.getMessage());
|
||||
log.warn("RagDocumentParseServiceImpl.parse failed, documentId={}, message={}", documentId, e.getMessage());
|
||||
@@ -92,7 +101,6 @@ public class RagDocumentParseServiceImpl implements IRagDocumentParseService {
|
||||
if (request.getDocumentIds() == null || request.getDocumentIds().isEmpty()) {
|
||||
throw new IllegalArgumentException("文档ID列表不能为空");
|
||||
}
|
||||
RagChunkStrategyEnum.fromValue(request.getChunkStrategy());
|
||||
}
|
||||
|
||||
private DocumentParseContext buildParseContext(RagDocument document, SysAttachment attachment) {
|
||||
|
||||
@@ -10,12 +10,14 @@ import com.bruce.rag.dto.request.RagDocumentQueryRequest;
|
||||
import com.bruce.rag.dto.request.RagDocumentSaveRequest;
|
||||
import com.bruce.rag.dto.response.RagDocumentResponse;
|
||||
import com.bruce.rag.entity.RagDocument;
|
||||
import com.bruce.rag.event.RagDocumentUploadedEvent;
|
||||
import com.bruce.rag.enums.RagIndexStatusEnum;
|
||||
import com.bruce.rag.enums.RagParseStatusEnum;
|
||||
import com.bruce.rag.mapper.RagDocumentMapper;
|
||||
import com.bruce.rag.service.IRagDocumentService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@@ -29,6 +31,9 @@ public class RagDocumentServiceImpl extends ServiceImpl<RagDocumentMapper, RagDo
|
||||
@Autowired
|
||||
private ISysAttachmentService sysAttachmentService;
|
||||
|
||||
@Autowired
|
||||
private ApplicationEventPublisher eventPublisher;
|
||||
|
||||
@Override
|
||||
public List<RagDocumentResponse> listResponses() {
|
||||
log.info("RagDocumentServiceImpl.listResponses start");
|
||||
@@ -166,6 +171,13 @@ public class RagDocumentServiceImpl extends ServiceImpl<RagDocumentMapper, RagDo
|
||||
results.add(RagDocumentResponse.fromEntity(document));
|
||||
}
|
||||
|
||||
if (!results.isEmpty()) {
|
||||
List<Long> documentIds = results.stream()
|
||||
.map(RagDocumentResponse::getId)
|
||||
.toList();
|
||||
eventPublisher.publishEvent(new RagDocumentUploadedEvent(documentIds));
|
||||
}
|
||||
|
||||
log.info("RagDocumentServiceImpl.batchUpload success, storeId={}, uploaded={}",
|
||||
request.getStoreId(), results.size());
|
||||
return results;
|
||||
|
||||
@@ -10,7 +10,6 @@ import com.bruce.common.service.ISysAttachmentService;
|
||||
import com.bruce.rag.dto.request.RagDocumentParseRequest;
|
||||
import com.bruce.rag.dto.response.RagDocumentParseResponse;
|
||||
import com.bruce.rag.entity.RagDocument;
|
||||
import com.bruce.rag.enums.RagChunkStrategyEnum;
|
||||
import com.bruce.rag.enums.RagParseStatusEnum;
|
||||
import com.bruce.rag.service.IRagDocumentService;
|
||||
import com.bruce.rag.service.impl.RagDocumentParseServiceImpl;
|
||||
@@ -95,7 +94,7 @@ class RagDocumentParseServiceImplTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
void parseShouldSupportBatchRequestAndChunkStrategyStructure() throws Exception {
|
||||
void parseShouldSupportBatchRequest() throws Exception {
|
||||
Path file = tempDir.resolve("rag").resolve("batch.txt");
|
||||
Files.createDirectories(file.getParent());
|
||||
Files.writeString(file, "batch profiles");
|
||||
@@ -123,8 +122,6 @@ class RagDocumentParseServiceImplTests {
|
||||
);
|
||||
RagDocumentParseRequest request = new RagDocumentParseRequest();
|
||||
request.setDocumentIds(List.of(1002L));
|
||||
request.setChunkStrategy(RagChunkStrategyEnum.DELIMITER.getValue());
|
||||
request.setDelimiter("。");
|
||||
|
||||
when(ragDocumentService.getById(1002L)).thenReturn(document);
|
||||
when(sysAttachmentService.getById(3004L)).thenReturn(attachment);
|
||||
@@ -138,7 +135,7 @@ class RagDocumentParseServiceImplTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
void parseShouldRejectUnknownChunkStrategyValue() {
|
||||
void parseShouldRejectEmptyDocumentIds() {
|
||||
AttachmentProperties attachmentProperties = new AttachmentProperties();
|
||||
attachmentProperties.setBasePath(tempDir.toString());
|
||||
RagDocumentParseServiceImpl service = new RagDocumentParseServiceImpl(
|
||||
@@ -148,8 +145,7 @@ class RagDocumentParseServiceImplTests {
|
||||
new DocumentParserFactory(List.of(new FixedDocumentParser("batch profiles")))
|
||||
);
|
||||
RagDocumentParseRequest request = new RagDocumentParseRequest();
|
||||
request.setDocumentIds(List.of(1002L));
|
||||
request.setChunkStrategy(999);
|
||||
request.setDocumentIds(List.of());
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> service.parse(request));
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import org.mockito.Mock;
|
||||
import org.mockito.Spy;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.mock.web.MockMultipartFile;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
@@ -38,6 +39,9 @@ class RagDocumentServiceImplTests {
|
||||
@Mock
|
||||
private ISysAttachmentService sysAttachmentService;
|
||||
|
||||
@Mock
|
||||
private ApplicationEventPublisher eventPublisher;
|
||||
|
||||
@Test
|
||||
void batchUploadShouldUseRagSourceTypeAndStoreIdAsSourceId() {
|
||||
MockMultipartFile file = new MockMultipartFile(
|
||||
|
||||
Reference in New Issue
Block a user