Close

Reactor - Transforming into Publishers and then flattening by using flatMap operation

[Updated: Feb 27, 2020, Created: Feb 25, 2020]

The flatMap() methods transform the elements asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging.

public final <R> Flux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)

With error or complete signal:

public final <R> Flux<R> flatMap(@Nullable
                                 Function<? super T,? extends Publisher<? extends R>> mapperOnNext,
                                 @Nullable
                                 Function<? super Throwable,? extends Publisher<? extends R>> mapperOnError,
                                 @Nullable
                                 Supplier<? extends Publisher<? extends R>> mapperOnComplete)

With concurrency parameter which allows to control how many Publisher can be subscribed to and merged in parallel.

public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,  int concurrency)

With concurrency and prefetch. The prefetch parameter means the size of the first Subscription.request(long) to the merged Publisher.

public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
                                 int concurrency,
                                 int prefetch)

Examples

public class FlatMapOperationExample {
  public static void main(String[] args) {
      System.out.println("-- Mapping Flux elements --");
      Flux.just(1, 2, 3)
          .flatMap( integer -> {
              System.out.println("-----------");
              return Flux.range(integer, integer*2);
          })
          .subscribe(System.out::println);

      System.out.println("-- Mapping Mono element --");
      Mono.just("supercalifragilisticexpialidocious")
          .flatMap(s -> Mono.just(s.length()))
          .subscribe(System.out::println);
  }
}
-- Mapping Flux elements --
-----------
1
2
-----------
2
3
4
5
-----------
3
4
5
6
7
8
-- Mapping Mono element --
34

flatMap() with complete or error signal

public class FlatMapOperationExample2WithoutError {
  public static void main(String[] args) {
      System.out.println("-- Mapping Flux elements without error --");
      Flux.just(1, 2, 3)
          .flatMap(integer -> {
              System.out.println("-----------");
              return Flux.range(integer, integer * 2);
          }, throwable -> {
              System.out.println("error: " + throwable);
              return Flux.just(100);
          }, () -> Flux.just(200))
          .subscribe(System.out::println);

  }
}
-- Mapping Flux elements without error --
-----------
1
2
-----------
2
3
4
5
-----------
3
4
5
6
7
8
200
public class FlatMapOperationExample3WithError {
  public static void main(String[] args) {
      System.out.println("-- Mapping Flux elements with error --");
      Flux.just(1, 2, 3)
          .map(i -> {
              if (i == 2) {
                  throw new IllegalArgumentException("test exception");
              }
              return i + 1;
          })
          .flatMap(integer -> {
              System.out.println("-----------");
              return Flux.range(integer, integer * 2);
          }, throwable -> {
              System.out.println("error: " + throwable);
              return Flux.just(100);
          }, () -> Flux.just(200))
          .subscribe(System.out::println);

  }
}
-- Mapping Flux elements with error --
-----------
2
3
4
5
error: java.lang.IllegalArgumentException: test exception
100

flatmap() with Concurrency

public class FlatMapOperationExample4Concurrency {
  public static void main(String[] args) {
      System.out.println("-- Mapping Flux elements --");
      Flux.just(1, 2, 3)
          .flatMap( integer -> {
              System.out.println("-----------");
              return Flux.range(integer, integer*2);
          }, 3)
          .subscribe(e -> System.out.println(e+" - "+Thread.currentThread().getName()));
  }
}
-- Mapping Flux elements --
-----------
1 - main
2 - main
-----------
2 - main
3 - main
4 - main
5 - main
-----------
3 - main
4 - main
5 - main
6 - main
7 - main
8 - main

The flatMap() method subscribes to multiple inner Publisher. By default up to the concurrency parameter with a default of Queues.SMALL_BUFFER_SIZE (256).

If concurrency is set to n, flatMap will map n source elements to their inner Publisher. Without parallelism it will wait for at least one to complete before it starts mapping more source elements.

With parallelism:

public class FlatMapOperationExample5Concurrency {
  public static void main(String[] args) {
      System.out.println("-- Mapping Flux elements --");
      Flux.just(1, 2, 3)
          .flatMap(integer -> {
              System.out.println("-----------");
              return Flux.range(integer, integer * 2)
                         .subscribeOn(Schedulers.newParallel("myThread", 8));
          }, 10)
          .subscribe(e -> System.out.println(e + " - " + Thread.currentThread().getName()+" - "+ LocalTime.now()));
  }
}
-- Mapping Flux elements --
-----------
-----------
-----------
1 - myThread-1 - 20:21:13.106
2 - myThread-1 - 20:21:13.114
3 - myThread-1 - 20:21:13.114
4 - myThread-1 - 20:21:13.114
5 - myThread-1 - 20:21:13.115
3 - myThread-1 - 20:21:13.115
4 - myThread-1 - 20:21:13.115
5 - myThread-1 - 20:21:13.115
6 - myThread-1 - 20:21:13.115
7 - myThread-1 - 20:21:13.115
8 - myThread-1 - 20:21:13.115
2 - myThread-1 - 20:21:13.115
public class FlatMapOperationExample6Concurrency {
  public static void main(String[] args) {
      System.out.println("-- Mapping Flux elements --");
      Flux.just(1, 2, 3)
          .flatMap(integer -> {
              System.out.println("-----------");
              return Flux.range(integer, integer * 2)
                         .subscribeOn(Schedulers.newParallel("myThread", 8));
          }, 1)
          .subscribe(e -> System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
  }
}
-- Mapping Flux elements --
-----------
1 - myThread-1 - 20:21:57.551
2 - myThread-1 - 20:21:57.563
-----------
2 - myThread-2 - 20:21:57.563
3 - myThread-2 - 20:21:57.563
4 - myThread-2 - 20:21:57.563
5 - myThread-2 - 20:21:57.564
-----------
3 - myThread-3 - 20:21:57.564
4 - myThread-3 - 20:21:57.564
5 - myThread-3 - 20:21:57.564
6 - myThread-3 - 20:21:57.564
7 - myThread-3 - 20:21:57.564
8 - myThread-3 - 20:21:57.564

flatMap() with prefetch

public class FlatMapOperationExample7ConcurrencyPrefetch {
  public static void main(String[] args) {
      System.out.println("-- Mapping Flux elements --");
      Flux.just(1, 2, 3)
          .flatMap(integer -> {
              System.out.println("-----------");
              return Flux.range(integer, integer * 2)
                         .subscribeOn(Schedulers.newParallel("myThread", 8));
          }, 4,3)
          .subscribe(e -> System.out.println(e + " - " + Thread.currentThread().getName()+" - "+ LocalTime.now()));
  }
}
-- Mapping Flux elements --
-----------
-----------
-----------
2 - myThread-2 - 20:24:38.525
1 - myThread-2 - 20:24:38.531
2 - myThread-2 - 20:24:38.531
3 - myThread-2 - 20:24:38.531
4 - myThread-2 - 20:24:38.532
5 - myThread-2 - 20:24:38.532
3 - myThread-2 - 20:24:38.534
6 - myThread-2 - 20:24:38.534
7 - myThread-2 - 20:24:38.534
8 - myThread-2 - 20:24:38.534
4 - myThread-2 - 20:24:38.534
5 - myThread-2 - 20:24:38.534

Example Project

Dependencies and Technologies Used:

  • reactor-core 3.3.2.RELEASE: Non-Blocking Reactive Foundation for the JVM.
  • JDK 8
  • Maven 3.5.4

Reactor - flatMap() example Select All Download
  • reactor-flat-map-operation
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • FlatMapOperationExample.java

    See Also