Limiting Requests per Second With WebClient

1. Introduction

In this tutorial, we will look at different ways to limit the number of requests per second with the Spring 5 WebClient.

While we usually want to take advantage of its non-blocking nature, some scenarios may force us to add delays. We will learn about some of these scenarios while using a few Project Reactor features to control a stream of requests to a server.

2. Initial Setup

A common case where we need to limit our requests per second is to avoid overwhelming the server. Also, some web services have a maximum number of requests allowed per hour. Similarly, some control the number of concurrent requests per client.

2.1. Writing a Simple Web Service

To explore this scenario, we’ll start with a simple @RestController which provides random numbers from a certain range:

@RestController
@RequestMapping("/random")
public class RandomController {
    @GetMapping
    Integer getRandom() {
        return new Random().nextInt(50);
    }
}

Next, we will simulate an expensive operation and limit the number of concurrent requests.

2.2. Rate Limiting Our Server

Before looking at the solution, let’s change our service to simulate a more realistic scenario.

First, we’ll limit the number of concurrent requests our server can handle, throwing an exception when the limit is reached.

Second, we will add a delay in processing our response, simulating an expensive operation. While there are more robust solutions available, we will do this only for illustrative purposes:

public class Concurrency {
    public static final int MAX_CONCURRENT = 5;

    // tracks how many requests are currently being processed
    static final AtomicInteger CONCURRENT_REQUESTS = new AtomicInteger();

    public static int protect(IntSupplier supplier) {
        try {
            if (CONCURRENT_REQUESTS.incrementAndGet() > MAX_CONCURRENT) {
                throw new UnsupportedOperationException("max concurrent requests reached");
            }

            // simulate an expensive operation
            TimeUnit.SECONDS.sleep(2);
            return supplier.getAsInt();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            CONCURRENT_REQUESTS.decrementAndGet();
        }
    }
}

Finally, let’s change our endpoint to use this:

@GetMapping
Integer getRandom() {
    return Concurrency.protect(() -> new Random().nextInt(50));
}

Now, our endpoint refuses to process new requests once MAX_CONCURRENT requests are already in progress, returning an error to the client.

2.3. Writing a Simple Client

All examples here will follow the same pattern: generate a Flux of n requests and make a GET request to our service for each one:

Flux.range(1, n)
  .flatMap(i -> {
    // GET request
  });

To reduce boilerplate, let’s implement the request part in a method we can reuse across all examples. We will receive a WebClient, call get(), and retrieve() the response body with generics using ParameterizedTypeReference:

public interface RandomConsumer {
    static <T> Mono<T> get(WebClient client) {
        return client.get()
          .retrieve()
          .bodyToMono(new ParameterizedTypeReference<T>() {});
    }
}

Now, we’re ready to see some approaches.
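Before the first example, it’s worth noting that the client used in the tests below isn’t shown in the snippets. A minimal setup sketch, assuming our service runs locally on the default port, could look like this:

// the base URL is an assumption; point it at wherever the service runs
WebClient client = WebClient.create("http://localhost:8080/random");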

3. Delay With zipWith(Flux.interval())

Our first example pairs our requests with a fixed delay using zipWith():

public class ZipWithInterval {
    public static Flux<Integer> fetch(
      WebClient client, int requests, int delay) {
        return Flux.range(1, requests)
          .zipWith(Flux.interval(Duration.ofMillis(delay)))
          .flatMap(i -> RandomConsumer.get(client));
    }
}

As a result, it delays each request by delay milliseconds. We should note that this delay applies before the request is sent.
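As a quick check, here’s a test sketch in the style of the tests in the next sections, assuming the same client and the simulated server from section 2. A delay at least as long as the server’s two-second processing time keeps us well under the concurrency limit:

@Test
void givenEnoughDelay_whenZipWithInterval_thenNoExceptionThrown() {
    int delay = 2000;
    int requests = 5;
    assertDoesNotThrow(() -> {
      ZipWithInterval.fetch(client, requests, delay)
        .blockLast();
    });
}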

4. Delay With Flux.delayElements()

Flux has a more straightforward way to delay its elements:

public class DelayElements {
    public static Flux<Integer> fetch(
      WebClient client, int requests, int delay) {
        return Flux.range(1, requests)
          .delayElements(Duration.ofMillis(delay))
          .flatMap(i -> RandomConsumer.get(client));
    }
}

With delayElements(), the delay applies directly to the Subscriber.onNext() signals. In other words, it delays each element coming from Flux.range(). Therefore, the function passed into flatMap() is affected and takes longer to start. For example, if the delay value is 1000, there will be a one-second delay before our request starts.
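To see this behavior in isolation, here’s a minimal sketch, independent of our service, that logs when each element is emitted:

// each element arrives roughly one second after the previous one
Flux.range(1, 3)
  .delayElements(Duration.ofSeconds(1))
  .doOnNext(i -> System.out.println(Instant.now() + " - element " + i))
  .blockLast();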

4.1. Adjusting Our Solution

As a result, if we do not provide enough delay, we will get an error:

@Test
void givenSmallDelay_whenDelayElements_thenExceptionThrown() {
    int delay = 100;
    int requests = 10;
    assertThrows(InternalServerError.class, () -> {
      DelayElements.fetch(client, requests, delay)
        .blockLast();
    });
}

That’s because we wait 100 milliseconds per request, but each request takes two seconds to complete on the server side. So, after little more than half a second, six requests are already in flight, our concurrent request limit is exceeded, and we get a 500 error.

If we add enough delay, we can stay under the request limit. But then we’ll have another problem: we’ll be waiting longer than necessary.

Depending on our use case, waiting too long can affect performance significantly. So, next, let’s examine a more appropriate way to handle this as we know the limitations of our server.

5. Concurrency Control With flatMap()

Given the limitations of our service, our best option is to send at most Concurrency.MAX_CONCURRENT requests in parallel. To do this, we can add one more argument to flatMap() for the maximum number of requests processed in parallel:

public class LimitConcurrency {
    public static Flux<Integer> fetch(
      WebClient client, int requests, int concurrency) {
        return Flux.range(1, requests)
          .flatMap(i -> RandomConsumer.get(client), concurrency);
    }
}

This parameter guarantees that the number of concurrent requests never exceeds concurrency and that our processing won’t be delayed longer than necessary:

@Test
void givenLimitInsideServerRange_whenLimitedConcurrency_thenNoExceptionThrown() {
    int limit = Concurrency.MAX_CONCURRENT;
    int requests = 10;
    assertDoesNotThrow(() -> {
      LimitConcurrency.fetch(client, requests, limit)
        .blockLast();
    });
}

Nevertheless, it is worth discussing some other options depending on our scenario and preference. Let’s go over some of them.

6. Using the Resilience4j RateLimiter

Resilience4j is a versatile library designed to deal with fault tolerance in applications. We will use this to limit the number of concurrent requests within an interval and include a timeout.

Let’s start by adding the resilience4j-reactor and resilience4j-ratelimiter dependencies:

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.7.1</version>
</dependency>

Then, we build our rate limiter with RateLimiter.of(), providing a name, an interval for allowing new requests, a concurrency limit, and a timeout:

public class Resilience4jRateLimit {
    public static Flux<Integer> fetch(
      WebClient client, int requests, int concurrency, int interval) {
        RateLimiter limiter = RateLimiter.of("my-rate-limiter", RateLimiterConfig.custom()
          .limitRefreshPeriod(Duration.ofMillis(interval))
          .limitForPeriod(concurrency)
          .timeoutDuration(Duration.ofMillis(interval * concurrency))
          .build());
        // ...
    }
}

Now, we add it to our Flux with transformDeferred(), so it controls our GET requests’ rate:

return Flux.range(1, requests)
  .flatMap(i -> RandomConsumer.get(client)
    .transformDeferred(RateLimiterOperator.of(limiter))
  );
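Putting the two snippets together, the complete method might look like this:

public class Resilience4jRateLimit {
    public static Flux<Integer> fetch(
      WebClient client, int requests, int concurrency, int interval) {
        RateLimiter limiter = RateLimiter.of("my-rate-limiter", RateLimiterConfig.custom()
          .limitRefreshPeriod(Duration.ofMillis(interval))
          .limitForPeriod(concurrency)
          .timeoutDuration(Duration.ofMillis(interval * concurrency))
          .build());
        return Flux.range(1, requests)
          .flatMap(i -> RandomConsumer.get(client)
            .transformDeferred(RateLimiterOperator.of(limiter)));
    }
}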

We should note that we can still run into problems if we set our interval too low. But this approach is helpful if we need to share a rate limiter specification with other operations.
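For example, here’s a test sketch under the same assumptions as the earlier tests, using the simulated server from section 2. We pick an interval longer than the server’s two-second processing time so that consecutive batches of requests don’t overlap:

@Test
void givenIntervalAboveProcessingTime_whenRateLimited_thenNoExceptionThrown() {
    int interval = 3000;
    int requests = 10;
    assertDoesNotThrow(() -> {
      Resilience4jRateLimit.fetch(client, requests, Concurrency.MAX_CONCURRENT, interval)
        .blockLast();
    });
}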

7. Precise Throttling With Guava

Guava has a general-purpose rate limiter that works well for our scenario. Furthermore, since it uses the token bucket algorithm, it blocks only when necessary instead of every time, unlike Flux.delayElements().

First, we need to add Guava to our pom.xml:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

To use it, we call RateLimiter.create() and pass it the maximum number of requests per second we want to send. Then, we call acquire() on the limiter before sending our request, to throttle execution when necessary:

public class GuavaRateLimit {
    public static Flux<Integer> fetch(
      WebClient client, int requests, int requestsPerSecond) {
        RateLimiter limiter = RateLimiter.create(requestsPerSecond);
        return Flux.range(1, requests)
          .flatMap(i -> {
            limiter.acquire();
            return RandomConsumer.get(client);
          });
    }
}

This solution works excellently because of its simplicity: it doesn’t block our code longer than necessary. For instance, if a request takes longer than expected for some reason, the next one won’t wait to execute. But this is the case only while we’re within the requests-per-second limit.
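To illustrate, here’s a test sketch under the same assumptions as before: the simulated server from section 2 with its two-second processing time and a limit of five concurrent requests. At two requests per second, at most four requests overlap on the server:

@Test
void givenLowRate_whenGuavaRateLimit_thenNoExceptionThrown() {
    int requestsPerSecond = 2;
    int requests = 10;
    assertDoesNotThrow(() -> {
      GuavaRateLimit.fetch(client, requests, requestsPerSecond)
        .blockLast();
    });
}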

8. Conclusion

In this article, we looked at a few available approaches for rate limiting our WebClient. After that, we simulated a controlled web service to see how it affected our code and tests. Then, we used Project Reactor and a few libraries to help us achieve the same goal in different ways.

And as always, the source code is available on GitHub.
