当前位置: 首页> 娱乐> 影视 > 黑马点评笔记

黑马点评笔记

时间:2025/7/11 23:51:39来源:https://blog.csdn.net/DanielSYC/article/details/138377740 浏览次数:0次

短信登录

jdk 版本降为11

测试地址:http://localhost:8081/shop-type/list

开启前端(windows):

start nginx.exe

测试地址:http://localhost:8080/

开启 redis(centos7):

# cd /usr/local/bin/
# ./redis-server /etc/redis.conf

在这里插入图片描述

发送验证码

UserController.java

请求方法:post

请求路径:/user/code

请求参数:phone,电话号码

返回值:无

Controller 层:

    @PostMapping("code")public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {// TODO 发送短信验证码并保存验证码return userService.sendCode(phone, session);}

Service 层:

    @Overridepublic Result sendCode(String phone, HttpSession session) {if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式错误");}String code = RandomUtil.randomNumbers(6);session.setAttribute("code", code);log.debug("code send successfully: {}", code);return Result.ok();}

登录

在这里插入图片描述

Controller 层:

    @PostMapping("/login")public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session){// TODO 实现登录功能return userService.login(loginForm, session);}

Service 层:

    @Overridepublic Result login(LoginFormDTO loginForm, HttpSession session) {String phone = loginForm.getPhone();if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式错误");}Object cacheCode = session.getAttribute("code");String code = loginForm.getCode();if (cacheCode == null || !cacheCode.toString().equals(code)) {return Result.fail("验证码错误");}User user = query().eq("phone", phone).one();if (user == null) {user = createUserWithPhone(phone);}session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));return Result.ok();}

请求拦截器做登录校验:

public class LoginInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {HttpSession session = request.getSession();	// tomcat 会自动根据 cookie 取出 sessionObject user = session.getAttribute("user");if (user == null) {response.setStatus(401);return false;}UserHolder.saveUser((UserDTO) user);return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {UserHolder.removeUser();}
}

UserHolder 封装了 ThreadLocal:

public class UserHolder {private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();public static void saveUser(UserDTO user){tl.set(user);}public static UserDTO getUser(){return tl.get();}public static void removeUser(){tl.remove();}
}

使用 UserDTO 保存向前端响应的数据,剔除了 User 实体类中的密码等敏感内容:

@Data
public class UserDTO {private Long id;private String nickName;private String icon;
}

在 MvcConfig 中注册拦截器:

@Configuration
public class MvcConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/user/login","/user/code","/blog/hot","/shop/**","/shop-type/**","/voucher/**");}
}

实现 me 接口:

    @GetMapping("/me")public Result me(){UserDTO user = UserHolder.getUser();return Result.ok(user);}

Redis 代替 session

session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题

session的替代方案应该满足:数据共享;内存存储;key、value结构。Redis 正好满足

在这里插入图片描述

发送验证码,保存到 redis:

    @Overridepublic Result sendCode(String phone, HttpSession session) {if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式错误");}String code = RandomUtil.randomNumbers(6);stringRedisTemplate.opsForValue().set("login:code:" + phone, code, 2, TimeUnit.MINUTES);log.debug("code send successfully: {}", code);return Result.ok();}

登录逻辑:

  • 校验手机号
  • 从 redis 和表单中分别获取验证码并校验
  • 根据手机号从数据库查询数据(若不存在则创建)
  • 生成随机 token 作为 key,构造 UserDTO 作为 value(Hash 类型),存入 redis,设置 30 min 过期
  • 将 token 返回给前端
    @Overridepublic Result login(LoginFormDTO loginForm, HttpSession session) {String phone = loginForm.getPhone();if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式错误");}String cacheCode = stringRedisTemplate.opsForValue().get("login:code:" + phone);String code = loginForm.getCode();if (cacheCode == null || !cacheCode.equals(code)) {return Result.fail("验证码错误");}User user = query().eq("phone", phone).one();if (user == null) {user = createUserWithPhone(phone);}String token = UUID.randomUUID().toString(true);UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));stringRedisTemplate.opsForHash().putAll("login:token:" + token, userMap);stringRedisTemplate.expire("login:token:" + token, 30, TimeUnit.MINUTES);return Result.ok(token);}

在请求拦截器中:

  • 从请求头的 “authorization” 字段获取 token
  • 检查 token 是否为空,若不为空则从 redis 中获取 UserDTO;否则返回错误
  • 将 UserDTO 存入 ThreadLocal
  • 刷新 token 的过期时间为 30 分钟
    @Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {String token = request.getHeader("authorization");if (StrUtil.isBlank(token)) {response.setStatus(401);return false;}Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries("login:token:" + token);if (userMap.isEmpty()) {response.setStatus(401);return false;}UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);UserHolder.saveUser(userDTO);stringRedisTemplate.expire("login:token:" + token, 30, TimeUnit.MINUTES);return true;}

上面的拦截器问题在于,只能对于在拦截路径下的请求更新过期时间,对于不需要拦截的路径,无法更新过期时间,这是不符合常理的,需要再增加一个拦截所有路径的拦截器

RefreshTokenInterceptor 中获取 token,从 redis 读取用户,设置 ThreadLocal,更新过期时间

对于任何异常情况,都不做拦截,直接返回 true

public class RefreshTokenInterceptor implements HandlerInterceptor {private StringRedisTemplate stringRedisTemplate;public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {String token = request.getHeader("authorization");if (StrUtil.isBlank(token)) {return true;}Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries("login:token:" + token);if (userMap.isEmpty()) {return true;}UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);UserHolder.saveUser(userDTO);stringRedisTemplate.expire("login:token:" + token, 30, TimeUnit.MINUTES);return true;}
}

LoginInterceptor 中,只需检查 ThreadLocal 中是否存在 User 即可:

public class LoginInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {if (UserHolder.getUser() == null) {response.setStatus(401);return false;}return true;}
}

MvcConfig 中注册 2 个拦截器

@Configuration
public class MvcConfig implements WebMvcConfigurer {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor((new RefreshTokenInterceptor(stringRedisTemplate))).order(0);registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/user/login","/user/code","/blog/hot","/shop/**","/shop-type/**","/voucher/**").order(1);}
}

JWT 实现登录

在登录时将用户信息编码进 jwt 返回给客户端:

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {String phone = loginForm.getPhone();if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式错误");}String cacheCode = stringRedisTemplate.opsForValue().get("login:code:" + phone);String code = loginForm.getCode();if (cacheCode == null || !cacheCode.equals(code)) {return Result.fail("验证码错误");}User user = query().eq("phone", phone).one();if (user == null) {user = createUserWithPhone(phone);}Map<String, Object> claims = new HashMap<>();claims.put("id", user.getId());claims.put("nickName", user.getNickName());claims.put("icon", user.getIcon());String token = JwtUtil.genToken(claims);stringRedisTemplate.opsForValue().set(token, token, 24, TimeUnit.HOURS);return Result.ok(token);
}

在请求拦截器中,校验 token 并刷新过期时间:

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {String token = request.getHeader("Authorization");if (StrUtil.isBlank(token)) {return true;}try {String redisToken = stringRedisTemplate.opsForValue().get(token);if (redisToken == null) {return true;}Map<String, Object> claims = JwtUtil.parseToken(token);UserDTO userDTO = new UserDTO();userDTO.setId(Long.valueOf((Integer)claims.get("id")));userDTO.setNickName((String) claims.get("nickName"));userDTO.setIcon((String) claims.get("icon"));UserHolder.saveUser(userDTO);stringRedisTemplate.expire(token, 24, TimeUnit.HOURS);} catch (Exception e) {e.printStackTrace();return true;}return true;
}

缓存

添加商户缓存

原有的接口直接查询数据库:

    @GetMapping("/{id}")public Result queryShopById(@PathVariable("id") Long id) {return Result.ok(shopService.getById(id));	// mybatis-plus 直接根据 id 查询}

在这里插入图片描述
Controller 层:

    @GetMapping("/{id}")public Result queryShopById(@PathVariable("id") Long id) {return shopService.queryById(id);}

Service 层:

@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result queryById(Long id) {// 先查缓存String shopJson = stringRedisTemplate.opsForValue().get("cache:shop:" + id);if (StrUtil.isNotBlank(shopJson)) {		// 缓存中存在Shop shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 缓存不存在,查数据库Shop shop = getById(id);if (shop == null) {return Result.fail("商户不存在");}stringRedisTemplate.opsForValue().set("cache:shop:" + id, JSONUtil.toJsonStr(shop));return Result.ok(shop);}
}

添加商户分类缓存

ShopTypeController 类中:

    @GetMapping("list")public Result queryTypeList() {return typeService.listShopType();}

ShopTypeServiceImpl 中:

@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result listShopType() {String shopTypeJson = stringRedisTemplate.opsForValue().get("cache:shop-type");if (StrUtil.isNotBlank(shopTypeJson)) {List<ShopType> shoptype = JSONUtil.toList(shopTypeJson, ShopType.class);return Result.ok(shoptype);}List<ShopType> shoptype = query().orderByAsc("sort").list();stringRedisTemplate.opsForValue().set("cache:shop-type", JSONUtil.toJsonStr(shoptype));return Result.ok(shoptype);}
}

缓存更新策略

在这里插入图片描述

缓存和数据库的更新策略:先更新数据库,再删除缓存

在这里插入图片描述

缓存更新策略的最佳实践方案:

  • 低一致性需求:使用Redis自带的内存淘汰机制
  • 高一致性需求:主动更新,并以超时剔除作为兜底方案

读操作:

  • 缓存命中则直接返回
  • 缓存未命中则查询数据库,并写入缓存,设定超时时间

写操作:

  • 先写数据库,然后再删除缓存
  • 要确保数据库与缓存操作的原子性

实现商户缓存与数据库的双写一致

修改ShopController中的业务逻辑,满足下面的需求:

  • 根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
  • 根据id修改店铺时,先修改数据库,再删除缓存

Controller 层:

    @PutMappingpublic Result updateShop(@RequestBody Shop shop) {return shopService.update(shop);}

Service 层:先写数据库,再删缓存:

    @Override@Transactional	// 确保更新数据库和删除缓存在同一事务中public Result update(Shop shop) {Long id = shop.getId();if (id == null) {return Result.fail("id 不能为空");}updateById(shop);	// 先写数据库stringRedisTemplate.delete("cache:shop:" + id);	// 再删缓存return Result.ok();}

queryById 中更新缓存时设置过期时间:

stringRedisTemplate.opsForValue().set("cache:shop:" + id, JSONUtil.toJsonStr(shop), 30L, TimeUnit.MINUTES);

缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库

缓存空值

采用缓存空值的策略处理缓存穿透:

    @Overridepublic Result queryById(Long id) {String shopJson = stringRedisTemplate.opsForValue().get("cache:shop:" + id);if (StrUtil.isNotBlank(shopJson)) {Shop shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}if (shopJson != null) {return Result.fail("商户不存在");}Shop shop = getById(id);if (shop == null) {// 缓存空值stringRedisTemplate.opsForValue().set("cache:shop:" + id, "", 3L, TimeUnit.MINUTES);return Result.fail("商户不存在");}stringRedisTemplate.opsForValue().set("cache:shop:" + id, JSONUtil.toJsonStr(shop), 30L, TimeUnit.MINUTES);return Result.ok(shop);}

BloomFilter

采用布隆过滤器解决缓存穿透:

使用 guava 提供的布隆过滤器,导入依赖:

        <dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.1-jre</version></dependency>

在 ShopServiceImpl 的构造函数中,创建布隆过滤器,并将数据库中已经存在的 id 存入布隆过滤器:

@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {// ...private BloomFilter<Long> bloomFilter;public ShopServiceImpl() {// 设置布隆过滤器的预期插入数量和期望的误判率int expectedInsertions = 1000;double falsePositiveRate = 0.01;// 创建布隆过滤器,使用Long类型作为元素类型bloomFilter = BloomFilter.create(Funnels.longFunnel(), expectedInsertions, falsePositiveRate);// 将已经存在的 id 添加到布隆过滤器中,方便后续查询时进行过滤List<Long> existingIds = new ArrayList<>();for (int i = 1; i < 15; i++) { 	 // 一共 14 个店铺,手动添加,应该是要查 mybatisplus 的,但是查询不到existingIds.add(Long.valueOf(i));}for (Long id : existingIds) {bloomFilter.put(id);}}//...}

在查询方法中,首选判断布隆过滤器中是否存在:

    @Overridepublic Result queryById(Long id) throws InterruptedException {if (!bloomFilter.mightContain(id)) {return Result.fail("商户不存在(BloomFilter)");}//...}

最后在添加商户信息时,也要添加到布隆过滤器中

缓存雪崩

缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力

解决方案:

  • 给不同的Key的TTL添加随机值
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

缓存击穿

缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击

常见的解决方案:

  • 互斥锁
  • 逻辑过期

在这里插入图片描述
在这里插入图片描述

基于互斥锁方式解决缓存击穿问题

    private boolean tryLock(String key) {Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);}private void unlock(String key) {stringRedisTemplate.delete(key);}@Overridepublic Result queryById(Long id) throws InterruptedException {String shopJson = stringRedisTemplate.opsForValue().get("cache:shop:" + id);if (StrUtil.isNotBlank(shopJson)) {Shop shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}if (shopJson != null) {return Result.fail("商户不存在");}String lockKey = "lock:shop:" + id;Shop shop = null;try {if (!tryLock(lockKey)) {Thread.sleep(50);return queryById(id);}shop = getById(id);if (shop == null) {// 缓存空值stringRedisTemplate.opsForValue().set("cache:shop:" + id, "", 3L, TimeUnit.MINUTES);return Result.fail("商户不存在");}stringRedisTemplate.opsForValue().set("cache:shop:" + id, JSONUtil.toJsonStr(shop), 30L, TimeUnit.MINUTES);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {unlock(lockKey);}return Result.ok(shop);}

基于逻辑过期方式解决缓存击穿问题

在这里插入图片描述

存储带有逻辑过期时间的数据:

@Data
public class RedisData {private LocalDateTime expireTime;private Object data;
}
    public Shop queryWithLogicalExpire(Long id) {String key = "cache:shop:" + id;String shopJson = stringRedisTemplate.opsForValue().get(key);if (StrUtil.isBlank(shopJson)) {return null;}RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);LocalDateTime expireTime = redisData.getExpireTime();if (expireTime.isAfter(LocalDateTime.now())) {return shop;} else {String lockKey = "lock:shop:" + id;boolean isLock = tryLock(lockKey);if (isLock) {CACHE_REBUILD_EXECUTOR.submit(() -> {saveShop2Redis(id, 30L);unlock(lockKey);});}return shop;}}private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

缓存工具封装

基于StringRedisTemplate封装一个缓存工具类,满足下列需求:

  • 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
  • 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
  • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  • 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
@Component
@Slf4j
public class CacheClient {private final StringRedisTemplate stringRedisTemplate;public CacheClient(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public void set(String key, Object value, Long time, TimeUnit unit) {stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);}public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {RedisData redisData = new RedisData();redisData.setData(value);redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));}public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {String key = keyPrefix + id;String json = stringRedisTemplate.opsForValue().get(key);if (StrUtil.isNotBlank(json)) {return JSONUtil.toBean(json, type);}if (json != null) {return null;}R r = dbFallback.apply(id);if (r == null) {stringRedisTemplate.opsForValue().set(key, "", 3L, TimeUnit.MINUTES);return null;}this.set(key, r, time, unit);return r;}private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);public <R, ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {String key = keyPrefix + id;String json = stringRedisTemplate.opsForValue().get(key);if (StrUtil.isBlank(json)) {return null;}RedisData redisData = JSONUtil.toBean(json, RedisData.class);R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);LocalDateTime expireTime = redisData.getExpireTime();if (expireTime.isAfter(LocalDateTime.now())) {return r;} else {String lockKey = "lock:shop:" + id;boolean isLock = tryLock(lockKey);if (isLock) {CACHE_REBUILD_EXECUTOR.submit(() -> {try {R res = dbFallback.apply(id);this.setWithLogicalExpire(key, res, time, unit);} catch (Exception e) {throw new RuntimeException(e);} finally {unlock(lockKey);}});}return r;}}private boolean tryLock(String key) {Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);}private void unlock(String key) {stringRedisTemplate.delete(key);}}

秒杀

全局唯一ID

订单表如果使用数据库自增 id 存在的问题:

  • id 的规律性太明显
  • 受单表数据量的限制

需要一个在分布式系统下用来生成全局唯一ID的工具

在这里插入图片描述

@Component
public class RedisIdWorker {/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200L;/*** 序列号的位数*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {// 1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2.生成序列号// 2.1.获取当前日期,精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));// 2.2.自增长long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);	// 每天一个 key// 3.拼接并返回return timestamp << COUNT_BITS | count;}
}

实现优惠券秒杀的下单功能

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

在这里插入图片描述

实现下单功能:

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Override@Transactionalpublic Result seckillVoucher(Long voucherId) {SeckillVoucher voucher = seckillVoucherService.getById(voucherId);if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束");}if (voucher.getStock() < 1) {return Result.fail("库存不足");}boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();if (!success) {return Result.fail("库存不足");}VoucherOrder voucherOrder = new VoucherOrder();Long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);Long userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);}
}

考虑多线程争抢,上面的代码会出现“超卖问题”

使用乐观锁解决超卖问题

乐观锁有 2 种实现方式:

  • 版本号机制
  • CAS 机制

在这里插入图片描述

下面的代码实际上并没有用到乐观锁,只是在操作数据库减库存之前判断库存是否大于0:

    public Result seckillVoucher(Long voucherId) {SeckillVoucher voucher = seckillVoucherService.getById(voucherId);if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束");}if (voucher.getStock() < 1) {return Result.fail("库存不足");}boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0)		// 库存 > 0.update();if (!success) {return Result.fail("库存不足");}VoucherOrder voucherOrder = new VoucherOrder();Long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);Long userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);}

一人一单

使用悲观锁处理一人一单问题:

  • 锁住 userid 以减小锁的粒度
  • 获取代理类对象以实现事务
    @Overridepublic Result seckillVoucher(Long voucherId) {SeckillVoucher voucher = seckillVoucherService.getById(voucherId);if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束");}if (voucher.getStock() < 1) {return Result.fail("库存不足");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return createVoucherOrder(voucherId);}}@Transactionalpublic Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("重复购买");}boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {return Result.fail("库存不足");}VoucherOrder voucherOrder = new VoucherOrder();Long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);}

上面的锁在集群模式下仍然会出现并发安全问题,因为 synchronized 锁是不能跨 JVM 的

分布式锁

满足分布式系统或集群模式下多进程可见并且互斥的锁

navie 实现

public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";@Overridepublic boolean tryLock(long timeoutSec) {long id = Thread.currentThread().getId();Boolean res = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, id + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(res);}@Overridepublic void unlock() {stringRedisTemplate.delete(KEY_PREFIX + name);}
}

应用于“一人一单”:

        Long userId = UserHolder.getUser().getId();SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);boolean isLock = lock.tryLock(50);if (!isLock) {return Result.fail("重复下单 RedisLock");}try {return createVoucherOrder(voucherId);} finally {lock.unlock();}

解决误删

在获取锁时存入线程标示(可以用UUID表示)

在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致

  • 如果一致则释放锁
  • 如果不一致则不释放锁
public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";@Overridepublic boolean tryLock(long timeoutSec) {String threadId = ID_PREFIX + Thread.currentThread().getId();	// 增加 uuid 前缀,防止 threadID 重复Boolean res = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(res);}@Overridepublic void unlock() {String threadId = ID_PREFIX + Thread.currentThread().getId();String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);if (id.equals(threadId)) {stringRedisTemplate.delete(KEY_PREFIX + name);}}
}

Lua 脚本解决多条命令的原子性问题

Redis 调用 Lua 脚本:

eval script numkeys key [key ...] arg [arg ...]
-- resources/unlock.lua
if (redis.call('get', KEYS[1]) == ARGV[1]) thenreturn redis.call('del', KEYS[1])
end
return 0

在这里插入图片描述

public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(long timeoutSec) {String threadId = ID_PREFIX + Thread.currentThread().getId();Boolean res = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(res);}@Overridepublic void unlock() {stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}
}

基于Redis的分布式锁实现思路:

  • 利用set nx ex获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁

特性:

  • 利用set nx满足互斥性
  • 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
  • 利用Redis集群保证高可用和高并发特性

存在以下问题:

在这里插入图片描述

Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现

引入 pom 依赖:

        <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>

创建配置类:

@Configuration
public class RedisonConfig {@Beanpublic RedissonClient redisonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://192.168.93.12:6379").setPassword("syc13140");return Redisson.create(config);}
}

注入并使用:

    @Resourceprivate RedissonClient redissonClient;@Overridepublic Result seckillVoucher(Long voucherId) {SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 检查秒杀时间和库存...Long userId = UserHolder.getUser().getId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if (!isLock) {return Result.fail("重复下单 RedisLock");}try {return createVoucherOrder(voucherId);} finally {lock.unlock();}}

Redisson 可重入锁原理

前面自己实现的锁是不可重入的:

在这里插入图片描述
可重入锁的原理:在加锁时判断一下,锁的值是否是自己的线程 id,如果是,可以再次加锁,同时递增加锁的次数。这就需要在锁中存储 2 个信息:线程 id 和加锁次数(锁的重入次数),使用 Hash 结构比较合适

在解锁时,将锁的重入次数 - 1,如果减到了 0,将锁删除;如果非零,则重置锁的有效期

在这里插入图片描述

加锁:

local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];if (redis.call("exists", key) == 0) thenredis.call("hset", key, threadId, "1");redis.call("expire", key, releaseTime);return 1;
end;if (redis.call("hexists", key, threadId) == 1) thenredis.call("hincrby", key, threadId, "1");redis.call("expire", key, releaseTime);return 1;
end;
return 0;

解锁:

local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];if (redis.call("hexists", key, threadId) == 0) thenreturn nil;
end;local count = redis.call("hincrby", key, threadId, -1);
if (count > 0) thenredis.call("expire", key, releaseTime);return nil;
elseredis.call("del", key);return nil;
end;	

Redisson 分布式锁原理:

可重入:利用 hash 结构记录线程 id 和重入次数

可重试:利用信号量和 pubsub 实现等待、唤醒、获取锁失败的重试机制

超时续约:利用 watchDog,每隔一段时间(releaseTime / 3),重置超时时间

Redisson 解决主从同步问题

在所有主节点上都加锁,是谓“联锁”:

在这里插入图片描述

void setup() {RLock lock1 = redissonClient.getLock("order");RLock lock2 = redissonClient2.getLock("order");RLock lock3 = redissonClient3.getLock("order");lock = redissonClient.getMultiLock(lock1, lock2, lock3);
}

总结:

1)不可重入Redis分布式锁:
原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示
缺陷:不可重入、无法重试、锁超时失效

2)可重入的Redis分布式锁:
原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
缺陷:redis宕机引起锁失效问题

3)Redisson的multiLock:
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维成本高、实现复杂

Redis 优化秒杀

优化方案:我们将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池,当然这里边有两个难点

第一个难点是我们怎么在redis中去快速校验一人一单,还有库存判断

第二个难点是由于我们校验和tomcat下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了

在这里插入图片描述

整体思路:当用户下单之后,判断库存是否充足只需要导redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单,如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0,整个过程需要保证是原子性的,我们可以使用lua来操作

当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功

在这里插入图片描述

秒杀资格判断

在新增优惠券的同时,将库存信息存入 redis 中:

    @Override@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 存入 redisstringRedisTemplate.opsForValue().set("seckill:stock:" + voucher.getId(), voucher.getStock().toString());}

使用 lua 脚本完成秒杀资格判断(判断秒杀库存,一人一单):

local vocherId = ARGV[1]
local userId = ARGV[2]local stockKey = "seckill:stock:" .. vocherId   -- string value
local orderKey = "seckill:order:" .. vocherId   -- list-- 库存不足
if (tonumber(redis.call("get", stockKey)) <= 0) thenreturn 1
end-- 用户已存在,重复下单
if (redis.call("sismember", orderKey, userId) == 1) thenreturn 2
endredis.call("incrby", stockKey, -1); -- 扣减库存
redis.call("sadd", orderKey, userId);   -- 将用户加入集合
return 0

在 java 代码中直接调用 lua 脚本:

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {var result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),UserHolder.getUser().getId().toString());int r = result.intValue();if (r != 0) {return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}long orderId = redisIdWorker.nextId("order");return Result.ok(orderId);}// ...
}

异步下单

seckillVoucher 方法中,创建秒杀订单对象,将其压入阻塞队列:

	private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);@Overridepublic Result seckillVoucher(Long voucherId) {var userId = UserHolder.getUser().getId();var result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());int r = result.intValue();if (r != 0) {return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}long orderId = redisIdWorker.nextId("order");VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);orderTasks.add(voucherOrder);return Result.ok(orderId);}

创建单线程的线程池(异步线程),使用 Spring 提供的 PostConstruct 注解在类初始化后立即提交任务:

    // 单线程线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct  // 在类初始化后立即执行private void init() {SECKILL_ORDER_EXECUTOR.submit(new VocherOrderHandler());}private class VocherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {var task = orderTasks.take();createVoucherOrder(task);} catch (InterruptedException e) {e.printStackTrace();}}}}

任务的内容是:从阻塞队列中获取任务,然后创建订单。创建订单的逻辑如下:

    @Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {Long voucherId = voucherOrder.getVoucherId();boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {return;}save(voucherOrder);}

上面的代码存在的问题:

  • 当订单过多,阻塞队列过大,占用内存过多,可能会 OOM
  • 一旦服务宕机,订单数据丢失,用户金额损失

消息队列

在这里插入图片描述
redis 提供了 3 种不同的方式实现消息队列:

  • list
  • pub-sub
  • stream

list 实现消息队列

使用 lpush/rpop 或 rpush/lpop 可以模拟消息队列

不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果

基于List的消息队列有哪些优缺点?
优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

PubSub 实现消息队列

在这里插入图片描述
基于PubSub的消息队列有哪些优缺点?
优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

Stream

Stream 是 Redis5.0 提供的一种新的数据类型,非常适合作为消息队列

向队列中添加:xadd

在这里插入图片描述
读取消息:xread

在这里插入图片描述
Stream 消息队列读取消息后不会被删除,消息是永久存在的

XREAD COUNT 1 STREAMS s1 0
XREAD COUNT 1 BLOCK 5000 STREAMS s1 0	# BLOCK 0 永久阻塞

在业务开发中,我们可以循环调用 XREAD 阻塞方式来查询最新消息,从而实现阻塞持续监听队列的效果:

while (true) {Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");if (msg == null) {continue;}handleMessage(msg);
}

需要注意的是:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

Stream 消费者组

在这里插入图片描述

创建消费者组:

XGROUP CREATE key groupName ID [MKSTREAM]

key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列

例如:

XGROUP CREATE s1 g1 0

其他常见命令:

# 删除指定的消费者组
XGROUP DESTORY key groupName# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动ACK,获取到消息后自动确认
STREAMS key:指定队列名称
ID:获取消息的起始ID:

  • “>”:从下一个未消费的消息开始
  • 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
XACK s1 g1 id
while (true) {// 从消息队列中取下一条消息Object msg = redis.execute("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");if (msg == null) {continue;}try {handleMessage(msg);	// 完成后提交 ack} catch (Exception e) {while (true) {// 从 pending list 中重新获取消息Object msg = redis.execute("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0");if (msg == null) {break;	// 获取不到,说明已经被其他线程处理,退出内层循环}try {handleMessage(msg);	// 完成后提交 ack} catch (Exception e) {continue;	// 又出错了,再重新从 pending list 中获取并处理}			}}
}

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

在这里插入图片描述

实现基于 Stream 的异步秒杀

需求:

  • 创建一个Stream类型的消息队列,名为stream.orders
  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

创建消费者组:

XGROUP CREATE stream.orders g1 0 MKSTREAM

修改秒杀脚本 seckill.lua

local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]local stockKey = 'seckill:stock:' .. voucherId   -- string value
local orderKey = 'seckill:order:' .. voucherId   -- list-- 库存不足
if (tonumber(redis.call('get', stockKey)) <= 0) thenreturn 1
end-- 用户已存在,重复下单
if (redis.call('sismember', orderKey, userId) == 1) thenreturn 2
endredis.call('incrby', stockKey, -1) -- 扣减库存
redis.call('sadd', orderKey, userId)   -- 将用户加入集合redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

修改服务接口:

    @Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");// 判断购买资格并压入消息队列Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));int r = result.intValue();if (r != 0) {return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}return Result.ok(orderId);}

开启一个线程任务,尝试获取stream.orders中的消息,完成下单:

    // 单线程线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct  // 在类初始化后立即执行private void init() {SECKILL_ORDER_EXECUTOR.submit(new VocherOrderHandler());}private class VocherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {List<MapRecord<String, Object, Object>> list= stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));if (list == null || list.isEmpty()) {continue;}MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(Long.valueOf((String)values.get("id")));voucherOrder.setVoucherId(Long.valueOf((String)values.get("voucherId")));voucherOrder.setUserId(Long.valueOf((String)values.get("userId")));createVoucherOrder(voucherOrder);   // FIXME: 这里事务不能生效stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {while (true) {try {List<MapRecord<String, Object, Object>> list= stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders", ReadOffset.from("0")));if (list == null || list.isEmpty()) {break;}MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(Long.valueOf((String)values.get("id")));voucherOrder.setVoucherId(Long.valueOf((String)values.get("voucherId")));voucherOrder.setUserId(Long.valueOf((String)values.get("userId")));createVoucherOrder(voucherOrder);   // FIXME: 这里事务不能生效stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e2) {e2.printStackTrace();try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}}}}}}}

达人探店

发布探店笔记

探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:

  • tb_blog:探店笔记表,包含笔记中的标题、文字、图片等
  • tb_blog_comments:其他用户对探店笔记的评价

修改 SystemConstants::IMAGE_UPLOAD_DIR 为 nginx 的 html/imgs 目录

实现查看发布探店笔记的接口

在这里插入图片描述

在 Blog 实体类中,已经保存了一些用户信息,并使用 @TableField(exist = false) 表明它并不属于当前实体类对应的 table:

public class Blog implements Serializable {private static final long serialVersionUID = 1L;/*** 主键*/@TableId(value = "id", type = IdType.AUTO)private Long id;/*** 商户id*/private Long shopId;/*** 用户id*/private Long userId;/*** 用户图标*/@TableField(exist = false)private String icon;/*** 用户姓名*/@TableField(exist = false)private String name;// ...
}

BlogController 中处理 get 请求:

    @GetMapping("/{id}")public Result queryBlog(@PathVariable("id") Long id) {return blogService.queryBlogById(id);}

queryHotBlog 的业务逻辑也转移到 IBlogService

    @GetMapping("/hot")public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {return blogService.queryHotBlog(current);}

IBlogService 中实现业务逻辑:

public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {@Resourceprivate IUserService userService;@Overridepublic Result queryBlogById(Long id) {Blog blog = getById(id);if (blog == null) {return Result.fail("笔记不存在");}queryBlogUser(blog);return Result.ok(blog);}@Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(this::queryBlogUser);return Result.ok(records);}private void queryBlogUser(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}
}

点赞

在这里插入图片描述
原有的点赞功能直接修改数据库,会导致刷赞的情况发生:

    @PutMapping("/like/{id}")public Result likeBlog(@PathVariable("id") Long id) {// 修改点赞数量blogService.update().setSql("liked = liked + 1").eq("id", id).update();return Result.ok();}

改进点赞功能:

  • 同一个用户只能点赞一次,再次点击则取消点赞
  • 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)

实现步骤:

  • 给Blog类中添加一个isLike字段,标示是否被当前用户点赞
  • 修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1
  • 修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段
  • 修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
// BlogController.java@PutMapping("/like/{id}")public Result likeBlog(@PathVariable("id") Long id) {// 修改点赞数量return blogService.likeBlog(id);}
// BlogServiceImpl.java@Override@Transactionalpublic Result likeBlog(Long id) {Long userId = UserHolder.getUser().getId();String key = "blog:liked:" + id;Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());if (BooleanUtil.isFalse(isMember)) {boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();if (isSuccess) {stringRedisTemplate.opsForSet().add(key, userId.toString());}} else {boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();stringRedisTemplate.opsForSet().remove(key, userId.toString());}return Result.ok();}@Overridepublic Result queryBlogById(Long id) {Blog blog = getById(id);if (blog == null) {return Result.fail("笔记不存在");}queryBlogUser(blog);// 在根据 id 查询 blog 详情,以及查询 blog 列表的逻辑中,增加判断 isLike 逻辑String key = "blog:liked:" + id;Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());blog.setIsLike(BooleanUtil.isTrue(isMember));return Result.ok(blog);}

点赞排行榜

在这里插入图片描述
需要使用 zset 实现排行榜的功能,改造上述代码

 public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {@Resourceprivate IUserService userService;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result queryBlogById(Long id) {Blog blog = getById(id);if (blog == null) {return Result.fail("笔记不存在");}queryBlogUser(blog);isBlogLiked(blog);return Result.ok(blog);}@Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(bolg -> {this.queryBlogUser(bolg);this.isBlogLiked(bolg);});return Result.ok(records);}@Overridepublic Result likeBlog(Long id) {Long userId = UserHolder.getUser().getId();String key = "blog:liked:" + id;Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());if (score == null) {boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();if (isSuccess) {stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());}} else {boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();stringRedisTemplate.opsForZSet().remove(key, userId.toString());}return Result.ok();}private void queryBlogUser(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}private void isBlogLiked(Blog blog) {Long userId = UserHolder.getUser().getId();if (userId == null) {return;}String key = "blog:liked:" + blog.getId();Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());blog.setIsLike(score != null);}
}

