文章目录
- 1、发送消息 KafkaService
- 2、生产者 service-album -> AlbumInfoServiceImpl
- 2.1、新增 saveAlbumInfo()
- 2.2、更新 updateAlbumInfo()
- 2.3、删除 removeAlbumInfo()
- 3、消费者 service-search - > AlbumListener.java
- 新增:如果是
公开的
专辑则发送消息给kafka,search通过监听器获取消息同步
新增数据 - 更新:如果是
公开的
专辑则发送消息给kafka,search通过监听器获取消息同步
更新数据
如果是私有的
专辑则发送消息给kafka,search通过监听器获取消息es删除
数据 - 删除:发送消息给kafka,search通过监听器获取消息es删除数据
1、发送消息 KafkaService
package com.atguigu.tingshu.common.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;@Service
public class KafkaService {private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMsg(String topic, String msg){this.sendMsg(topic, null, null, msg);}public void sendMsg(String topic, Integer partition, String key, String msg){CompletableFuture<SendResult> future = this.kafkaTemplate.send(topic, partition, key, msg);future.whenCompleteAsync((result, ex) -> {if (ex != null){logger.error("生产者发送消息失败!原因:{}", ex.getMessage());}});}}
whenCompleteAsync:异步完成时的处理、当异步操作完成时

2、生产者 service-album -> AlbumInfoServiceImpl

2.1、新增 saveAlbumInfo()
- 新增:如果是
公开的
专辑则发送消息给kafka,search通过监听器获取消息同步
新增数据



@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {@Autowiredprivate AlbumAttributeValueMapper attributeValueMapper;@Autowiredprivate AlbumStatService albumStatService;@Autowiredprivate KafkaService kafkaService;@Transactional(rollbackFor = Exception.class)@Overridepublic void saveAlbumInfo(AlbumInfoVo albumInfoVo) throws FileNotFoundException {AlbumInfo albumInfo = new AlbumInfo();BeanUtils.copyProperties(albumInfoVo, albumInfo);Long userId = AuthContextHolder.getUserId();albumInfo.setUserId(userId == null ? 1 : userId);albumInfo.setTracksForFree(5);albumInfo.setSecondsForFree(30);albumInfo.setStatus(SystemConstant.ALBUM_STATUS_PASS);this.save(albumInfo);Long albumInfoId = albumInfo.getId();List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {albumAttributeValueVoList.forEach(albumAttributeValueVo -> {AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);albumAttributeValue.setAlbumId(albumInfoId);this.attributeValueMapper.insert(albumAttributeValue);});}
this.albumStatService.saveAlbumStat(albumInfoId);if (StringUtils.equals(albumInfo.getIsOpen(), "1")) {this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumInfoId.toString());}}
}

2.2、更新 updateAlbumInfo()
- 更新:如果是
公开的
专辑则发送消息给kafka,search通过监听器获取消息同步
更新数据
如果是私有的
专辑则发送消息给kafka,search通过监听器获取消息es删除
数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {@Autowiredprivate AlbumAttributeValueMapper attributeValueMapper;@Autowiredprivate KafkaService kafkaService;@Transactional@Overridepublic void updateAlbumInfo(Long albumId, AlbumInfoVo albumInfoVo) {AlbumInfo albumInfo = new AlbumInfo();BeanUtils.copyProperties(albumInfoVo, albumInfo);albumInfo.setId(albumId);this.updateById(albumInfo);this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {albumAttributeValueVoList.forEach(albumAttributeValueVo -> {AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);albumAttributeValue.setAlbumId(albumId);this.attributeValueMapper.insert(albumAttributeValue);});}if (StringUtils.equals(albumInfoVo.getIsOpen(), "1")) {this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumId.toString());} else {this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());}}
}

2.3、删除 removeAlbumInfo()
- 删除:发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {@Autowiredprivate AlbumAttributeValueMapper attributeValueMapper;@Autowiredprivate AlbumStatMapper albumStatMapper;@Autowiredprivate KafkaService kafkaService;@Transactional@Overridepublic void removeAlbumInfo(Long albumId) {this.removeById(albumId);this.albumStatMapper.delete(new LambdaUpdateWrapper<AlbumStat>().eq(AlbumStat::getAlbumId, albumId));this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());}
}

3、消费者 service-search - > AlbumListener.java

package com.atguigu.tingshu.search.listener;@Component
public class AlbumListener {@Autowiredprivate AlbumInfoFeignClient albumInfoFeignClient;@Autowiredprivate UserInfoFeignClient userInfoFeignClient;@Autowiredprivate CategoryFeignClient categoryFeignClient;@Autowiredprivate ElasticsearchTemplate elasticsearchTemplate;@KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_UPPER)public void upper(String albumId){if (StringUtils.isBlank(albumId)){return;}Result<AlbumInfo> albumInfoResult = this.albumInfoFeignClient.getAlbumInfo(Long.valueOf(albumId));Assert.notNull(albumInfoResult, "同步数据时,获取专辑信息失败!");AlbumInfo albumInfo = albumInfoResult.getData();Assert.notNull(albumInfo, "同步数据时,没有对应的专辑!");AlbumInfoIndex albumInfoIndex = new AlbumInfoIndex();BeanUtils.copyProperties(albumInfo, albumInfoIndex);Result<UserInfoVo> userInfoVoResult = this.userInfoFeignClient.getUserById(albumInfo.getUserId());Assert.notNull(userInfoVoResult, "数据导入时,获取主播信息失败!");UserInfoVo userInfoVo = userInfoVoResult.getData();if (userInfoVo != null){albumInfoIndex.setAnnouncerId(userInfoVo.getId());albumInfoIndex.setAnnouncerName(userInfoVo.getNickname());}Result<BaseCategoryView> categoryResult = this.categoryFeignClient.getAllLevelCategories(albumInfo.getCategory3Id());Assert.notNull(categoryResult, "数据导入时,获取分类信息失败!");BaseCategoryView baseCategoryView = categoryResult.getData();if (baseCategoryView != null) {albumInfoIndex.setCategory1Id(baseCategoryView.getCategory1Id());albumInfoIndex.setCategory2Id(baseCategoryView.getCategory2Id());}
int playNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setPlayStatNum(playNum);int subscribeNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setSubscribeStatNum(subscribeNum);int buyNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setBuyStatNum(buyNum);int commentNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setCommentStatNum(commentNum);albumInfoIndex.setHotScore(playNum * 0.1 + commentNum * 0.2 + subscribeNum * 0.3 + buyNum * 0.4);Result<List<AlbumAttributeValue>> albumAttributeValueResult = this.albumInfoFeignClient.getAlbumAttributeValue(albumInfo.getId());Assert.notNull(albumAttributeValueResult, "数据导入时,获取标签及值失败!");List<AlbumAttributeValue> albumAttributeValues = albumAttributeValueResult.getData();if (!CollectionUtils.isEmpty(albumAttributeValues)){albumInfoIndex.setAttributeValueIndexList(albumAttributeValues.stream().map(albumAttributeValue -> {AttributeValueIndex attributeValueIndex = new AttributeValueIndex();BeanUtils.copyProperties(albumAttributeValue, attributeValueIndex);return attributeValueIndex;}).collect(Collectors.toList()));}this.elasticsearchTemplate.save(albumInfoIndex);}@KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_LOWER)public void lower(String albumId){if (StringUtils.isBlank(albumId)){return;}this.elasticsearchTemplate.delete(albumId, AlbumInfoIndex.class);}
}