Spring & JVM

Spring gateway redis-lua 令牌桶限流算法解读

1. 源码分析

-- 令牌桶在redis中的key值
local tokens_key = KEYS[1]
-- 该令牌桶上一次刷新的时间对应的key的值
local timestamp_key = KEYS[2]
-- 令牌单位时间填充速率
local rate = tonumber(ARGV[1])
-- 令牌桶容量
local capacity = tonumber(ARGV[2])
-- 当前时间
local now = tonumber(ARGV[3])
-- 本次请求需要消耗的令牌数
local requested = tonumber(ARGV[4]) -- 默认 1
-- 令牌桶填满所需要的总时间 = 令牌桶容量 / 令牌填充速率
local fill_time = capacity/rate
-- 令牌过期时间  填充时间*2
local ttl = math.floor(fill_time*2)
-- 获取上一次令牌桶剩余的令牌数
local last_tokens = tonumber(redis.call("get", tokens_key))
-- 如果没有获取到,可能是令牌桶是新的,之前不存在该令牌桶、或该令牌桶已经好久没有使用过期了,这里需要对令牌桶进行初始化,初始情况令牌桶是满的
if last_tokens == nil then
  last_tokens = capacity
end
-- 获取上一次刷新的时间,如果没有或者已经过期,那么初始化为0
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end

-- [先计算当前要填充的令牌数]

-- 计算上一次刷新时间和本次刷新时间的时间差
local delta = math.max(0, now-last_refreshed)
-- delta * rate 为这个时间差需要填充的令牌数
-- 令牌桶本次填充完后的令牌数 = 令牌桶原有的令牌数 + 本次需填充的令牌数,最大不超过容量
local filled_tokens = math.min(capacity, last_tokens + (delta * rate))
-- 判断令牌桶中的令牌数是否满足本次请求所需的令牌数,若不满足则说明需要限流了
local allowed = filled_tokens >= requested

-- [再减去本次需要消耗的令牌数]

-- 这里声明了两个变量,一个是新的令牌数,一个是是否被限流,0 代表要限流,1 代表无需限流
local new_tokens = filled_tokens
local allowed_num = 0
-- 如果没有被限流(即 filled_tokens >= requested)则跳过
if allowed then
  -- 本次消耗后的新令牌数 = 刚计算好的令牌桶中存在的令牌数 - 本次需要使用的令牌数
  new_tokens = filled_tokens - requested
  -- 并设置状态无需限流
  allowed_num = 1
end

-- 存储本次消耗后的令牌数,以及本次刷新时间
if ttl > 0 then
  redis.call("setex", tokens_key, ttl, new_tokens)
  redis.call("setex", timestamp_key, ttl, now)
end
-- 返回: 是否需限流标志  和  本次消耗后令牌桶剩余令牌数
return { allowed_num, new_tokens }
@Override
@SuppressWarnings("unchecked")
public Mono<Response> isAllowed(String routeId, String id) {
    if (!this.initialized.get()) {
        throw new IllegalStateException("RedisRateLimiter is not initialized");
    }
    Config routeConfig = loadConfiguration(routeId);
    // 允许用户每秒处理多少个请求
    int replenishRate = routeConfig.getReplenishRate();
    // 令牌桶容量溢出则限流
    int burstCapacity = routeConfig.getBurstCapacity();
    // 本次请求需消耗多少令牌
    int requestedTokens = routeConfig.getRequestedTokens();
    try {
        List<String> keys = getKeys(id);
        // lua 的参数 time 以秒为单位返回 unixtime。
        List<String> scriptArgs = Arrays.asList(replenishRate + "",
                burstCapacity + "", Instant.now().getEpochSecond() + "",
                requestedTokens + "");
        // allowed, tokens_left = redis.eval(SCRIPT, keys, args)
        Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
        return flux.onErrorResume(throwable -> {
            if (log.isDebugEnabled()) {
                log.debug("Error calling rate limiter lua", throwable);
            }
            return Flux.just(Arrays.asList(1L, -1L));
        }).reduce(new ArrayList<Long>(), (longs, l) -> {
            longs.addAll(l);
            return longs;
        }).map(results -> {
            boolean allowed = results.get(0) == 1L;
            Long tokensLeft = results.get(1);
            Response response = new Response(allowed,
                    getHeaders(routeConfig, tokensLeft));
            if (log.isDebugEnabled()) {
                log.debug("response: " + response);
            }
            return response;
        });
    }
    catch (Exception e) {
        /*
         * We don't want a hard dependency on Redis to allow traffic. Make sure to set
         * an alert so you know if this is happening too much. Stripe's observed
         * failure rate is 0.01%.
         */
        log.error("Error determining if user allowed from redis", e);
    }
    return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}

2. 相关参考

3. FAQ

留言

您的电子邮箱地址不会被公开。