增加 likes 接口,获取某个 blog 的前五的点赞用户,根据 timestamp 排序:

    @GetMapping("/likes/{id}")public Result queryBlogLikes(@PathVariable("id") Long id) {return blogService.queryBlogLikes(id);}
    @Overridepublic Result queryBlogLikes(Long id) {String key = "blog:liked:" + id;Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);if (top5 == null || top5.isEmpty()) {return Result.ok(Collections.emptyList());}List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());List<UserDTO> userDTOs = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());Collections.reverse(userDTOs);  // 解决 listByIds 导致的逆序问题return Result.ok(userDTOs);}

好友关注

关注和取关

在这里插入图片描述
在这里插入图片描述
关注:新增记录

取关:删除记录

在 FollowController 中增加路由规则:

public class FollowController {@Resourceprivate IFollowService followService;@PutMapping("/{id}/{isFollow}")public Result follow(@PathVariable("id") Long id, @PathVariable("isFollow") Boolean isFollow) {return followService.follow(id, isFollow);}@GetMapping("/or/not/{id}")public Result isFollow(@PathVariable("id") Long id) {return followService.isFollow(id);}
}

在 FollowServiceImpl 中实现业务逻辑:

public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {@Overridepublic Result follow(Long id, Boolean isFollow) {Long userId = UserHolder.getUser().getId();if (Boolean.TRUE.equals(isFollow)) {Follow follow = new Follow();follow.setId(userId);follow.setFollowUserId(id);save(follow);} else {remove(new QueryWrapper<Follow>().eq("user_id", userId).eq("follow_user_id", id));}return Result.ok();}@Overridepublic Result isFollow(Long id) {Long userId = UserHolder.getUser().getId();Integer count = query().eq("user_id", userId).eq("follow_user_id", id).count();return Result.ok(count > 0);}
}

