摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud-Gateway/filter-request-rate-limiter/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Spring-Cloud-Gateway 2.0.X M4


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 RequestRateLimiterGatewayFilterFactory 的代码实现

《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.2) 之 GatewayFilterFactory 过滤器工厂》 一文中,我们看到 Spring Cloud Gateway 提供了多种 GatewayFilterFactory 的实现,而 RequestRateLimiterGatewayFilterFactory 也是其中的一种。

通过 RequestRateLimiterGatewayFilterFactory ,可以创建 RequestRateLimiterGatewayFilter ( 实际是内部匿名类,为了表述方便,下面继续这么称呼 ) 。

RequestRateLimiterGatewayFilter 使用 Redis + Lua 实现分布式限流。而限流的粒度,例如 URL / 用户 / IP 等,通过 org.springframework.cloud.gateway.filter.ratelimit.KeyResolver 实现类决定,在 「4. KeyResolver」 详细解析。

这里,笔者一本正经的推荐下自己分享的 《Eureka 源码解析 —— 基于令牌桶算法的 RateLimiter》 ,简直业界良心。


推荐 Spring Cloud 书籍

推荐 Spring Cloud 视频

2. 环境搭建

第一步,以 spring-cloud-gateway-sample 项目为基础,在 pom.xml 文件添加依赖库。

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

第二步,在 application.yml 配置一个 RouteDefinition 。

spring:
cloud:
gateway:
routes:
# =====================================
- id: default_path_to_httpbin
uri: http://127.0.0.1:8081
order: 10000
predicates:
- Path=
private final boolean allowed;
private final long tokensRemaining;
public Response(boolean allowed, long tokensRemaining) {
this.allowed = allowed;
this.tokensRemaining = tokensRemaining;
}
}

5.1 GatewayRedisAutoConfiguration

org.springframework.cloud.gateway.config.GatewayRedisAutoConfiguration ,Redis 相关配置类,代码如下 :

1: @Configuration
2: @AutoConfigureAfter(RedisReactiveAutoConfiguration.class)
3: @AutoConfigureBefore(GatewayAutoConfiguration.class)
4: @ConditionalOnBean(ReactiveRedisTemplate.class)
5: @ConditionalOnClass({RedisTemplate.class, DispatcherHandler.class})
6: class GatewayRedisAutoConfiguration {
7:
8: @Bean
9: @SuppressWarnings("unchecked")
10: public RedisScript redisRequestRateLimiterScript() {
11: DefaultRedisScript redisScript = new DefaultRedisScript<>();
12: redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
13: redisScript.setResultType(List.class);
14: return redisScript;
15: }
16:
17: @Bean
18: //TODO: replace with ReactiveStringRedisTemplate in future
19: public ReactiveRedisTemplate<String, String> stringReactiveRedisTemplate(
20: ReactiveRedisConnectionFactory reactiveRedisConnectionFactory,
21: ResourceLoader resourceLoader) {
22: RedisSerializer<String> serializer = new StringRedisSerializer();
23: RedisSerializationContext<String , String> serializationContext = RedisSerializationContext
24: .<String, String>newSerializationContext()
25: .key(serializer)
26: .value(serializer)
27: .hashKey(serializer)
28: .hashValue(serializer)
29: .build();
30: return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory,
31: serializationContext);
32: }
33:
34: @Bean
35: public RedisRateLimiter redisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
36: @Qualifier("redisRequestRateLimiterScript") RedisScript<List<Long>> redisScript) {
37: return new RedisRateLimiter(redisTemplate, redisScript);
38: }
39:
40: }
  • 第 8 至 15 行 :创建 org.springframework.data.redis.core.script.RedisScript Bean 对象,加载 META-INF/scripts/request_rate_limiter.lua 路径下的 Redis Lua 脚本。该脚本使用 Redis 基于令牌桶算法实现限流。在本文 「Redis Lua 脚本」 详细解析。
  • 第 17 至 32 行 :创建 org.springframework.data.redis.core.ReactiveRedisTemplate Bean 对象。
  • 第 34 至 38 行 :使用 RedisScript 和 ReactiveRedisTemplate Bean 对象,创建 RedisRateLimiter Bean 对象。

5.2 RedisRateLimiter

org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter ,基于 Redis 的分布式限流器实现类

构造方法,代码如下 :

public class RedisRateLimiter implements RateLimiter {
public static final String REPLENISH_RATE_KEY = "replenishRate";
public static final String BURST_CAPACITY_KEY = "burstCapacity";
private final ReactiveRedisTemplate<String, String> redisTemplate;
private final RedisScript<List<Long>> script;
public RedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
RedisScript<List<Long>> script) {
this.redisTemplate = redisTemplate;
this.script = script;
}
}
  • redisTemplate 属性,RedisTemplate 。
  • script 属性,Lua 脚本。

