当前位置: 首页> 房产> 建筑 > 使用kafka完成数据的实时同步,同步到es中。(使用kafka实现自动上下架 upper、lower)

使用kafka完成数据的实时同步,同步到es中。(使用kafka实现自动上下架 upper、lower)

时间:2025/7/11 1:20:16来源:https://blog.csdn.net/m0_65152767/article/details/141610060 浏览次数:0次

文章目录

  • 1、发送消息 KafkaService
  • 2、生产者 service-album -> AlbumInfoServiceImpl
    • 2.1、新增 saveAlbumInfo()
    • 2.2、更新 updateAlbumInfo()
    • 2.3、删除 removeAlbumInfo()
  • 3、消费者 service-search - > AlbumListener.java

  • 上架:新增专辑到 es
  • 下架:删除专辑
  1. 新增:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步新增数据
  2. 更新:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步更新数据
            如果是私有的专辑则发送消息给kafka,search通过监听器获取消息es删除数据
  3. 删除:发送消息给kafka,search通过监听器获取消息es删除数据

1、发送消息 KafkaService

package com.atguigu.tingshu.common.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;@Service
public class KafkaService {private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);@Autowiredprivate KafkaTemplate kafkaTemplate;/*** 向指定主题发送消息* 此方法通过调用重载的sendMsg方法,向指定主题发送消息,使用默认的消息标签和消息键** @param topic 发送消息的主题* @param msg   需要发送的消息内容*/public void sendMsg(String topic, String msg){// 调用重载的sendMsg方法,传入默认值以简化调用this.sendMsg(topic, null, null, msg);}/*** 发送消息到指定的Kafka主题** @param topic 消息主题* @param partition 分区编号* @param key 消息键值* @param msg 消息内容*/public void sendMsg(String topic, Integer partition, String key, String msg){// 发生消息并返回异步结果CompletableFuture<SendResult> future = this.kafkaTemplate.send(topic, partition, key, msg);// 异步处理发送结果future.whenCompleteAsync((result, ex) -> {if (ex != null){// 如果发送过程中出现异常logger.error("生产者发送消息失败!原因:{}", ex.getMessage());}});}}
  • whenCompleteAsync:异步完成时的处理、当异步操作完成时

在这里插入图片描述

2、生产者 service-album -> AlbumInfoServiceImpl

在这里插入图片描述

2.1、新增 saveAlbumInfo()