共同关注

先实现在笔记页面点击用户头像,跳转到用户详情页的2个接口:

在这里插入图片描述

// UserController@GetMapping("/{id}")public Result queryUserById(@PathVariable("id") Long userId) {User user = userService.getById(userId);if (user == null) {return Result.ok();}UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);return Result.ok(userDTO);}// BlogController@GetMapping("/of/user")public Result queryBlogByUserId(@RequestParam(value="current", defaultValue = "1") Integer current, @RequestParam("id") Long id) {Page<Blog> page = blogService.query().eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));List<Blog> records = page.getRecords();return Result.ok(records);}

在这里插入图片描述
使用 Redis Set 的 SINTER 求交集实现共同关注的功能,为此,需要修改之前的关注接口,使其存入 Redis

修改原来的 follow 业务,在增减关注列表时增删 Redis:

    @Overridepublic Result follow(Long id, Boolean isFollow) {Long userId = UserHolder.getUser().getId();if (Boolean.TRUE.equals(isFollow)) {Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(id);boolean success = save(follow);if (success) {stringRedisTemplate.opsForSet().add("follower:" + userId, id.toString());}} else {remove(new QueryWrapper<Follow>().eq("user_id", userId).eq("follow_user_id", id));if (success) {stringRedisTemplate.opsForSet().remove("follower:" + userId, id.toString());}}return Result.ok();}

