扫码关注公众号:芋道源码

发送: 百事可乐
获取永久解锁本站全部文章的链接

《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Eureka/rate-limiter/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Eureka 1.8.X 版本


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 RateLimiter 的代码实现和 RateLimiter 在 Eureka 中的应用

推荐 Spring Cloud 书籍

2. RateLimiter

com.netflix.discovery.util.RateLimiter ,基于Token Bucket Algorithm ( 令牌桶算法 )的速率限制器。

FROM 《接口限流实践》
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

RateLimiter 目前支持分钟级秒级两种速率限制。构造方法如下:

public class RateLimiter {

/**
* 速率单位转换成毫秒
*/
private final long rateToMsConversion;

public RateLimiter(TimeUnit averageRateUnit) {
switch (averageRateUnit) {
case SECONDS: // 秒级
rateToMsConversion = 1000;
break;
case MINUTES: // 分钟级
rateToMsConversion = 60 * 1000;
break;
default:
throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
}
}
}
  • averageRateUnit 参数,速率单位。构造方法里将 averageRateUnit 转换成 rateToMsConversion

调用 #acquire(...) 方法,获取令牌,并返回是否获取成功

// RateLimiter.java
/**
* 获取令牌( Token )
*
* @param burstSize 令牌桶上限
* @param averageRate 令牌再装平均速率
* @return 是否获取成功
*/
public boolean acquire(int burstSize, long averageRate) {
return acquire(burstSize, averageRate, System.currentTimeMillis());
}

public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {
if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
return true;
}

// 填充 令牌
refillToken(burstSize, averageRate, currentTimeMillis);
// 消费 令牌
return consumeToken(burstSize);
}
  • burstSize 参数 :令牌桶上限。
  • averageRate 参数 :令牌填充平均速率。
  • 我们举个 🌰 来理解这两个参数 + 构造方法里的一个参数:
    • averageRateUnit = SECONDS
    • averageRate = 2000
    • burstSize = 10
    • 可获取 2000 个令牌。例如,每秒允许请求 2000 次。
    • 毫秒可填充 2000 / 1000 = 2消耗的令牌。
    • 毫秒可获取 10 个令牌。例如,每毫秒允许请求上限为 10 次,并且请求消耗掉的令牌,需要逐步填充。这里要注意下,虽然每毫秒允许请求上限为 10 次,这是在没有任何令牌被消耗的情况下,实际每秒允许请求依然是 2000 次。
    • 这就是基于令牌桶算法的限流的特点:让流量平稳,而不是瞬间流量。1000 QPS 相对平均的分摊在这一秒内,而不是第 1 ms 999 请求,后面 999 ms 0 请求
  • 从代码上看,#acquire(...) 分成两部分,我们分别解析,整体如下图:

2.1 refillToken

调用 #refillToken(...) 方法,填充已消耗的令牌。可能很多同学开始和我想的一样,一个后台每毫秒执行填充。为什么不适合这样呢?一方面,实际项目里每个接口都会有相应的 RateLimiter ,导致太多执行频率极高的后台任务;另一方面,获取令牌时才计算,多次令牌填充可以合并成一次,减少冗余和无效的计算。

