文章目录
- 1.MassMailTask.java 延迟任务bean对象
- 2.MassMailTaskService.java
- 3.RedisUtil.java
1.MassMailTask.java 延迟任务bean对象
package com.sunxiansheng.user.delayQueue;import lombok.Data;import java.util.Date;
@Data
public class MassMailTask {private Long taskId;private Date startTime;}
2.MassMailTaskService.java
package com.sunxiansheng.user.delayQueue;import com.alibaba.fastjson.JSON;
import com.sunxiansheng.redis.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Service
public class MassMailTaskService {public static final String MASS_TASK_KEY = "massTaskMail";@Resourceprivate RedisUtil redisUtil;public void pushMassMailTaskQueue(MassMailTask massMailTask) {Date startTime = massMailTask.getStartTime();if (startTime == null) {return;}if (startTime.compareTo(new Date()) <= 0) {return;}log.info("定时任务加入队列,massTask:{}", JSON.toJSONString(massMailTask));redisUtil.zAdd(MASS_TASK_KEY, massMailTask.getTaskId(), startTime.getTime());}public Set<Long> poolMassTaskQueue() {Set<Object> set = redisUtil.zRangeByScore(MASS_TASK_KEY, 0, System.currentTimeMillis());if (CollectionUtils.isEmpty(set)) {return Collections.emptySet();}redisUtil.zRemoveBySet(MASS_TASK_KEY, set);return set.stream().map(n -> {String string = n.toString();return Long.parseLong(string);}).collect(Collectors.toSet());}}
3.RedisUtil.java
package com.sunxiansheng.redis.util;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Component
public class RedisUtil {private static final Logger logger = LoggerFactory.getLogger(RedisUtil.class);@Resourceprivate RedisTemplate<String, Object> redisTemplate;private static final String CACHE_KEY_SEPARATOR = ".";public String buildKey(String... strObjs) {return String.join(CACHE_KEY_SEPARATOR, strObjs);}public boolean exists(String key) {return execute(() -> redisTemplate.hasKey(key));}public boolean delete(String key) {return execute(() -> redisTemplate.delete(key));}public void set(String key, Object value) {execute(() -> {redisTemplate.opsForValue().set(key, value);return null;});}public void set(String key, Object value, long timeout, TimeUnit unit) {execute(() -> {redisTemplate.opsForValue().set(key, value, timeout, unit);return null;});}public boolean setIfAbsent(String key, Object value, long timeout, TimeUnit unit) {return execute(() -> redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit));}public <T> T get(String key, Class<T> clazz) {return execute(() -> castValue(redisTemplate.opsForValue().get(key), clazz));}public void increment(String key, long delta) {execute(() -> {redisTemplate.opsForValue().increment(key, delta);return null;});}public void hPut(String key, String hashKey, Object value) {execute(() -> {redisTemplate.opsForHash().put(key, hashKey, value);return null;});}public <T> T hGet(String key, String hashKey, Class<T> clazz) {return execute(() -> castValue(redisTemplate.opsForHash().get(key, hashKey), clazz));}public Map<Object, Object> hGetAll(String key) {return execute(() -> redisTemplate.opsForHash().entries(key));}public void hDelete(String key, Object... hashKey) {execute(() -> {redisTemplate.opsForHash().delete(key, hashKey);return null;});}public Map<Object, Object> hGetAndDelete(String key) {Map<Object, Object> map = new HashMap<>();try (Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash().scan(key, ScanOptions.NONE)) {while (cursor.hasNext()) {Map.Entry<Object, Object> entry = cursor.next();Object hashKey = entry.getKey();Object hashValue = entry.getValue();map.put(hashKey, hashValue);redisTemplate.opsForHash().delete(key, hashKey);}} catch (Exception e) {logger.error("Redis hGetAndDelete error: key={}", key, e);}return map;}public void lPush(String key, Object value) {execute(() -> {redisTemplate.opsForList().leftPush(key, value);return null;});}public void rPush(String key, Object value) {execute(() -> {redisTemplate.opsForList().rightPush(key, value);return null;});}public <T> T lPop(String key, Class<T> clazz) {return execute(() -> castValue(redisTemplate.opsForList().leftPop(key), clazz));}public <T> T rPop(String key, Class<T> clazz) {return execute(() -> castValue(redisTemplate.opsForList().rightPop(key), clazz));}public List<Object> lRange(String key, long start, long end) {return execute(() -> redisTemplate.opsForList().range(key, start, end));}public void sAdd(String key, Object... values) {execute(() -> {redisTemplate.opsForSet().add(key, values);return null;});}public Set<Object> sMembers(String key) {return execute(() -> redisTemplate.opsForSet().members(key));}public boolean sIsMember(String key, Object value) {return execute(() -> redisTemplate.opsForSet().isMember(key, value));}public Object sPop(String key) {return execute(() -> redisTemplate.opsForSet().pop(key));}public Long sCard(String key) {return execute(() -> redisTemplate.opsForSet().size(key));}public boolean zAdd(String key, Object value, double score) {return execute(() -> redisTemplate.opsForZSet().add(key, value, score));}public Long zCard(String key) {return execute(() -> redisTemplate.opsForZSet().size(key));}public Set<Object> zRange(String key, long start, long end) {return execute(() -> redisTemplate.opsForZSet().range(key, start, end));}public Long zRemove(String key, Object value) {return execute(() -> redisTemplate.opsForZSet().remove(key, value));}public Long zRemoveByList(String key, List<Object> values) {return execute(() -> {Long removedCount = 0L;for (Object value : values) {removedCount += redisTemplate.opsForZSet().remove(key, value);}return removedCount;});}public Long zRemoveBySet(String key, Set<Object> values) {return execute(() -> {Long removedCount = 0L;for (Object value : values) {removedCount += redisTemplate.opsForZSet().remove(key, value);}return removedCount;});}public Double zScore(String key, Object value) {return execute(() -> redisTemplate.opsForZSet().score(key, value));}public Set<Object> zRangeByScore(String key, double start, double end) {return execute(() -> redisTemplate.opsForZSet().rangeByScore(key, start, end));}public Double zIncrementScore(String key, Object value, double score) {return execute(() -> redisTemplate.opsForZSet().incrementScore(key, value, score));}public Long zRank(String key, Object value) {return execute(() -> redisTemplate.opsForZSet().rank(key, value));}public Set<ZSetOperations.TypedTuple<Object>> zRangeWithScores(String key, long start, long end) {return execute(() -> redisTemplate.opsForZSet().rangeWithScores(key, start, end));}public Set<ZSetOperations.TypedTuple<Object>> zRangeByScoreWithScores(String key, double min, double max) {return execute(() -> redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max));}public Long zRevRank(String key, Object value) {return execute(() -> redisTemplate.opsForZSet().reverseRank(key, value));}public Long zCount(String key, double min, double max) {return execute(() -> redisTemplate.opsForZSet().count(key, min, max));}public Long zRemoveByScore(String key, double min, double max) {return execute(() -> redisTemplate.opsForZSet().removeRangeByScore(key, min, max));}public Long zRemoveByRank(String key, long start, long end) {return execute(() -> redisTemplate.opsForZSet().removeRange(key, start, end));}private <T> T execute(RedisOperation<T> operation) {try {return operation.execute();} catch (Exception e) {logger.error("Redis operation error", e);return null;}}@FunctionalInterfaceprivate interface RedisOperation<T> {T execute();}public <T> T castValue(Object value, Class<T> clazz) {if (value == null) {return null;}if (clazz == Long.class && value instanceof Integer) {return clazz.cast(((Integer) value).longValue());}return clazz.cast(value);}
}