Skip to main content

Resilience4j Bulkhead, RateLimiter, TimeLimiter 限流

Bulkhead 是 Resilience4j 提供的限流器,它可以限制并发请求的数量,避免服务被过多请求拖垮。Bulkhead 有两种模式:Semaphore 和 ThreadPool。Semaphore 是通过信号量来限制并发请求的数量,ThreadPool 是通过线程池来限制并发请求的数量。这样可以防止由于某一个服务的响应时间过长,导致线程池被占满,从而影响其他服务的正常运行。Bulkhead 一词原意即船舱隔板,用来防止船舱内的水泄漏到其他船舱。

下面所有的详细配置可查询 Resilience4j 官方文档 Core Module 部分

Bulkhead 原理

Semaphore 模式中,会有一个信号量来限制并发请求的数量,当请求到达时,会先尝试获取信号量,如果获取成功,则继续执行,否则直接返回失败。类比于锁,Semaphore 是一种共享锁,可以被多个线程同时获取,但是数量有限。

ThreadPool 模式中,会有一个线程池来限制并发请求的数量,当请求到达时,会先尝试提交到线程池,如果线程池已满,则直接返回失败。

Bulkhead 信号量模式使用

先引入依赖:

implementation 'io.github.resilience4j:resilience4j-bulkhead:2.2.0'

然后配置即可,当然也可以使用 bean 的方法。

resilience4j:
bulkhead:
configs:
default:
max-concurrent-calls: 2
max-wait-duration: 1s
instances:
bh:
base-config: default

这里的设置和之前的 CircuitBreaker 类似,max-concurrent-calls 是最大并发请求数,max-wait-duration 是最大等待时间。

现在,我们为 API 添加一个 Bulkhead 限流器:

@GetMapping
@CircuitBreaker(name = "bk", fallbackMethod = "fb")
@Bulkhead(name = "bh", fallbackMethod = "fb")
public ResponseWrapper<List<PaymentRecord>> getAllPayments() {
return paymentAPIIf.getAllPayments();
}

这也和之前的 CircuitBreaker 类似,只是多了一个 @Bulkhead 注解。当并发请求数超过 2 时,会直接返回失败。

为了测试,我们手动添加一个延时:

@GetMapping
@CircuitBreaker(name = "bk", fallbackMethod = "fb")
@Bulkhead(name = "bh", fallbackMethod = "fb")
public ResponseWrapper<List<PaymentRecord>> getAllPayments() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return paymentAPIIf.getAllPayments();
}

然后,我们用浏览器打开多个网页,同时访问这个 API,可以看到,当并发请求数超过 2 时,会直接返回失败。

Bulkhead 线程池模式使用

与信号量模式不同,此时需要 API 返回一个 CompletableFuture 对象,然后在 CompletableFuture 对象中执行业务逻辑。这样,业务逻辑会在一个线程池中执行,从而限制并发请求数。

配置时,需要指定线程池的大小:

resilience4j:
bulkhead:
configs:
default:
max-concurrent-calls: 2
max-wait-duration: 1s
instances:
bh:
base-config: default
thread-pool-bulkhead:
max-thread-pool-size: 2

然后,我们修改 API 为线程池模式,并指定 Bulkhead 为线程池模式:

@GetMapping
@CircuitBreaker(name = "bk", fallbackMethod = "fb")
@Bulkhead(name = "bh", type = Bulkhead.Type.THREADPOOL, fallbackMethod = "fb")
public CompletableFuture<ResponseWrapper<List<PaymentRecord>>> getAllPayments() {
return CompletableFuture.supplyAsync(() -> paymentAPIIf.getAllPayments());
}

此时 fallback 方法的返回值也需要修改为 CompletableFuture 对象。

public CompletableFuture<ResponseWrapper<List<PaymentRecord>>> fb(Throwable t) {
return CompletableFuture.completedFuture(ResponseWrapper.error());
}

之后的效果和前文一致。

RateLimiter 原理

RateLimiter 是 Resilience4j 提供的限流器,它可以限制请求的速率,避免服务被过多请求拖垮。

整体上讲,常用的限流算法有四种。

  • 漏桶算法,固定容量的漏桶,请求先进入漏桶,然后以固定速率流出,如果漏桶已满,则直接返回失败。
  • 令牌桶算法,固定容量的令牌桶,请求先获取令牌,然后执行,如果令牌桶为空,则直接返回失败。令牌的数目会按固定速率增加。这是 Spring Cloud 的默认限流算法。
  • 滚动窗口算法,固定窗口内的请求数量不能超过限制。即每个不相互重叠的窗口内的请求数量不能超过限制。
  • 滑动窗口算法,固定窗口内的平均请求数量不能超过限制。即每个大小固定的窗口内的平均请求数量不能超过限制。

RateLimiter 使用

首先,添加依赖,

implementation 'io.github.resilience4j:resilience4j-ratelimiter:2.2.0'

然后添加配置,

  ratelimiter:
configs:
default:
timeout-duration: 2s
limit-for-period: 1
instances:
rl:
base-config: default

然后在 API 上添加 RateLimiter 注解,

@GetMapping
@CircuitBreaker(name = "bk", fallbackMethod = "fb")
@RateLimiter(name = "rl", fallbackMethod = "fb")
public ResponseWrapper<List<PaymentRecord>> getAllPayments() {
return paymentAPIIf.getAllPayments();
}

这样,当请求速率超过 2s 一个请求时,会直接返回失败。

TimeLimiter 原理与使用

TimeLimiter 是 Resilience4j 提供的限时器,它可以限制请求的执行时间,避免服务被过多请求拖垮。这是最简单的限流器,只需要设置一个超时时间即可。

首先,添加依赖,

implementation 'io.github.resilience4j:resilience4j-timelimiter:2.2.0'

然后添加配置,

  timelimiter:
configs:
default:
timeout-duration: 2s
instances:
tl:
base-config: default

然后在 API 上添加 TimeLimiter 注解,

@GetMapping
@CircuitBreaker(name = "bk", fallbackMethod = "fb")
@TimeLimiter(name = "tl", fallbackMethod = "fb")
public ResponseWrapper<List<PaymentRecord>> getAllPayments() {
return paymentAPIIf.getAllPayments();
}

这样,当请求执行时间超过 2s 时,会直接返回失败。