代码如下:

 1: /**
2: * 速率单位转换成毫秒
3: */
4: private final long rateToMsConversion;
5:
6: /**
7: * 消耗令牌数
8: */
9: private final AtomicInteger consumedTokens = new AtomicInteger();
10: /**
11: * 最后填充令牌的时间
12: */
13: private final AtomicLong lastRefillTime = new AtomicLong(0);
14:
15: private void refillToken(int burstSize, long averageRate, long currentTimeMillis) {
16: // 获得 最后填充令牌的时间
17: long refillTime = lastRefillTime.get();
18: // 获得 过去多少毫秒
19: long timeDelta = currentTimeMillis - refillTime;
20:
21: // 计算 可填充最大令牌数量
22: long newTokens = timeDelta * averageRate / rateToMsConversion;
23: if (newTokens > 0) {
24: // 计算 新的填充令牌的时间
25: long newRefillTime = refillTime == 0
26: ? currentTimeMillis
27: : refillTime + newTokens * rateToMsConversion / averageRate;
28: // CAS 保证有且仅有一个线程进入填充
29: if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
30: while (true) { // 死循环,直到成功
31: // 计算 填充令牌后的已消耗令牌数量
32: int currentLevel = consumedTokens.get();
33: int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased
34: int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
35: // CAS 避免和正在消费令牌的线程冲突
36: if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
37: return;
38: }
39: }
40: }
41: }
42: }
  • 第 17 行 :获取最后填充令牌的时间( refillTime ) 。每次填充令牌,会设置 currentTimeMillisrefillTime
  • 第 19 行 :获得距离最后填充令牌的时间差( timeDelta ),用于计算需要填充的令牌数。
  • 第 22 行 :计算可填充的最大令牌数量( newTokens )。newTokens 可能超过 burstSize ,所以下面会有逻辑调整 newTokens
  • 第 25 至 27 行 :计算新的填充令牌的时间。为什么不能用 currentTimeMillis?例如,averageRate = 500 && averageRateUnit = SECONDS 时, 每 2 毫秒才填充一个令牌,如果设置 currentTimeMillis会导致不足以填充一个令牌的时长被吞了
  • 第 29 行 :通过 CAS 保证有且仅有一个线程进入填充逻辑。
  • 第 30 行 :死循环直到成功
  • 第 32 至 34 行 :计算新的填充令牌后的已消耗的令牌数量。
    • 第 33 行 :burstSize 可能调小,例如,系统接入分布式配置中心,可以远程调整该数值。如果此时 burstSize 更小,以它作为已消耗的令牌数量。
  • 第 36 行 :通过 CAS 保证避免覆盖设置正在消费令牌的线程。

2.2 consumeToken

#refillToken(...) 方法,填充消耗( 获取 )的令牌。

代码如下 :

 1: private boolean consumeToken(int burstSize) {
2: while (true) { // 死循环,直到没有令牌,或者获取令牌成功
3: // 没有令牌
4: int currentLevel = consumedTokens.get();
5: if (currentLevel >= burstSize) {
6: return false;
7: }
8: // CAS 避免和正在消费令牌或者填充令牌的线程冲突
9: if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
10: return true;
11: }
12: }
13: }
  • 第 2 行 :死循环直到没有令牌或者竞争获取令牌成功
  • 第 4 至 7 行 :没有令牌。
  • 第 9 至 11 行 :通过 CAS 避免和正在消费令牌或者填充令牌的线程冲突。

3. RateLimitingFilter

com.netflix.eureka.RateLimitingFilter ,Eureka-Server 限流过滤器。使用 RateLimiting ,保证 Eureka-Server 稳定性。

#doFilter(...) 方法,代码如下:

 1: @Override