  • 新增:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步新增数据
    在这里插入图片描述
    在这里插入图片描述

在这里插入图片描述

@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {@Autowiredprivate AlbumAttributeValueMapper attributeValueMapper;@Autowiredprivate AlbumStatService albumStatService;@Autowiredprivate KafkaService kafkaService;@Transactional(rollbackFor = Exception.class)@Overridepublic void saveAlbumInfo(AlbumInfoVo albumInfoVo) throws FileNotFoundException {// 1.保存专辑信息表AlbumInfo albumInfo = new AlbumInfo();BeanUtils.copyProperties(albumInfoVo, albumInfo);// 设置当前用户的idLong userId = AuthContextHolder.getUserId();albumInfo.setUserId(userId == null ? 1 : userId);albumInfo.setTracksForFree(5);albumInfo.setSecondsForFree(30);albumInfo.setStatus(SystemConstant.ALBUM_STATUS_PASS);this.save(albumInfo);// 主键回写获取专辑idLong albumInfoId = albumInfo.getId();// 2.保存专辑标签值表List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {albumAttributeValueVoList.forEach(albumAttributeValueVo -> {AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);albumAttributeValue.setAlbumId(albumInfoId);this.attributeValueMapper.insert(albumAttributeValue);});}//		new FileInputStream("xxx");//		try {
//			TimeUnit.SECONDS.sleep(3);
//		} catch (InterruptedException e) {
//			throw new RuntimeException(e);
//		}// 3.保存统计信息:专辑状态表// this.saveAlbumStat(albumInfoId);this.albumStatService.saveAlbumStat(albumInfoId);if (StringUtils.equals(albumInfo.getIsOpen(), "1")) {this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumInfoId.toString());}//		int i = 1/0;}
}

在这里插入图片描述

2.2、更新 updateAlbumInfo()

  • 更新:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步更新数据
            如果是私有的专辑则发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {@Autowiredprivate AlbumAttributeValueMapper attributeValueMapper;@Autowiredprivate KafkaService kafkaService;@Transactional@Overridepublic void updateAlbumInfo(Long albumId, AlbumInfoVo albumInfoVo) {AlbumInfo albumInfo = new AlbumInfo();BeanUtils.copyProperties(albumInfoVo, albumInfo);albumInfo.setId(albumId);this.updateById(albumInfo);// 更新专辑标签值表:先删除该专辑所有的标签及值 再去新增this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {albumAttributeValueVoList.forEach(albumAttributeValueVo -> {AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);albumAttributeValue.setAlbumId(albumId);this.attributeValueMapper.insert(albumAttributeValue);});}if (StringUtils.equals(albumInfoVo.getIsOpen(), "1")) {this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumId.toString());} else {this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());}}
}

在这里插入图片描述

2.3、删除 removeAlbumInfo()

  • 删除:发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {@Autowiredprivate AlbumAttributeValueMapper attributeValueMapper;@Autowiredprivate AlbumStatMapper albumStatMapper;@Autowiredprivate KafkaService kafkaService;@Transactional@Overridepublic void removeAlbumInfo(Long albumId) {this.removeById(albumId);this.albumStatMapper.delete(new LambdaUpdateWrapper<AlbumStat>().eq(AlbumStat::getAlbumId, albumId));this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());}
}

在这里插入图片描述

3、消费者 service-search - > AlbumListener.java

在这里插入图片描述

package com.atguigu.tingshu.search.listener;@Component
public class AlbumListener {@Autowiredprivate AlbumInfoFeignClient albumInfoFeignClient;@Autowiredprivate UserInfoFeignClient userInfoFeignClient;@Autowiredprivate CategoryFeignClient categoryFeignClient;@Autowiredprivate ElasticsearchTemplate elasticsearchTemplate;@KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_UPPER)public void upper(String albumId){if (StringUtils.isBlank(albumId)){return;}// 根据专辑id查询专辑Result<AlbumInfo> albumInfoResult = this.albumInfoFeignClient.getAlbumInfo(Long.valueOf(albumId));Assert.notNull(albumInfoResult, "同步数据时,获取专辑信息失败!");AlbumInfo albumInfo = albumInfoResult.getData();Assert.notNull(albumInfo, "同步数据时,没有对应的专辑!");AlbumInfoIndex albumInfoIndex = new AlbumInfoIndex();// 把专辑信息中的数据复制到index对象BeanUtils.copyProperties(albumInfo, albumInfoIndex);// 查询主播获取主播信息Result<UserInfoVo> userInfoVoResult = this.userInfoFeignClient.getUserById(albumInfo.getUserId());Assert.notNull(userInfoVoResult, "数据导入时,获取主播信息失败!");UserInfoVo userInfoVo = userInfoVoResult.getData();if (userInfoVo != null){albumInfoIndex.setAnnouncerId(userInfoVo.getId());albumInfoIndex.setAnnouncerName(userInfoVo.getNickname());}// 根据三级分类id查询一二三级分类Result<BaseCategoryView> categoryResult = this.categoryFeignClient.getAllLevelCategories(albumInfo.getCategory3Id());Assert.notNull(categoryResult, "数据导入时,获取分类信息失败!");BaseCategoryView baseCategoryView = categoryResult.getData();if (baseCategoryView != null) {albumInfoIndex.setCategory1Id(baseCategoryView.getCategory1Id());albumInfoIndex.setCategory2Id(baseCategoryView.getCategory2Id());}// 查询专辑统计信息
//                Result<AlbumStatVo> albumStatesResult = this.albumInfoFeignClient.getAlbumStates(albumInfo.getId());
//                Assert.notNull(albumStatesResult, "数据导入时,获取专辑统计信息失败!");
//                AlbumStatVo albumStatVo = albumStatesResult.getData();
//                if (albumStatVo != null) {
//                    BeanUtils.copyProperties(albumStatVo, albumInfoIndex);
//                }// 假数据:int playNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setPlayStatNum(playNum);int subscribeNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setSubscribeStatNum(subscribeNum);int buyNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setBuyStatNum(buyNum);int commentNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setCommentStatNum(commentNum);// 热度albumInfoIndex.setHotScore(playNum * 0.1 + commentNum * 0.2 + subscribeNum * 0.3 + buyNum * 0.4);// 标签Result<List<AlbumAttributeValue>> albumAttributeValueResult = this.albumInfoFeignClient.getAlbumAttributeValue(albumInfo.getId());Assert.notNull(albumAttributeValueResult, "数据导入时,获取标签及值失败!");List<AlbumAttributeValue> albumAttributeValues = albumAttributeValueResult.getData();if (!CollectionUtils.isEmpty(albumAttributeValues)){// 把List<AlbumAttributeValue> 转化成  List<AttributeValueIndex>albumInfoIndex.setAttributeValueIndexList(albumAttributeValues.stream().map(albumAttributeValue -> {AttributeValueIndex attributeValueIndex = new AttributeValueIndex();BeanUtils.copyProperties(albumAttributeValue, attributeValueIndex);return attributeValueIndex;}).collect(Collectors.toList()));}this.elasticsearchTemplate.save(albumInfoIndex);}@KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_LOWER)public void lower(String albumId){if (StringUtils.isBlank(albumId)){return;}this.elasticsearchTemplate.delete(albumId, AlbumInfoIndex.class);}
}
关键字:使用kafka完成数据的实时同步,同步到es中。(使用kafka实现自动上下架 upper、lower)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: