【深入理解SpringCloud微服务】手写实现各种限流算法——固定时间窗、滑动时间窗、令牌桶算法、漏桶算法
- 限流算法
- 固定时间窗
- 滑动时间窗
- 令牌桶算法
- 漏桶算法
我们在上一篇文章手写实现了一个熔断限流微服务框架,但是限流算法和断路器的具体实现没有进行详细分析,今天是对上一篇文章的补充。当然由于本篇文章专注于限流和断路器算法,因此没有看过上一篇文章也可以学习本篇文章。
限流算法
我们的限流器接口是FlowLimiter。
/*** @author huangjunyi* @date 2023/12/18 19:39* @desc*/
public interface FlowLimiter {boolean canPass(String entryPointSymbol);}
canPass(String entryPointSymbol)是限流器验证的方法,参数entryPointSymbol表示资源名称,可以认为是某个接口或方法的唯一标识。
固定时间窗
首先我们在抽象类AbstractTimeWindowFlowLimiter中定义限流阈值字段,下面的滑动时间窗也可以用到。
public abstract class AbstractTimeWindowFlowLimiter implements FlowLimiter {// 默认限流阈值public static final int DEFAULT_LIMIT = 10;// 限流阈值protected int limit;}
然后定义简单时间窗限流器SimpleTimeWindowFlowLimiter,继承AbstractTimeWindowFlowLimiter。然后在SimpleTimeWindowFlowLimiter的canPass方法中实现固定时间窗算法。
/*** 简单时间窗限流器* @author huangjunyi* @date 2023/12/20 9:16* @desc*/
public class SimpleTimeWindowFlowLimiter extends AbstractTimeWindowFlowLimiter {// 记录每个接口方法对应的最后一次调用时间的时间戳// key是接口方法的唯一标识,也就是namepublic final ConcurrentHashMap<String, Long> lastTimeMap = new ConcurrentHashMap<>();// 保存每个接口方法对应的计数器// key是接口方法的唯一标识,也就是namepublic final ConcurrentHashMap<String, AtomicLong> concurrencyMap = new ConcurrentHashMap<>();...@Overridepublic boolean canPass(String name) {long currentTimeMillis = System.currentTimeMillis();if (!lastTimeMap.containsKey(name)) {lastTimeMap.put(name, currentTimeMillis);}if (!concurrencyMap.containsKey(name)) {concurrencyMap.put(name, new AtomicLong());}// 拿到上一次的调用时间Long lastTime = lastTimeMap.get(name);// 更新最后一次调用时间lastTimeMap.put(name, currentTimeMillis);// 拿到对应的计数器AtomicLong concurrency = concurrencyMap.get(name);// 判断本地调用时间距离上一次调用时间是否小于1秒(默认1秒一个时间窗)if (currentTimeMillis - lastTime <= 1000L) {// 判断是否达到限流阈值if (concurrency.get() >= this.limit) {return false;} else {// 正常通过,增加计数concurrency.incrementAndGet();return true;}}// 上一次调用与本次调用不在同一时间窗,重新创建计数器,返回trueconcurrencyMap.put(name, new AtomicLong());return true;}}
由于固定时间窗算法比较简单,就不过多分析了。
滑动时间窗
然后下面是滑动时间窗算法,滑动时间窗算法的实现类是SlideTimeWindowFlowLimiter。
我们默认把1秒分成10个时间窗格,然后每个时间窗格对应一个AtomicLong计数器。
/*** 滑动时间窗限流器* @author huangjunyi* @date 2023/12/20 19:52* @desc*/
public class SlideTimeWindowFlowLimiter extends AbstractTimeWindowFlowLimiter {// 每个时间窗格对应的请求数private AtomicLong[] windows = new AtomicLong[10];// 1秒分成10个时间窗格private long[] times = new long[10];}
@Overridepublic boolean canPass(String entryPointSymbol) {// 获取当前时间戳long currentTimeMillis = System.currentTimeMillis();// 1秒10个窗格,那么每个窗格100毫秒// 因此这里时间除以100,得到currentTime// 这个currentTime表示,从1970年1月1日零点算起,现在是第几个窗格long currentTime = currentTimeMillis / 100;// sum()方法统计目前的并发数,如果达到限流阈值,返回falseif (sum() >= this.limit) {return false;}// 我们的窗格数组长度是10,因此currentTime模10就能得出窗格下标// 然后判断窗格对应的计数器是否未初始化,或者该窗格是否是一个过期窗格// 如果计数器未初始化或窗格过期,那么进行初始化(或覆盖)if (windows[(int) (currentTime % 10)] == null || currentTime != times[(int) (currentTime % 10)]) {times[(int) (currentTime % 10)] = currentTime;windows[(int) (currentTime % 10)] = new AtomicLong(1);return true;}// 增加窗格对应计数器的计数windows[(int) (currentTime % 10)].incrementAndGet();return true;}
先讲解一下如何通过当前系统时间戳System.currentTimeMillis()定位时间窗格。思路是这样:
- System.currentTimeMillis()返回的是从1970年1月1日零点开始算起的毫秒值,而由于我们的每个格子又是100毫秒,那么 System.currentTimeMillis() / 100 就能得到从1970年1月1日零点开始算起第几个窗格,这个数记录到currentTime。
- 由于我们的时间窗格数组长度为10,那么currentTime模10,就能得出当前时间对应的时间窗格数组下标
举个例子:
比如当前时间是2024-04-05 13:47:10,那么时间戳是1712296030000,currentTime是17122960300,得到的窗格数组下标是0。
假如时间走了95毫秒,时间戳是1712296030095,currentTime是17122960300,得到的窗格数组下标还是0。
假如时间又走了5毫秒时间戳是1712296030100,currentTime是17122960301,得到的窗格数组下标是1。
可以看出,计算结果是符合每个窗格100毫秒的滑动时间窗的设定。
然后看看sum()方法是如何统计当前并发数的。
private long sum() {long currentTimeMillis = System.currentTimeMillis();// 按照同样的方法算出currentTimelong currentTime = currentTimeMillis / 100;long sum = 0L;for (int i = 0; i < times.length; i++) {// 已过期的窗口不统计if (currentTime - times[i] >= 10) {continue;}AtomicLong window = windows[i];if (window != null) {// 时间窗格对应的计数器的值累加到结果sum中sum += window.get();}}return sum;}
sum()方法首先按照同样的方式算出当前的currentTime,然后由于times数组中每个元素记录的都是自己对应的currentTime,那么我们只要拿着当前的currentTime,减去时间窗格记录的currentTime,发现如果大于10,那么代表该窗格以过期了(也就1秒以前的),那么该窗格对应的并发数不统计。
然后把没过期的时间窗格对应的AtomicLong值累加到sum变量,就得到当前并发数。
令牌桶算法
简单的令牌桶算法就是起一个后台线程定时加令牌,然后取的时候直接get就行。
但是这种方式是有点low的,我们实现的令牌桶算法并不是这样子,而是根据时间差模拟线程加令牌,那就不需要额外起线程了。
令牌桶算法的实现类是TokenBucketFlowLimiter。
/*** 令牌桶限流器* @author huangjunyi* @date 2023/12/21 20:22* @desc*/
public class TokenBucketFlowLimiter implements FlowLimiter {// 令牌桶容量private int capacity;// 当前令牌数private AtomicInteger total;// 令牌桶增加速率(每秒增加的个数)private int rate;// 上次获取令牌的时间private long lastTime;...@Overridepublic boolean canPass(String entryPointSymbol) {// 先执行加令牌while (true) {long currentTimeMillis = System.currentTimeMillis();// 当前时间距离上次时间,走过了多少秒int diffSeconds = (int) ((currentTimeMillis - lastTime) / 1000);// 小于1秒时间差,不加令牌if (diffSeconds < 1) {break;}int currentTokens = total.get();// 利用CAS的方式加令牌// 时间差diffSeconds(秒) * 令牌增加速率rate(每秒加几个),就是要加的令牌数// 用Math.min()方法控制不加超// 注意:这里有并发问题,total和lastTime非原子性,大家自行更改if (total.compareAndSet(currentTokens, Math.min(capacity, currentTokens + diffSeconds * rate))) {// 记录最后一次调用的时间lastTime = currentTimeMillis;break;}}// 再取令牌,取到的通过,取不到的拒绝boolean canpass;while (true) {int currentTokens = total.get();if (currentTokens == 0) {// 没令牌了,不通过canpass = false;break;}// 还有令牌,那么取走1个,total要通过CAS减1if (total.compareAndSet(currentTokens, currentTokens - 1)) {canpass = true;break;}}return canpass;}
}
首先是先根据时间差加令牌,也就是算出当前时间距离上次的时间走过了多少秒。我们加令牌的策略是按秒加的,通过一个rate属性记录每秒加几个,因此如果时间差小于1秒,就不加了。
加完令牌再取令牌。取令牌只要判断total是否大于0就行了,大于0代表还有令牌,否则就是没有令牌了。如果还有令牌,需要通过CAS减1,表示取走一个令牌。
漏桶算法
漏桶算法也有一个最简单的版本,那就是每个请求都放到一个队列中,然后有一个后台线程按照一定的速率取出队列中的请求并处理。
但是这种做法,也是太low了。我们实现的漏桶算法依然是以时间差的方式模拟漏桶漏水,这样就不需要后台线程了。
漏桶算法的实现类是LeakyBucketFlowLimiter。
/*** 漏桶限流器* @author huangjunyi* @date 2023/12/21 20:23* @desc*/
public class LeakyBucketFlowLimiter implements FlowLimiter {// 桶容量private int capacity;// 桶当前水位private AtomicInteger waterline;// 流出速率(每分钟)private int rate;// 上次获取令牌的时间private long lastTime;// 两次请求间的间隔时间(毫秒)private int diffTime;...@Overridepublic boolean canPass(String entryPointSymbol) {// 先执行漏水while (true) {long currentLastTime = this.lastTime;long currentTimeMillis = System.currentTimeMillis();// 根据时间差(本次请求距离上一次请求)算出漏水量// rate是漏桶出水速率,每分钟漏出几个// rate/60就是每秒中漏水量// 然后时间差(秒数) * (rate/60),就得到漏水量int outflow = (int) (((currentTimeMillis - currentLastTime) / 1000) * ((double) rate / 60));// 没得漏,breakif (outflow < 1) {break;}int oldWaterline = waterline.get();// 根据漏水量outflow,以CAS的方式更新桶中水量waterline// 用Math.max(0, ...)方法控制不会漏成负数// 注意:这里有并发问题,waterline和lastTime非原子性,大家自行更改if (waterline.compareAndSet(oldWaterline, Math.max(0, oldWaterline - outflow))) {// 记录最后一次调用的时间 this.lastTime = currentTimeMillis;break;}}// 再执行加水,能加成功则通过,加不成功则拒绝boolean canpass;while (true) {int currentWaterline = waterline.get();// 桶满了,则拒绝if (currentWaterline >= capacity) {canpass = false;break;}// 通过,CAS更新桶中水量waterlineif (waterline.compareAndSet(currentWaterline, currentWaterline + 1)) {canpass = true;break;}}// 根据当前桶中水量,算出自己要等待的时间// diffTime是设置的两次请求间规定的间隔时间(毫秒)if (canpass) {try {Thread.sleep(waterline.get() * diffTime);} catch (InterruptedException e) {return true;}}return canpass;}
}
漏桶算法跟令牌桶算法的区别有两点:
- 令牌桶是根据时间差做加法,漏桶是根据时间差做减法
- 由于漏桶是按照一定速率漏水的,因此请求的处理可能不会马上被执行,有一个等待的过程
我们的令牌桶算法,首先执行根据时间差做漏水处理。时间差是当前请求距离上次请求的秒数,然后算出每秒流出速率,这两值一相乘,就是本次要流出的水量。然后使用CAS更新桶中水量,并通过Math.max(0, …)方法控制不会漏成负数。
处理完漏水后,再执行加水。判断当前桶中水量是否大于等于桶容量,如果是,那么拒绝处理当前请求;否则桶中水量加1。
最后还要计算当前请求要等多久才被处理,因为漏桶是按一定速率漏水的,也就是按一定速率处理请求,因此漏桶算法不会像令牌桶算法那样取到令牌就可以立刻执行,漏桶算法需要根据漏水速率等待桶漏水漏到自己时才处理。
代码已经提交到gitee,可以自行下载阅读。
https://gitee.com/huang_junyi/simple-microservice/tree/master/simple-microservice-protector/src/main/java/com/huangjunyi1993/simple/microservice/protector/flow