2: public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
3: // 获得 Target
4: Target target = getTarget(request);
5:
6: // Other Target ,不做限流
7: if (target == Target.Other) {
8: chain.doFilter(request, response);
9: return;
10: }
11:
12: HttpServletRequest httpRequest = (HttpServletRequest) request;
13: // 判断是否被限流
14: if (isRateLimited(httpRequest, target)) {
15: // TODO[0012]:监控相关,跳过
16: incrementStats(target);
17: // 如果开启限流,返回 503 状态码
18: if (serverConfig.isRateLimiterEnabled()) {
19: ((HttpServletResponse) response).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
20: return;
21: }
22: }
23: chain.doFilter(request, response);
24: }
  • 第 4 行 :调用 #getTarget() 方法,获取 Target。RateLimitingFilter 只对符合正在表达式 ^.*/apps(/[^/]*)?$ 的接口做限流,其中不包含 Eureka-Server 集群批量同步接口。
    • 点击 链接 查看 Target 枚举类代码。
    • 点击 链接 查看 #getTarget(...) 方法代码。
  • 第 14 行 :调用 #isRateLimited(...) 方法,判断是否被限流。代码如下:

     1: private boolean isRateLimited(HttpServletRequest request, Target target) {
    2: // 判断是否特权应用
    3: if (isPrivileged(request)) {
    4: logger.debug("Privileged {} request", target);
    5: return false;
    6: }
    7: // 判断是否被超载( 限流 )
    8: if (isOverloaded(target)) {
    9: logger.debug("Overloaded {} request; discarding it", target);
    10: return true;
    11: }
    12: logger.debug("{} request admitted", target);
    13: return false;
    14: }
    • 第 3 至 6 行 :调用 #isPrivileged() 方法,判断是否为特权应用,对特权应用不开启限流逻辑。代码如下:

      private boolean isPrivileged(HttpServletRequest request) {
      // 是否对标准客户端开启限流
      if (serverConfig.isRateLimiterThrottleStandardClients()) {
      return false;
      }
      // 以请求头( "DiscoveryIdentity-Name" ) 判断是否在标准客户端名集合内
      Set<String> privilegedClients = serverConfig.getRateLimiterPrivilegedClients();
      String clientName = request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY);
      return privilegedClients.contains(clientName) || DEFAULT_PRIVILEGED_CLIENTS.contains(clientName);
      }
      • x
    • 第 8 至 11 行 :调用 #isOverloaded(...) 方法,判断是否超载( 限流 )。代码如下:

      /**
      * Includes both full and delta fetches.
      */
      private static final RateLimiter registryFetchRateLimiter = new RateLimiter(TimeUnit.SECONDS);

      /**
      * Only full registry fetches.
      */
      private static final RateLimiter registryFullFetchRateLimiter = new RateLimiter(TimeUnit.SECONDS);

      private boolean isOverloaded(Target target) {
      int maxInWindow = serverConfig.getRateLimiterBurstSize(); // 10
      int fetchWindowSize = serverConfig.getRateLimiterRegistryFetchAverageRate(); // 500
      boolean overloaded = !registryFetchRateLimiter.acquire(maxInWindow, fetchWindowSize);
      if (target == Target.FullFetch) {
      int fullFetchWindowSize = serverConfig.getRateLimiterFullFetchAverageRate(); // 100
      overloaded |= !registryFullFetchRateLimiter.acquire(maxInWindow, fullFetchWindowSize);
      }
      return overloaded;
      }
      • x
  • 第 18 至 21 行 :若 eureka.rateLimiter.enabled = true( 默认值 :false ,可配 ),返回 503 状态码。

4. InstanceInfoReplicator

com.netflix.discovery.InstanceInfoReplicator ,Eureka-Client 应用实例复制器。在 《Eureka 源码解析 —— 应用实例注册发现(一)之注册》「2.1 应用实例信息复制器」 有详细解析。

应用实例状态发生变化时,调用 #onDemandUpdate() 方法,向 Eureka-Server 发起注册,同步应用实例信息。InstanceInfoReplicator 使用 RateLimiter ,避免状态频繁发生变化,向 Eureka-Server 频繁同步。代码如下:

class InstanceInfoReplicator implements Runnable {

/**
* RateLimiter
*/
private final RateLimiter rateLimiter;
/**
* 令牌桶上限,默认:2
*/
private final int burstSize;
/**
* 令牌再装平均速率,默认:60 * 2 / 30 = 4
*/
private final int allowedRatePerMinute;

InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
// ... 省略其他代码

this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
this.replicationIntervalSeconds = replicationIntervalSeconds;
this.burstSize = burstSize;

this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
}

public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
// 取消任务
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
// 再次调用
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}

}

666. 彩蛋

知识星球

后面找时间研究下 Google Guava RateLimiter 的源码实现,从功能上更加强大,感兴趣的胖友可以瞅瞅呀。

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

文章目录
  1. 1. 1. 概述
  2. 2. 2. RateLimiter
    1. 2.1. 2.1 refillToken
    2. 2.2. 2.2 consumeToken
  3. 3. 3. RateLimitingFilter
  4. 4. 4. InstanceInfoReplicator
  5. 5. 666. 彩蛋