在Java中实现类似Golang的SingleFlight机制,可以通过以下步骤解决缓存击穿问题。该方案使用ConcurrentHashMap
管理并发请求,并通过CompletableFuture
实现异步结果合并。
实现代码
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;public class SingleFlight<T> {// 保存正在处理的请求,键为请求的key,值为对应的CompletableFutureprivate final ConcurrentHashMap<String, CompletableFuture<T>> ongoingRequests = new ConcurrentHashMap<>();// 可选:自定义线程池执行加载任务(默认使用ForkJoinPool.commonPool())private final Executor executor;public SingleFlight() {this(Runnable::run); // 默认在当前线程执行(根据需求调整)}public SingleFlight(Executor executor) {this.executor = executor;}/*** 执行请求合并* @param key 请求的唯一标识* @param loader 实际加载数据的回调函数* @return CompletableFuture<T> 异步结果*/public CompletableFuture<T> doRequest(String key, Callable<T> loader) {// 尝试原子性地将新Future加入MapCompletableFuture<T> newFuture = new CompletableFuture<>();CompletableFuture<T> existingFuture = ongoingRequests.putIfAbsent(key, newFuture);// 如果已有其他线程发起请求,直接等待现有结果if (existingFuture != null) {return existingFuture;}// 异步执行加载任务CompletableFuture.runAsync(() -> {try {T result = loader.call();newFuture.complete(result); // 成功完成,通知所有等待线程} catch (Exception e) {newFuture.completeExceptionally(e); // 传播异常} finally {ongoingRequests.remove(key); // 无论成功与否,移除key}}, executor);return newFuture;}
}
核心逻辑说明
-
请求合并
使用ConcurrentHashMap
跟踪正在处理的请求。当多个线程请求同一key
时,putIfAbsent
确保只有第一个线程创建新CompletableFuture
,后续线程直接等待已有结果。 -
异步加载
通过runAsync
在指定线程池执行加载逻辑,避免阻塞主线程。加载完成后,通过complete
或completeExceptionally
通知所有等待的线程。 -
资源清理
finally
块中移除Map中的key
,确保后续请求能触发新加载。即使加载失败,异常也会传递给所有等待线程,防止无限等待。
使用示例
public class CacheExample {private final SingleFlight<String> singleFlight = new SingleFlight<>();public String getData(String key) throws Exception {// 尝试从缓存读取String cachedData = getFromCache(key);if (cachedData != null) {return cachedData;}// 缓存未命中,使用SingleFlight防止击穿CompletableFuture<String> future = singleFlight.doRequest(key, () -> {// 实际数据加载逻辑(如查数据库)return fetchFromDatabase(key);});return future.get(); // 阻塞等待结果(或异步处理)}private String getFromCache(String key) {// 模拟缓存未命中return null;}private String fetchFromDatabase(String key) {// 模拟数据库查询return "Data for " + key;}
}
方案对比
特性 | Go SingleFlight | Java本方案 |
---|---|---|
并发控制 | 使用sync.Mutex 和map | 基于ConcurrentHashMap 的无锁操作 |
异步处理 | 同步调用,通过chan 等待结果 | 利用CompletableFuture 实现异步响应 |
异常处理 | 返回error | 通过completeExceptionally 传递异常 |
资源释放 | 请求完成后删除call | finally 块中移除key |
线程模型 | 依赖Goroutine调度 | 可配置Executor 灵活控制线程池 |
注意事项
- 线程池选择:根据业务场景选择合适线程池,避免资源耗尽。
- Key设计:确保
key
能唯一标识请求,防止不同请求被错误合并。 - 超时机制:可结合
orTimeout
方法添加超时控制,避免长时间阻塞。 - 结果缓存:成功加载后更新缓存,提升后续请求效率。
该方案有效应对缓存击穿,确保高并发下同一资源仅加载一次,显著降低下游压力。