#isAllowed(id, Tuple) ,代码如下 :

1: public Mono<Response> isAllowed(String id, Tuple args) {
2: // How many requests per second do you want a user to be allowed to do?
3: int replenishRate = args.getInt(REPLENISH_RATE_KEY);
4:
5: // How much bursting do you want to allow?
6: int burstCapacity;
7: if (args.hasFieldName(BURST_CAPACITY_KEY)) {
8: burstCapacity = args.getInt(BURST_CAPACITY_KEY);
9: } else {
10: burstCapacity = 0;
11: }
12:
13: try {
14: // Make a unique key per user.
15: String prefix = "request_rate_limiter." + id;
16:
17: // You need two Redis keys for Token Bucket.
18: List<String> keys = Arrays.asList(prefix + ".tokens", prefix + ".timestamp");
19:
20: // The arguments to the LUA script. time() returns unixtime in seconds.
21: List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
22: Instant.now().getEpochSecond() + "", "1");
23: // allowed, tokens_left = redis.eval(SCRIPT, keys, args)
24: Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
25: // .log("redisratelimiter", Level.FINER);
26: return flux
27: // Throwable => Flux.just(Arrays.asList(1L, -1L)) 。
28: .onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
29: // Flux<List<Long>> => Mono<List<Long>>
30: .reduce(new ArrayList<Long>(), (longs, l) -> {
31: longs.addAll(l);
32: return longs;
33: })
34: // Mono<List<Long>> => Mono<Response>
35: .map(results -> {
36: boolean allowed = results.get(0) == 1L;
37: Long tokensLeft = results.get(1);
38:
39: Response response = new Response(allowed, tokensLeft);
40:
41: if (log.isDebugEnabled()) {
42: log.debug("response: " + response);
43: }
44: return response;
45: });
46: }
47: catch (Exception e) {
48:
53: log.error("Error determining if user allowed from redis", e);
54: }
55: return Mono.just(new Response(true, -1));
56: }
  • id 方法参数,令牌桶编号。一个令牌桶编号对应令牌桶。
    • 在本文场景中为请求限流
  • 第 3 行 :获得 burstCapacity 令牌桶上限。
  • 第 5 至 11 行 :获得 replenishRate ,令牌桶填充平均速率,单位:秒。
  • 第 15 行 :获得令牌桶前缀,request_rate_limiter.${id}
  • 第 18 行 :获得令牌桶键数组 :
    • request_rate_limiter.${id}.tokens :令牌桶剩余令牌数。
    • request_rate_limiter.${id}.timestamp :令牌桶最后填充令牌时间,单位:秒。
  • 第 21 至 22 行 :获得 Lua 脚本参数 :

    • 第一个参数 :replenishRate
    • 第二个参数 :burstCapacity
    • 第三个参数 :得到从 1970-01-01 00:00:00 开始的秒数。为什么在 Java 代码里获取,而不使用 Lua 在 Reids 里获取

      FROM 《亿级流量网站架构核心技术》
      因为 Redis 的限制( Lua中有写操作不能使用带随机性质的读操作,如TIME )不能在 Redis Lua中 使用 TIME 获取时间戳,因此只好从应用获取然后传入,在某些极端情况下(机器时钟不准的情况下),限流会存在一些小问题。

      • 涛哥这本书非常不错,推荐购买。
    • 第四个参数 :消耗令牌数量,默认 1 。

  • 第 24 行 :调用 ReactiveRedisTemplate#execute(RedisScript<T>, List<K>, List<?>) 方法,执行 Redis Lua 脚本,获取令牌。返回结果为 [是否获取令牌成功, 剩余令牌数] ,其中,1 代表获取令牌成功0 代表令牌获取失败

  • 第 25 行 :当 Redis Lua 脚本过程中发生异常,忽略异常,返回 Flux.just(Arrays.asList(1L, -1L)) ,即认为获取令牌成功。为什么?在 Redis 发生故障时,我们不希望限流器对 Reids 是强依赖,并且 Redis 发生故障的概率本身就很低。

    We don’t want a hard dependency on Redis to allow traffic.
    Make sure to set an alert so you know if this is happening too much. Stripe’s observed failure rate is 0.01%.

  • 第 30 至 33 行 :调用 Flux#reduce(A, BiFunction<A, ? super T, A>) 方法,将 Flux<List<Long>> 转换成 Mono<List<Long>> 。因为 ReactiveRedisTemplate#execute(RedisScript<T>, List<K>, List<?>) 方法的执行结果为 Flux ( 多次 ),实际在当前场景里,自行 Redis Lua 脚本只会返回一次数组,所以转换成 Mono (一次)。

  • 第 35 至 45 行 :调用 Mono#map(Function<? super T, ? extends R>) 方法,将 Mono<List<Long>> => Mono<Response>
  • 第 47 至 55 行 :当【第 15 至 24 行】代码部分执行发生异常时,例如 Redis 挂了,返回 Flux.just(Arrays.asList(1L, -1L)) ,即认为获取令牌成功

