Close

Reactor - Getting Started

[Updated: Jan 20, 2020, Created: Jan 19, 2020]

Reactor is a fully non-blocking reactive programming API for Java language. For basic concepts visit the Java 9's adaptation of reactive streams.

Flux and Mono

Project Reactor offers composable asynchronous sequence APIs: Flux and Mono.

reactor.core.publisher.Flux is a standard Publisher that represents an asynchronous sequence of 0 to N emitted items.

reactor.core.publisher.Mono is a specialized Publisher that emits at most one item.

Simple Factory methods to create Flux and Mono instances

Class Flux

public static <T> Flux<T> empty()
public static <T> Flux<T> fromArray(T[] array)
public static <T> Flux<T> fromIterable(Iterable<? extends T> it)
public static <T> Flux<T> just(T data)
public static <T> Flux<T> just(T... data)
public static Flux<Integer> range(int start, int count)

Class Mono

public static <T> Mono<T> empty()
public static <T> Mono<T> just(T data)

Subscribing to Flux and Mono

In Reactor, when we write a Publisher chain, data does not start pumping into it by default until we call subscribe().

By the act of subscribing, we tie the Publisher to a Subscriber, which triggers the flow of data in the whole chain.

Followings are the commonly used variants of subscribe method defined in both Flux and Mono classes.

Class Flux

public final Disposable subscribe()
public final Disposable subscribe(Consumer<? super T> consumer)
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, 
                                  Consumer<? super Throwable> errorConsumer)
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, 
                                  @Nullable Consumer<? super Throwable> errorConsumer,        
                                  @Nullable Runnable completeConsumer)
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, 
                                  @Nullable Consumer<? super Throwable> errorConsumer,
                                  @Nullable Runnable completeConsumer,
                                  @Nullable Consumer<? super Subscription> subscriptionConsumer)

Where

  • consumer: the consumer to invoke on each next signal. Can be used to do something on each produced value.
  • errorConsumer: the consumer to invoke on error signal. Can be used to react to an error.
  • completeConsumer: the consumer to invoke on complete signal. Can be used when the sequence successfully completes.
  • subscriptionConsumer: the consumer to invoke on subscribe signal. Can be used to do something with the Subscription produced by this subscribe call.

Disposable interface

All subscribe() methods have reactor.core.Disposable return type. The Disposable instance can be used to cancel the subscription by calling its dispose() method.

Java 8 Streams vs Reactive Programming

Java 8 streams are pulled-based, i.e. terminal operations are responsible of elements flow through the stream.

Reactive streams are push-based, i.e. elements are pushed by Publisher to the Subscribers as they are available.

Java 8 streams can be used only once, whereas Reactive Streams can be used many times.

Reactor Example

Maven dependency

pom.xml

<dependency>
   <groupId>io.projectreactor</groupId>
   <artifactId>reactor-core</artifactId>
   <version>3.3.2.RELEASE</version>
</dependency>

Using Flux and Mono

public class ReactorGettingStartedExample {
  public static void main(String[] args) {
      System.out.println("-- Mono example --");
      Mono<String> mono = Mono.just("test mono");
      mono.subscribe(System.out::println);

      System.out.println("-- Flux example --");
      Flux<Integer> flux = Flux.just(1, 2, 3);
      flux.subscribe(System.out::println);
  }
}
-- Mono example --
test mono
-- Flux example --
1
2
3

Example Project

Dependencies and Technologies Used:

  • reactor-core 3.3.2.RELEASE: Non-Blocking Reactive Foundation for the JVM.
  • JDK 8
  • Maven 3.5.4

Project Rector - Getting started example Select All Download
  • reactor-getting-started
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • ReactorGettingStartedExample.java

    See Also