增加 common 接口:

    @GetMapping("/common/{id}")public Result common(@PathVariable("id") Long id) {return followService.common(id);}
    @Overridepublic Result common(Long id) {Long userId = UserHolder.getUser().getId();Set<String> intersect = stringRedisTemplate.opsForSet().intersect("follower:" + userId, "follower:" + id);if (intersect == null || intersect.isEmpty()) {return Result.ok(Collections.emptyList());}// 这里 list 查询数据库更好List<UserDTO> userDTOS = intersect.stream().map(uid -> BeanUtil.copyProperties(userService.getById(uid), UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOS);}

Feed 流

在这里插入图片描述
需求:

  • 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
  • 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
  • 查询收件箱数据时,可以实现分页查询

在保存 blog 的同时推送到 redis:

    @PostMappingpublic Result saveBlog(@RequestBody Blog blog) {return blogService.saveBlog(blog);}
    @Overridepublic Result saveBlog(Blog blog) {// 获取登录用户UserDTO user = UserHolder.getUser();blog.setUserId(user.getId());// 保存探店博文boolean success = save(blog);if (!success) {return Result.fail("新增笔记失败");}List<Follow> followers = followService.query().eq("follow_user_id", user.getId()).list();// 推送 blogid 给所有粉丝for (Follow follow : followers) {String key = "feed:" + blog.getId();stringRedisTemplate.opsForZSet().add(key, follow.getUserId().toString(), System.currentTimeMillis());}// 返回idreturn Result.ok(blog.getId());}
# ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
ZREVRANGEBYSCORE z1 1000 0 WITHSCORES LIMIT 0 3
ZADD z1 8 m8
ZREVRANGEBYSCORE z1 5 0 WITHSCORES LIMIT 1 3

在这里插入图片描述
滚动分页查询参数:

  • max:当前时间戳|上一次查询到最小时间戳
  • min:0
  • offset:0|在上一次的结果中,与最小值一样的元素个数
  • count:3

实现 queryBlogOfFollow 接口:

    @GetMapping("/of/follow")public Result queryBlogOfFollow(@RequestParam(value = "lastId") Long max, @RequestParam(value="offset", defaultValue = "0") Integer offset) {return blogService.queryBlogOfFollow(max, offset);}
    @Overridepublic Result queryBlogOfFollow(Long max, Integer offset) {Long userId = UserHolder.getUser().getId();String key = "feed:" + userId;Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 3);if (typedTuples == null || typedTuples.isEmpty()) {return Result.ok(Collections.emptyList());}List<Long> ids = new ArrayList<>(typedTuples.size());long minTime = 0;int os = 1;for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {ids.add(Long.valueOf(typedTuple.getValue()));long time = typedTuple.getScore().longValue();if (minTime == time) {os++;} else {minTime = time;os = 1;}}String idStr = StrUtil.join(",", ids);List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();for (Blog blog : blogs) {queryBlogUser(blog);isBlogLiked(blog);}ScrollResult scrollResult = new ScrollResult();scrollResult.setList(blogs);scrollResult.setMinTime(minTime);scrollResult.setOffset(os);return Result.ok(scrollResult);}

附近商户(略)

用户签到

在这里插入图片描述

BitMap的操作命令有:
SETBIT:向指定位置(offset)存入一个0或1
GETBIT :获取指定位置(offset)的bit值
BITCOUNT :统计BitMap中值为1的bit位的数量
BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
BITOP :将多个BitMap的结果做位运算(与 、或、异或)
BITPOS :查找bit数组中指定范围内第一个0或1出现的位置

实现签到

    @PostMapping("/sign")public Result sign() {return userService.sign();}
    @Overridepublic Result sign() {Long userId = UserHolder.getUser().getId();LocalDateTime now = LocalDateTime.now();String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = "sign:" + userId + keySuffix;int dayOfMounth = now.getDayOfMonth();stringRedisTemplate.opsForValue().setBit(key, dayOfMounth - 1, true);return Result.ok();}

签到统计

在这里插入图片描述

    @GetMapping("/sign/count")public Result signCount(){return userService.signCount();}
    @Overridepublic Result signCount() {Long userId = UserHolder.getUser().getId();LocalDateTime now = LocalDateTime.now();String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = "sign:" + userId + keySuffix;int dayOfMounth = now.getDayOfMonth();List<Long> signResult = stringRedisTemplate.opsForValue().bitField(key,BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMounth)).valueAt(0));if(signResult == null || signResult.isEmpty()) {return Result.ok(0);}Long num = signResult.get(0);if (num == null || num == 0) {return Result.ok(0);}int cnt = 0;while (true) {if ((num & 1) == 0) {break;} else {cnt++;}num >>>= 1;}return Result.ok(cnt);}

UV 统计

首先我们搞懂两个概念:
UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖。

在这里插入图片描述

关键字:黑马点评笔记

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: