master:商品发布及定时任务同步到ES调整;

pull/1121/head
liujiang 2025-10-25 22:07:18 +08:00
parent 4ee62f22db
commit 6ba5a6efde
4 changed files with 80 additions and 82 deletions

View File

@ -82,12 +82,6 @@ public class GtAndFhbBizController extends BaseController {
final IPictureService pictureService;
final FsNotice fsNotice;
// TODO 提供导出测试环境数据的接口,不然迁移到生产还要重新来一遍
// TODO 提供导出测试环境数据的接口,不然迁移到生产还要重新来一遍
// TODO 提供导出测试环境数据的接口,不然迁移到生产还要重新来一遍
/**
* step1
*/

View File

@ -10,6 +10,7 @@ import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.ruoyi.common.constant.CacheConstants;
@ -438,6 +439,9 @@ public class XktTask {
// 先删除所有的商品标签,保证数据唯一性
List<DailyProdTag> existList = this.dailyProdTagMapper.selectList(new LambdaQueryWrapper<DailyProdTag>()
.eq(DailyProdTag::getDelFlag, Constants.UNDELETED));
// 已存在于ES中的标签
Map<Long, List<String>> existProdTagMap = CollectionUtils.isEmpty(existList) ? new HashMap<>()
:existList.stream().collect(Collectors.toMap(DailyProdTag::getStoreProdId, x -> new ArrayList<>(), (s1, s2) -> s2));
if (CollectionUtils.isNotEmpty(existList)) {
this.dailyProdTagMapper.deleteByIds(existList.stream().map(DailyProdTag::getId).collect(Collectors.toList()));
}
@ -474,8 +478,16 @@ public class XktTask {
return;
}
this.dailyProdTagMapper.insert(tagList);
// 最新的待更新的商品标签列表
Map<Long, List<String>> tagMap = tagList.stream().collect(Collectors
.groupingBy(DailyProdTag::getStoreProdId, Collectors.mapping(DailyProdTag::getTag, Collectors.toList())));
existProdTagMap.forEach((storeProdId, tags) -> {
if (!tagMap.containsKey(storeProdId)) {
tagMap.put(storeProdId, tags);
}
});
// 更新商品的标签到ES
this.updateESProdTags(tagList);
this.updateESProdTags(tagMap);
}
/**
@ -794,11 +806,17 @@ public class XktTask {
try {
// 调用bulk方法执行批量更新操作
BulkResponse bulkResponse = esClientWrapper.getEsClient().bulk(e -> e.index(Constants.ES_IDX_PRODUCT_INFO).operations(list));
log.info("bulkResponse.result() = {}", bulkResponse.items());
} catch (IOException | RuntimeException e) {
// 记录日志并抛出或处理异常
log.error("向ES更新档口权重失败商品ID: {}, 错误信息: {}", storeProdList.stream().map(StoreProduct::getId).collect(Collectors.toList()), e.getMessage());
throw e; // 或者做其他补偿处理,比如异步重试
log.info("定时任务,批量更新档口权重到 ES 成功的 id列表: {}", bulkResponse.items().stream().map(BulkResponseItem::id).collect(Collectors.toList()));
// 有哪些没执行成功的,需要发飞书通知
List<String> successIdList = bulkResponse.items().stream().map(BulkResponseItem::id).collect(Collectors.toList());
List<String> unExeIdList = storeProdList.stream().map(String::valueOf).filter(x -> !successIdList.contains(x)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(unExeIdList)) {
fsNotice.sendMsg2DefaultChat("定时任务,批量更新档口权重到 ES 失败", "以下storeProdId未执行成功: " + unExeIdList);
}
} catch (Exception e) {
log.error("定时任务,批量更新档口权重到 ES 失败", e);
fsNotice.sendMsg2DefaultChat("全部失败,定时任务批量更新档口权重到 ES 失败",
storeProdList.stream().map(StoreProduct::getId).map(String::valueOf).collect(Collectors.joining(",")));
}
}
@ -922,17 +940,25 @@ public class XktTask {
}
// 执行批量插入
try {
BulkResponse response = esClientWrapper.getEsClient().bulk(b -> b.index(Constants.ES_IDX_PRODUCT_INFO).operations(bulkOperations));
log.info("批量插入到 ES 成功,共处理 {} 条记录", response.items().size());
BulkResponse bulkResponse = esClientWrapper.getEsClient().bulk(b -> b.index(Constants.ES_IDX_PRODUCT_INFO).operations(bulkOperations));
log.info("定时发布商品,批量新增商品到 ES 成功的 id列表: {}", bulkResponse.items().stream().map(BulkResponseItem::id).collect(Collectors.toList()));
// 有哪些没执行成功的,需要发飞书通知
List<String> successIdList = bulkResponse.items().stream().map(BulkResponseItem::id).collect(Collectors.toList());
List<String> unExeIdList = unPublicList.stream().map(String::valueOf).filter(x -> !successIdList.contains(x)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(unExeIdList)) {
fsNotice.sendMsg2DefaultChat("定时发布商品,批量新增商品到 ES 失败", "以下storeProdId未执行成功: " + unExeIdList);
}
} catch (Exception e) {
log.error("批量插入到 ES 失败", e);
log.error("定时发布商品,批量新增商品到 ES 失败", e);
fsNotice.sendMsg2DefaultChat("定时发布商品,批量新增商品到 ES 失败",
unPublicList.stream().map(StoreProduct::getId).map(String::valueOf).collect(Collectors.joining(",")));
}
for (StoreProduct product : unPublicList) {
List<String> mainPicUrlList = mainPicMap.get(product.getId());
if (CollUtil.isEmpty(mainPicUrlList)) {
return;
}
this.sync2ImgSearchServer(product.getId(), mainPicUrlList, true);
this.sync2ImgSearchServer(product.getId(), mainPicUrlList);
}
}
@ -1534,26 +1560,35 @@ public class XktTask {
/**
* ES
*
* @param tagList
* @throws IOException
* @param prodTagMap map
*/
private void updateESProdTags(List<DailyProdTag> tagList) throws IOException {
private void updateESProdTags(Map<Long, List<String>> prodTagMap) {
// 构建一个批量数据集合
List<BulkOperation> list = new ArrayList<>();
tagList.stream().collect(Collectors.groupingBy(DailyProdTag::getStoreProdId))
.forEach((storeProdId, tags) -> {
// 构建部分文档更新请求
list.add(new BulkOperation.Builder().update(u -> u
.action(a -> a.doc(new HashMap<String, Object>() {{
put("tags", tags.stream().sorted(Comparator.comparing(x -> x.getType())).map(DailyProdTag::getTag).collect(Collectors.toList()));
}}))
.id(String.valueOf(storeProdId))
.index(Constants.ES_IDX_PRODUCT_INFO))
.build());
});
// 调用bulk方法执行批量更新操作
BulkResponse bulkResponse = esClientWrapper.getEsClient().bulk(e -> e.index(Constants.ES_IDX_PRODUCT_INFO).operations(list));
System.out.println("bulkResponse.items() = " + bulkResponse.items());
prodTagMap.forEach((storeProdId, updateTags) -> {
// 构建部分文档更新请求
list.add(new BulkOperation.Builder().update(u -> u
.action(a -> a.doc(new HashMap<String, Object>() {{
put("tags", updateTags);
}}))
.id(String.valueOf(storeProdId))
.index(Constants.ES_IDX_PRODUCT_INFO))
.build());
});
try {
// 调用bulk方法执行批量更新操作
BulkResponse bulkResponse = esClientWrapper.getEsClient().bulk(e -> e.index(Constants.ES_IDX_PRODUCT_INFO).operations(list));
log.info("批量更新商品标签到 ES 成功的 id列表: {}", bulkResponse.items().stream().map(BulkResponseItem::id).collect(Collectors.toList()));
// 有哪些没执行成功的,需要发飞书通知
List<String> successIdList = bulkResponse.items().stream().map(BulkResponseItem::id).collect(Collectors.toList());
List<String> unExeIdList = prodTagMap.keySet().stream().map(String::valueOf).filter(x -> !successIdList.contains(x)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(unExeIdList)) {
fsNotice.sendMsg2DefaultChat("定时任务批量更新商品标签到 ES 失败", "以下storeProdId未执行成功: " + unExeIdList);
}
} catch (Exception e) {
log.error("定时任务批量更新标签到 ES 失败", e);
fsNotice.sendMsg2DefaultChat("全部失败,定时任务批量更新商品标签到 ES 失败", prodTagMap.keySet().toString());
}
}
/**
@ -1577,21 +1612,14 @@ public class XktTask {
*
* @param storeProductId
* @param picKeys
* @param async
*/
private void sync2ImgSearchServer(Long storeProductId, List<String> picKeys, boolean async) {
if (async) {
ThreadUtil.execAsync(() -> {
ProductPicSyncResultDTO r =
pictureService.sync2ImgSearchServer(new ProductPicSyncDTO(storeProductId, picKeys));
log.info("商品图片同步至搜图服务器: id: {}, result: {}", storeProductId, JSONUtil.toJsonStr(r));
}
);
} else {
ProductPicSyncResultDTO r =
pictureService.sync2ImgSearchServer(new ProductPicSyncDTO(storeProductId, picKeys));
log.info("商品图片同步至搜图服务器: id: {}, result: {}", storeProductId, JSONUtil.toJsonStr(r));
}
private void sync2ImgSearchServer(Long storeProductId, List<String> picKeys) {
ThreadUtil.execAsync(() -> {
ProductPicSyncResultDTO r =
pictureService.sync2ImgSearchServer(new ProductPicSyncDTO(storeProductId, picKeys));
log.info("商品图片同步至搜图服务器: id: {}, result: {}", storeProductId, JSONUtil.toJsonStr(r));
}
);
}
/**

View File

@ -45,7 +45,6 @@ INSERT INTO `advert` VALUES (45, 1, 'ENYmwRr7T1', 1, 1, 7, 1, 1, 3, 300.00, 3, 4
INSERT INTO `advert` VALUES (46, 1, 'KIMSCC0h48', 1, 1, 8, 1, 1, 1, 300.00, 3, 4, '22:00:00', 5, 123, '840*470', '不超过5.00M', NULL, NULL, NULL, NULL, NULL, 0, '0', '', '2025-08-13 22:56:27', '', '2025-08-13 22:56:27');
INSERT INTO `advert` VALUES (47, 1, 'iAHKrJlQSX', 1, 1, 9, 1, 1, 1, 300.00, 3, 4, '22:00:00', 2, 124, '840*470', '不超过5.00M', NULL, NULL, NULL, NULL, NULL, 0, '0', '', '2025-08-13 22:57:13', '', '2025-08-13 22:57:13');
INSERT INTO `advert` VALUES (48, 2, '3I8CDuP3mZ', 2, 1, 10, 1, 1, 2, 300.00, 3, 4, '22:00:00', 2, NULL, '', '', 2, NULL, NULL, NULL, NULL, 0, '0', '', '2025-08-13 22:57:41', '', '2025-08-13 22:57:41');
INSERT INTO `advert` VALUES (49, 1, 'xgDyZoXlfh', 1, 1, 11, 1, 1, 1, 300.00, 3, 4, '22:00:00', 1, 125, '840*470', '不超过5.00M', NULL, NULL, NULL, NULL, NULL, 0, '0', '', '2025-08-13 22:58:29', '', '2025-08-13 22:58:29');
INSERT INTO `advert` VALUES (50, 1, 'cjHdgozRUw', 20, 1, 12, 1, 1, 2, 450.00, 3, 4, '22:00:00', 20, NULL, '', '', NULL, NULL, NULL, NULL, NULL, 0, '0', '', '2025-08-13 23:00:55', '', '2025-08-13 23:00:55');
INSERT INTO `advert` VALUES (51, 1, 'RNsmF98eoy', 1, 1, 13, 1, 1, 1, 300.00, 3, 4, '22:00:00', 1, 126, '840*470', '不超过5.00M', NULL, NULL, NULL, NULL, NULL, 0, '0', '', '2025-08-13 23:01:31', '', '2025-08-13 23:01:31');
INSERT INTO `advert` VALUES (52, 1, '1egXOizbf8', 1, 1, 14, 1, 1, 4, 750.00, 15, 4, '22:00:00', 10, NULL, '', '', NULL, NULL, NULL, NULL, NULL, 0, '0', '', '2025-08-13 23:04:07', '', '2025-08-13 23:04:07');

View File

@ -22,13 +22,12 @@ import com.ruoyi.common.constant.HttpStatus;
import com.ruoyi.common.core.page.Page;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.common.exception.ServiceException;
import com.ruoyi.common.exception.user.CaptchaException;
import com.ruoyi.common.exception.user.CaptchaExpireException;
import com.ruoyi.common.utils.AdValidator;
import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.common.utils.HtmlValidator;
import com.ruoyi.common.utils.SecurityUtils;
import com.ruoyi.framework.es.EsClientWrapper;
import com.ruoyi.framework.notice.fs.FsNotice;
import com.ruoyi.framework.oss.OSSClientWrapper;
import com.ruoyi.system.domain.dto.productCategory.ProdCateDTO;
import com.ruoyi.xkt.domain.*;
@ -109,6 +108,7 @@ public class StoreProductServiceImpl implements IStoreProductService {
final UserSubscriptionsMapper userSubMapper;
final OSSClientWrapper ossClient;
final TencentAuthManager tencentAuthManager;
final FsNotice fsNotice;
/**
@ -233,7 +233,6 @@ public class StoreProductServiceImpl implements IStoreProductService {
return count;
}
/**
*
*
@ -691,7 +690,7 @@ public class StoreProductServiceImpl implements IStoreProductService {
/**
*
*
* @param storeId ID
* @param storeId ID
* @param prodArtNum
* @return StoreProdFuzzyResDTO
*/
@ -1098,19 +1097,17 @@ public class StoreProductServiceImpl implements IStoreProductService {
* @param storeProd
* @param storeProdDTO
* @param storeName
* @throws IOException
*/
private void createESDoc(StoreProduct storeProd, StoreProdDTO storeProdDTO, String storeName) throws IOException {
private void createESDoc(StoreProduct storeProd, StoreProdDTO storeProdDTO, String storeName) {
ESProductDTO esProductDTO = this.getESDTO(storeProd, storeProdDTO, storeName);
try {
// 向索引中添加数据
CreateResponse createResponse = esClientWrapper.getEsClient().create(e -> e.index(Constants.ES_IDX_PRODUCT_INFO)
.id(storeProd.getId().toString()).document(esProductDTO));
log.info("createResponse.result() = {}", createResponse.result());
} catch (IOException | RuntimeException e) {
// 记录日志并抛出或处理异常
log.error("向ES写入文档失败商品ID: {}, 错误信息: {}", storeProd.getId(), e.getMessage(), e);
throw e; // 或者做其他补偿处理,比如异步重试
} catch (Exception e) {
fsNotice.sendMsg2DefaultChat("新增商品同步到ES 失败", "storeId:" + storeProd.getStoreId() + " prodArtNum: " + storeProd.getProdArtNum());
throw new ServiceException("新增商品同步到ES 失败storeId:" + storeProd.getStoreId() + " prodArtNum: " + storeProd.getProdArtNum(), HttpStatus.ERROR);
}
}
@ -1120,17 +1117,16 @@ public class StoreProductServiceImpl implements IStoreProductService {
* @param storeProd
* @param updateDTO
* @param storeName
* @throws IOException
*/
private void updateESDoc(StoreProduct storeProd, StoreProdDTO updateDTO, String storeName) throws IOException {
private void updateESDoc(StoreProduct storeProd, StoreProdDTO updateDTO, String storeName) {
ESProductDTO esProductDTO = this.getESDTO(storeProd, updateDTO, storeName);
try {
UpdateResponse<ESProductDTO> updateResponse = esClientWrapper.getEsClient().update(u -> u
.index(Constants.ES_IDX_PRODUCT_INFO).doc(esProductDTO).id(storeProd.getId().toString()), ESProductDTO.class);
log.info("updateResponse.result() = {}", updateResponse.result());
} catch (IOException | RuntimeException e) {
log.error("更新商品[{}]到ES失败: {}", storeProd.getId(), e.getMessage(), e);
throw e; // 或者根据业务需求进行重试、异步补偿等处理
} catch (Exception e) {
fsNotice.sendMsg2DefaultChat("更新商品同步到ES 失败", "storeProdId: " + storeProd.getId());
throw new ServiceException("更新商品同步到ES 失败storeProdId:" + storeProd.getId(), HttpStatus.ERROR);
}
}
@ -1525,24 +1521,5 @@ public class StoreProductServiceImpl implements IStoreProductService {
return cateAttrMap;
}
/**
*
*
* @param code
* @param uuid
* @return
*/
private void validateCaptcha(String code, String uuid) {
String verifyKey = CacheConstants.CAPTCHA_CODE_KEY + uuid;
String captcha = redisCache.getCacheObject(verifyKey);
redisCache.deleteObject(verifyKey);
if (captcha == null) {
throw new CaptchaExpireException();
}
if (!StrUtil.emptyIfNull(code).equalsIgnoreCase(captcha)) {
throw new CaptchaException();
}
}
}