5.3 Redis Lua 脚本

META-INF/scripts/request_rate_limiter.lua ,Redis Lua 脚本,实现基于令牌桶算法实现限流。代码如下 :

1: local tokens_key = KEYS[1]
2: local timestamp_key = KEYS[2]
3:
4: local rate = tonumber(ARGV[1])
5: local capacity = tonumber(ARGV[2])
6: local now = tonumber(ARGV[3])
7: local requested = tonumber(ARGV[4])
8:
9: local fill_time = capacity/rate
10: local ttl = math.floor(fill_time*2)
11:
12: local last_tokens = tonumber(redis.call("get", tokens_key))
13: if last_tokens == nil then
14: last_tokens = capacity
15: end
16:
17: local last_refreshed = tonumber(redis.call("get", timestamp_key))
18: if last_refreshed == nil then
19: last_refreshed = 0
20: end
21:
22: local delta = math.max(0, now-last_refreshed)
23: local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
24: local allowed = filled_tokens >= requested
25: local new_tokens = filled_tokens
26: local allowed_num = 0
27: if allowed then
28: new_tokens = filled_tokens - requested
29: allowed_num = 1
30: end
31:
32: redis.call("setex", tokens_key, ttl, new_tokens)
33: redis.call("setex", timestamp_key, ttl, now)
34:
35: return { allowed_num, new_tokens }
  • 第 1 至 2 行 :KEYS 方法参数 :
    • 第一个参数 :request_rate_limiter.${id}.tokens ,令牌桶剩余令牌数。
    • 第二个参数 :request_rate_limiter.${id}.timestamp ,令牌桶最后填充令牌时间,单位:秒。
  • 第 4 至 7 行 :ARGV 方法参数 :

    • 第一个参数 :replenishRate
    • 第二个参数 :burstCapacity
    • 第三个参数 :得到从 1970-01-01 00:00:00 开始的秒数。
    • 第四个参数 :消耗令牌数量,默认 1 。
  • 第 9 行 :计算令牌桶填充令牌需要多久时间,单位:秒。

  • 第 10 行 :计算 request_rate_limiter.${id}.tokens / request_rate_limiter.${id}.timestampttl* 2 保证时间充足。
  • 第 12 至 20 行 :调用 get 命令,获得令牌桶剩余令牌数( last_tokens ) ,令牌桶最后填充令牌时间(last_refreshed) 。
  • 第 22 至 23 行 :填充令牌,计算的令牌桶剩余令牌数( filled_tokens )。填充不超过令牌桶令牌上限
  • 第 24 至 30 行 :获取令牌是否成功。

    • 成功,令牌桶剩余令牌数(new_tokens) 消耗令牌数( requested ),并设置获取成功( allowed_num = 1 ) 。
    • 失败,设置获取失败( allowed_num = 0 ) 。
  • 第 32 至 33 行 :设置令牌桶剩余令牌数( new_tokens ) ,令牌桶最后填充令牌时间(now) 。

  • 第 35 行 :返回数组结果,[是否获取令牌成功, 剩余令牌数]

Redis Lua 脚本不会有并发问题么

FROM 《亿级流量网站架构核心技术》
因 Redis 是单线程模型,因此是线程安全的。

666. 彩蛋

知识星球

哇哈哈,过滤器全部完成。恩,当然后面需要在考虑一下,例如认证过滤器等等。

胖友,分享一波朋友圈可好!

文章目录
  1. 1. 1. 概述
  2. 2. 2. 环境搭建
  3. 3. 3. RequestRateLimiterGatewayFilterFactory
  4. 4. 4. KeyResolver
    1. 4.1. 4.1 PrincipalNameKeyResolver
    2. 4.2. 4.2 自定义 KeyResolver
  5. 5. 5. RateLimiter
    1. 5.1. 5.1 GatewayRedisAutoConfiguration
    2. 5.2. 5.2 RedisRateLimiter
    3. 5.3. 5.3 Redis Lua 脚本
  6. 6. 666. 彩蛋