how to access the first element of flux

1. Overview

In this tutorial, we are going to use a . will explore different ways of accessing the first element of flux Spring 5 with webflux.

First, we’ll use non-blocking methods of the API, such as next() And Take(), After that, we will see how to achieve the same thing with the help of elementAt() method, where we get index.

Finally, we’ll learn about the API’s blocking methods, and we’ll use blockfirst() to access the first element of flux,

2. Test Setup

For the code examples in this article, we will use payment Class, which has only one field, Payment price,

public class Payment {
    private final int amount;
    // constructor and getter
}

In tests, we’ll build a flux Payments made using the test helper method are called fluxoff threepayments,

private Flux<Payment> fluxOfThreePayments() {
    return Flux.just(paymentOf100, new Payment(200), new Payment(300));
}

After that, we will use Spring Reactor step validator to test the results.

3. next()

First, let’s try next() way. This method will return the first element of the flux wrapped in reactive mono type:

@Test
void givenAPaymentFlux_whenUsingNext_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().next();
    StepVerifier.create(firstPayment)
      .expectNext(paymentOf100)
      .verifyComplete();
}

On the other hand, if we would call next() on empty fluxresult will be one empty mono. As a result, blocking it will return Zero,

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();
    Mono<Payment> firstPayment = emptyFlux.next();
    StepVerifier.create(firstPayment)
      .verifyComplete();
}

4. Take()

Take() The method of a reactive flow is equivalent to range () For Java 8 Streams. In other words, we can use take(1) To limit flux for exactly one element and then use it in a blocking or non-blocking way:

@Test
void givenAPaymentFlux_whenUsingTake_thenGetTheFirstPaymentAsFlux() {
    Flux<Payment> firstPaymentFlux = fluxOfThreePayments().take(1);
    StepVerifier.create(firstPaymentFlux)
      .expectNext(paymentOf100)
      .verifyComplete();
}

Similarly, for an empty flow, take(1) will return an empty flow:

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyFlux() {
    Flux<Payment> emptyFlux = Flux.empty();
    Flux<Payment> firstPaymentFlux = emptyFlux.take(1);
    StepVerifier.create(firstPaymentFlux)
      .verifyComplete();
}

5. elementAt()

flux API also provides elementAt() way. we can use elementAt(0) To get the first element of a flow in a non-blocking way:

@Test
void givenAPaymentFlux_whenUsingElementAt_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().elementAt(0);
    StepVerifier.create(firstPayment)
      .expectNext(paymentOf100)
      .verifyComplete();
}

However, if the index passed as a parameter exceeds the number of elements emitted by Flux, an error will be emitted:

@Test
void givenAEmptyFlux_whenUsingElementAt_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();
    Mono<Payment> firstPayment = emptyFlux.elementAt(0);
    StepVerifier.create(firstPayment)
      .expectError(IndexOutOfBoundsException.class);
}

6. blockfirst()

Alternatively, we can also use BlockFirst(). However, as the name suggests, it is a blocking method. As a result, if we use blockFirst(), We will leave the reactive world, and we will lose all its benefits:

@Test
void givenAPaymentFlux_whenUsingBlockFirst_thenGetTheFirstPayment() {
    Payment firstPayment = fluxOfThreePayments().blockFirst();
    assertThat(firstPayment).isEqualTo(paymentOf100);
}

7. in stream()

Finally, we can convert the flux to a Java 8 stream and then access the first element:

@Test
void givenAPaymentFlux_whenUsingToStream_thenGetTheFirstPaymentAsOptional() {
    Optional<Payment> firstPayment = fluxOfThreePayments().toStream()
      .findFirst();
    assertThat(firstPayment).contains(paymentOf100);
}

But, still, if we do this, we will not be able to continue using reactive pipelines.

8. Conclusion

In this article, we discussed Java’s Reactive Streams API. We a. have seen different ways of accessing the first element of the flow, And we learned that we should stick to non-blocking solutions if we want to use reactive pipelines.

As always, the code for this article can be found on GitHub.

       

Leave a Comment