diff --git a/src/main/java/com/bruce/CommonAgentApplication.java b/src/main/java/com/bruce/CommonAgentApplication.java index 76cf7ee..6dc0943 100644 --- a/src/main/java/com/bruce/CommonAgentApplication.java +++ b/src/main/java/com/bruce/CommonAgentApplication.java @@ -2,8 +2,10 @@ package com.bruce; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; @SpringBootApplication +@EnableAsync public class CommonAgentApplication { public static void main(String[] args) { diff --git a/src/main/java/com/bruce/rag/controller/RagDocumentController.java b/src/main/java/com/bruce/rag/controller/RagDocumentController.java index e5cdbb8..f41c1af 100644 --- a/src/main/java/com/bruce/rag/controller/RagDocumentController.java +++ b/src/main/java/com/bruce/rag/controller/RagDocumentController.java @@ -2,12 +2,14 @@ package com.bruce.rag.controller; import com.bruce.common.domain.model.RequestResult; import com.bruce.rag.dto.request.RagDocumentBatchUploadRequest; +import com.bruce.rag.dto.request.RagDocumentChunkRequest; import com.bruce.rag.dto.request.RagDocumentParseRequest; import com.bruce.rag.dto.request.RagDocumentQueryRequest; import com.bruce.rag.dto.request.RagDocumentSaveRequest; import com.bruce.rag.dto.response.RagDocumentParseResponse; import com.bruce.rag.dto.response.RagDocumentResponse; import com.bruce.rag.service.IRagDocumentParseService; +import com.bruce.rag.service.IRagDocumentChunkService; import com.bruce.rag.service.IRagDocumentService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; @@ -35,6 +37,9 @@ public class RagDocumentController { @Autowired private IRagDocumentParseService ragDocumentParseService; + @Autowired + private IRagDocumentChunkService ragDocumentChunkService; + @Operation(summary = "查询全部知识库文档") @PostMapping("/list") public RequestResult> list() { @@ -100,4 +105,13 @@ public class RagDocumentController { log.info("RagDocumentController.parse success, count={}", responses.size()); return RequestResult.success(responses); } + + @Operation(summary = "按策略异步切片") + @PostMapping("/chunk") + public RequestResult chunk(@RequestBody RagDocumentChunkRequest request) { + log.info("RagDocumentController.chunk start, request={}", request); + ragDocumentChunkService.submitChunkTask(request); + log.info("RagDocumentController.chunk submitted"); + return RequestResult.success(Boolean.TRUE); + } } diff --git a/src/main/java/com/bruce/rag/dto/request/RagDocumentChunkRequest.java b/src/main/java/com/bruce/rag/dto/request/RagDocumentChunkRequest.java new file mode 100644 index 0000000..7154023 --- /dev/null +++ b/src/main/java/com/bruce/rag/dto/request/RagDocumentChunkRequest.java @@ -0,0 +1,26 @@ +package com.bruce.rag.dto.request; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import java.util.List; + +@Data +@Schema(description = "RAG知识库文档切片请求") +public class RagDocumentChunkRequest { + + @Schema(description = "文档ID列表") + private List documentIds; + + @Schema(description = "切片方式枚举值") + private Integer chunkStrategy; + + @Schema(description = "切片长度") + private Integer chunkSize; + + @Schema(description = "重叠长度") + private Integer chunkOverlap; + + @Schema(description = "分隔符") + private String delimiter; +} diff --git a/src/main/java/com/bruce/rag/event/RagDocumentUploadedEvent.java b/src/main/java/com/bruce/rag/event/RagDocumentUploadedEvent.java new file mode 100644 index 0000000..7aa9fa9 --- /dev/null +++ b/src/main/java/com/bruce/rag/event/RagDocumentUploadedEvent.java @@ -0,0 +1,9 @@ +package com.bruce.rag.event; + +import java.util.List; + +/** + * 文档上传完成事件,用于异步触发自动解析。 + */ +public record RagDocumentUploadedEvent(List documentIds) { +} diff --git a/src/main/java/com/bruce/rag/event/RagDocumentUploadedEventListener.java b/src/main/java/com/bruce/rag/event/RagDocumentUploadedEventListener.java new file mode 100644 index 0000000..4f46649 --- /dev/null +++ b/src/main/java/com/bruce/rag/event/RagDocumentUploadedEventListener.java @@ -0,0 +1,32 @@ +package com.bruce.rag.event; + +import com.bruce.rag.service.IRagDocumentParseService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class RagDocumentUploadedEventListener { + + private final IRagDocumentParseService ragDocumentParseService; + + @Async + @EventListener + public void onUploaded(RagDocumentUploadedEvent event) { + if (event == null || event.documentIds() == null) { + return; + } + for (Long documentId : event.documentIds()) { + try { + ragDocumentParseService.parse(documentId); + } catch (RuntimeException e) { + log.warn("RagDocumentUploadedEventListener.onUploaded parse failed, documentId={}, message={}", + documentId, e.getMessage()); + } + } + } +} diff --git a/src/main/java/com/bruce/rag/service/IRagDocumentChunkService.java b/src/main/java/com/bruce/rag/service/IRagDocumentChunkService.java new file mode 100644 index 0000000..85594d8 --- /dev/null +++ b/src/main/java/com/bruce/rag/service/IRagDocumentChunkService.java @@ -0,0 +1,8 @@ +package com.bruce.rag.service; + +import com.bruce.rag.dto.request.RagDocumentChunkRequest; + +public interface IRagDocumentChunkService { + + void submitChunkTask(RagDocumentChunkRequest request); +} diff --git a/src/main/java/com/bruce/rag/service/IRagDocumentParseService.java b/src/main/java/com/bruce/rag/service/IRagDocumentParseService.java index 387c161..2760f7d 100644 --- a/src/main/java/com/bruce/rag/service/IRagDocumentParseService.java +++ b/src/main/java/com/bruce/rag/service/IRagDocumentParseService.java @@ -1,5 +1,6 @@ package com.bruce.rag.service; +import com.bruce.common.document.parse.DocumentParseResult; import com.bruce.rag.dto.response.RagDocumentParseResponse; import com.bruce.rag.dto.request.RagDocumentParseRequest; @@ -7,6 +8,8 @@ import java.util.List; public interface IRagDocumentParseService { + DocumentParseResult parseDocumentResult(Long documentId); + RagDocumentParseResponse parse(Long documentId); List parse(RagDocumentParseRequest request); diff --git a/src/main/java/com/bruce/rag/service/impl/RagDocumentChunkServiceImpl.java b/src/main/java/com/bruce/rag/service/impl/RagDocumentChunkServiceImpl.java new file mode 100644 index 0000000..729751f --- /dev/null +++ b/src/main/java/com/bruce/rag/service/impl/RagDocumentChunkServiceImpl.java @@ -0,0 +1,83 @@ +package com.bruce.rag.service.impl; + +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.bruce.common.document.parse.DocumentParseResult; +import com.bruce.rag.dto.request.RagDocumentChunkRequest; +import com.bruce.rag.entity.RagChunk; +import com.bruce.rag.entity.RagDocument; +import com.bruce.rag.enums.RagChunkStrategyEnum; +import com.bruce.rag.parse.Chunker; +import com.bruce.rag.parse.ChunkerFactory; +import com.bruce.rag.parse.RagChunkCommand; +import com.bruce.rag.service.IRagChunkService; +import com.bruce.rag.service.IRagDocumentChunkService; +import com.bruce.rag.service.IRagDocumentParseService; +import com.bruce.rag.service.IRagDocumentService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; + +@Slf4j +@Service +@RequiredArgsConstructor +public class RagDocumentChunkServiceImpl implements IRagDocumentChunkService { + + private final IRagDocumentService ragDocumentService; + + private final IRagDocumentParseService ragDocumentParseService; + + private final ChunkerFactory chunkerFactory; + + private final IRagChunkService ragChunkService; + + @Override + @Async + public void submitChunkTask(RagDocumentChunkRequest request) { + validateRequest(request); + RagChunkStrategyEnum strategy = RagChunkStrategyEnum.fromValue(request.getChunkStrategy()); + Chunker chunker = chunkerFactory.resolve(strategy); + for (Long documentId : request.getDocumentIds()) { + try { + RagDocument document = ragDocumentService.getById(documentId); + if (document == null) { + log.warn("RagDocumentChunkServiceImpl.chunkAsync document not found, documentId={}", documentId); + continue; + } + DocumentParseResult parseResult = ragDocumentParseService.parseDocumentResult(documentId); + RagChunkCommand command = new RagChunkCommand(); + command.setDocument(document); + command.setParseResult(parseResult); + command.setChunkStrategy(request.getChunkStrategy()); + command.setChunkSize(request.getChunkSize()); + command.setChunkOverlap(request.getChunkOverlap()); + command.setDelimiter(request.getDelimiter()); + List chunks = chunker.chunk(command); + + ragChunkService.remove(Wrappers.lambdaQuery() + .eq(RagChunk::getDocumentId, documentId)); + if (!chunks.isEmpty()) { + ragChunkService.saveBatch(chunks); + } + log.info("RagDocumentChunkServiceImpl.chunkAsync success, documentId={}, chunkCount={}", + documentId, chunks.size()); + } catch (RuntimeException e) { + log.warn("RagDocumentChunkServiceImpl.chunkAsync failed, documentId={}, message={}", + documentId, e.getMessage()); + } + } + } + + private void validateRequest(RagDocumentChunkRequest request) { + if (request == null) { + throw new IllegalArgumentException("切片请求不能为空"); + } + if (request.getDocumentIds() == null || request.getDocumentIds().isEmpty()) { + throw new IllegalArgumentException("文档ID列表不能为空"); + } + RagChunkStrategyEnum.fromValue(request.getChunkStrategy()); + } +} diff --git a/src/main/java/com/bruce/rag/service/impl/RagDocumentParseServiceImpl.java b/src/main/java/com/bruce/rag/service/impl/RagDocumentParseServiceImpl.java index a588f9e..82c4867 100644 --- a/src/main/java/com/bruce/rag/service/impl/RagDocumentParseServiceImpl.java +++ b/src/main/java/com/bruce/rag/service/impl/RagDocumentParseServiceImpl.java @@ -11,7 +11,6 @@ import com.bruce.common.service.ISysAttachmentService; import com.bruce.rag.dto.request.RagDocumentParseRequest; import com.bruce.rag.dto.response.RagDocumentParseResponse; import com.bruce.rag.entity.RagDocument; -import com.bruce.rag.enums.RagChunkStrategyEnum; import com.bruce.rag.enums.RagParseStatusEnum; import com.bruce.rag.service.IRagDocumentParseService; import com.bruce.rag.service.IRagDocumentService; @@ -48,8 +47,21 @@ public class RagDocumentParseServiceImpl implements IRagDocumentParseService { return responses; } + @Override + public DocumentParseResult parseDocumentResult(Long documentId) { + return doParse(documentId); + } + @Override public RagDocumentParseResponse parse(Long documentId) { + DocumentParseResult result = doParse(documentId); + RagDocumentParseResponse response = toResponse(documentId, result); + log.info("RagDocumentParseServiceImpl.parse success, documentId={}, textLength={}", + documentId, response.getTextLength()); + return response; + } + + private DocumentParseResult doParse(Long documentId) { log.info("RagDocumentParseServiceImpl.parse start, documentId={}", documentId); if (documentId == null) { throw new IllegalArgumentException("文档ID不能为空"); @@ -74,10 +86,7 @@ public class RagDocumentParseServiceImpl implements IRagDocumentParseService { DocumentParser parser = documentParserFactory.resolve(context); DocumentParseResult result = parser.parse(context); updateParseStatus(documentId, RagParseStatusEnum.PARSED, null); - RagDocumentParseResponse response = toResponse(documentId, result); - log.info("RagDocumentParseServiceImpl.parse success, documentId={}, textLength={}", - documentId, response.getTextLength()); - return response; + return result; } catch (RuntimeException e) { updateParseStatus(documentId, RagParseStatusEnum.FAILED, e.getMessage()); log.warn("RagDocumentParseServiceImpl.parse failed, documentId={}, message={}", documentId, e.getMessage()); @@ -92,7 +101,6 @@ public class RagDocumentParseServiceImpl implements IRagDocumentParseService { if (request.getDocumentIds() == null || request.getDocumentIds().isEmpty()) { throw new IllegalArgumentException("文档ID列表不能为空"); } - RagChunkStrategyEnum.fromValue(request.getChunkStrategy()); } private DocumentParseContext buildParseContext(RagDocument document, SysAttachment attachment) { diff --git a/src/main/java/com/bruce/rag/service/impl/RagDocumentServiceImpl.java b/src/main/java/com/bruce/rag/service/impl/RagDocumentServiceImpl.java index 9c36673..45abcd4 100644 --- a/src/main/java/com/bruce/rag/service/impl/RagDocumentServiceImpl.java +++ b/src/main/java/com/bruce/rag/service/impl/RagDocumentServiceImpl.java @@ -10,12 +10,14 @@ import com.bruce.rag.dto.request.RagDocumentQueryRequest; import com.bruce.rag.dto.request.RagDocumentSaveRequest; import com.bruce.rag.dto.response.RagDocumentResponse; import com.bruce.rag.entity.RagDocument; +import com.bruce.rag.event.RagDocumentUploadedEvent; import com.bruce.rag.enums.RagIndexStatusEnum; import com.bruce.rag.enums.RagParseStatusEnum; import com.bruce.rag.mapper.RagDocumentMapper; import com.bruce.rag.service.IRagDocumentService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; @@ -29,6 +31,9 @@ public class RagDocumentServiceImpl extends ServiceImpl listResponses() { log.info("RagDocumentServiceImpl.listResponses start"); @@ -166,6 +171,13 @@ public class RagDocumentServiceImpl extends ServiceImpl documentIds = results.stream() + .map(RagDocumentResponse::getId) + .toList(); + eventPublisher.publishEvent(new RagDocumentUploadedEvent(documentIds)); + } + log.info("RagDocumentServiceImpl.batchUpload success, storeId={}, uploaded={}", request.getStoreId(), results.size()); return results; diff --git a/src/test/java/com/bruce/rag/RagDocumentParseServiceImplTests.java b/src/test/java/com/bruce/rag/RagDocumentParseServiceImplTests.java index 50ee560..991f436 100644 --- a/src/test/java/com/bruce/rag/RagDocumentParseServiceImplTests.java +++ b/src/test/java/com/bruce/rag/RagDocumentParseServiceImplTests.java @@ -10,7 +10,6 @@ import com.bruce.common.service.ISysAttachmentService; import com.bruce.rag.dto.request.RagDocumentParseRequest; import com.bruce.rag.dto.response.RagDocumentParseResponse; import com.bruce.rag.entity.RagDocument; -import com.bruce.rag.enums.RagChunkStrategyEnum; import com.bruce.rag.enums.RagParseStatusEnum; import com.bruce.rag.service.IRagDocumentService; import com.bruce.rag.service.impl.RagDocumentParseServiceImpl; @@ -95,7 +94,7 @@ class RagDocumentParseServiceImplTests { } @Test - void parseShouldSupportBatchRequestAndChunkStrategyStructure() throws Exception { + void parseShouldSupportBatchRequest() throws Exception { Path file = tempDir.resolve("rag").resolve("batch.txt"); Files.createDirectories(file.getParent()); Files.writeString(file, "batch profiles"); @@ -123,8 +122,6 @@ class RagDocumentParseServiceImplTests { ); RagDocumentParseRequest request = new RagDocumentParseRequest(); request.setDocumentIds(List.of(1002L)); - request.setChunkStrategy(RagChunkStrategyEnum.DELIMITER.getValue()); - request.setDelimiter("。"); when(ragDocumentService.getById(1002L)).thenReturn(document); when(sysAttachmentService.getById(3004L)).thenReturn(attachment); @@ -138,7 +135,7 @@ class RagDocumentParseServiceImplTests { } @Test - void parseShouldRejectUnknownChunkStrategyValue() { + void parseShouldRejectEmptyDocumentIds() { AttachmentProperties attachmentProperties = new AttachmentProperties(); attachmentProperties.setBasePath(tempDir.toString()); RagDocumentParseServiceImpl service = new RagDocumentParseServiceImpl( @@ -148,8 +145,7 @@ class RagDocumentParseServiceImplTests { new DocumentParserFactory(List.of(new FixedDocumentParser("batch profiles"))) ); RagDocumentParseRequest request = new RagDocumentParseRequest(); - request.setDocumentIds(List.of(1002L)); - request.setChunkStrategy(999); + request.setDocumentIds(List.of()); assertThrows(IllegalArgumentException.class, () -> service.parse(request)); } diff --git a/src/test/java/com/bruce/rag/RagDocumentServiceImplTests.java b/src/test/java/com/bruce/rag/RagDocumentServiceImplTests.java index 4d21850..f56c39c 100644 --- a/src/test/java/com/bruce/rag/RagDocumentServiceImplTests.java +++ b/src/test/java/com/bruce/rag/RagDocumentServiceImplTests.java @@ -18,6 +18,7 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.mock.web.MockMultipartFile; +import org.springframework.context.ApplicationEventPublisher; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -38,6 +39,9 @@ class RagDocumentServiceImplTests { @Mock private ISysAttachmentService sysAttachmentService; + @Mock + private ApplicationEventPublisher eventPublisher; + @Test void batchUploadShouldUseRagSourceTypeAndStoreIdAsSourceId() { MockMultipartFile file = new MockMultipartFile(