Close

Reactor - Transforming into Publisher and Maintaining the source order with flatMapSequential()

[Updated: Feb 27, 2020, Created: Feb 27, 2020]

The FlatMapSequential() methods of Flux class transforms the elements into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.

public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)

With max concurrency (see also flatMap()):

public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
                                           int maxConcurrency

With max concurrency and prefetch (see also flatMap()):

public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
                                           int maxConcurrency,
                                           int prefetch)

Examples

public class Example1FlatMapSequentialAndParallelism {
  public static void main(String[] args) {
      System.out.println("-- flatMapSequential without parallelism --");
      Flux.just(1, 5, 9)
          .flatMapSequential(integer -> {
              System.out.println("-- starting: " + integer + " --");
              return Flux.range(integer, 4);
          })
          .subscribe(x ->
                  System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));

      System.out.println("-- flatMapSequential with parallelism --");
      Flux.just(1, 5, 9)
          .flatMapSequential(integer -> {
              System.out.println("-- starting: " + integer + " --");
              return Flux.range(integer, 4)
                         .subscribeOn(Schedulers.newParallel("myThread", 8));
          })
          .subscribe(x ->
                  System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
  }
}
-- flatMapSequential without parallelism --
-- starting: 1 --
1 - main - 21:22:58.114
2 - main - 21:22:58.129
3 - main - 21:22:58.129
4 - main - 21:22:58.129
-- starting: 5 --
5 - main - 21:22:58.129
6 - main - 21:22:58.129
7 - main - 21:22:58.129
8 - main - 21:22:58.129
-- starting: 9 --
9 - main - 21:22:58.129
10 - main - 21:22:58.129
11 - main - 21:22:58.129
12 - main - 21:22:58.129
-- flatMapSequential with parallelism --
-- starting: 1 --
-- starting: 5 --
-- starting: 9 --
1 - myThread-1 - 21:22:58.160
2 - myThread-1 - 21:22:58.160
3 - myThread-1 - 21:22:58.160
4 - myThread-1 - 21:22:58.160
5 - myThread-1 - 21:22:58.160
6 - myThread-1 - 21:22:58.160
7 - myThread-1 - 21:22:58.160
8 - myThread-1 - 21:22:58.160
9 - myThread-3 - 21:22:58.160
10 - myThread-3 - 21:22:58.160
11 - myThread-3 - 21:22:58.160
12 - myThread-3 - 21:22:58.160
Comparing with flatMap()
public class Example2FlatMapAndParallelism {
  public static void main(String[] args) {
      System.out.println("-- flatMap without parallelism --");
      Flux.just(1, 5, 9)
          .flatMap(integer -> {
              System.out.println("-- starting: " + integer + " --");
              return Flux.range(integer, 4);
          })
          .subscribe(x ->
                  System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));

      System.out.println("-- flatMap with parallelism --");
      Flux.just(1, 5, 9)
          .flatMap(integer -> {
              System.out.println("-- starting: " + integer + " --");
              return Flux.range(integer, 4)
                         .subscribeOn(Schedulers.newParallel("myThread", 8));
          })
          .subscribe(x ->
                  System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
  }
}
-- flatMap without parallelism --
-- starting: 1 --
1 - main - 21:24:12.789
2 - main - 21:24:12.789
3 - main - 21:24:12.789
4 - main - 21:24:12.789
-- starting: 5 --
5 - main - 21:24:12.789
6 - main - 21:24:12.789
7 - main - 21:24:12.789
8 - main - 21:24:12.789
-- starting: 9 --
9 - main - 21:24:12.789
10 - main - 21:24:12.789
11 - main - 21:24:12.789
12 - main - 21:24:12.789
-- flatMap with parallelism --
-- starting: 1 --
-- starting: 5 --
-- starting: 9 --
1 - myThread-1 - 21:24:12.820
2 - myThread-1 - 21:24:12.820
3 - myThread-1 - 21:24:12.820
4 - myThread-1 - 21:24:12.820
9 - myThread-3 - 21:24:12.820
10 - myThread-3 - 21:24:12.820
11 - myThread-3 - 21:24:12.820
12 - myThread-3 - 21:24:12.820
5 - myThread-2 - 21:24:12.820
6 - myThread-2 - 21:24:12.820
7 - myThread-2 - 21:24:12.820
8 - myThread-2 - 21:24:12.820

With concurrency:

concurrency = 1
public class Example3FlatMapSequentialAndConcurrency {
  public static void main(String[] args) {
      System.out.println("-- flatMapSequential with parallelism and  concurrency --");
      Flux.just(1, 5, 9)
          .flatMapSequential(integer -> {
              System.out.println("-- starting: " + integer + " --");
              return Flux.range(integer, 4)
                         .subscribeOn(Schedulers.newParallel("myThread", 8));
          }, 1)
          .subscribe(x -> System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
  }
}
-- flatMapSequential with parallelism and  concurrency --
-- starting: 1 --
1 - myThread-1 - 21:25:40.432
2 - myThread-1 - 21:25:40.448
3 - myThread-1 - 21:25:40.448
4 - myThread-1 - 21:25:40.448
-- starting: 5 --
5 - myThread-2 - 21:25:40.448
6 - myThread-2 - 21:25:40.448
7 - myThread-2 - 21:25:40.448
8 - myThread-2 - 21:25:40.448
-- starting: 9 --
9 - myThread-3 - 21:25:40.448
10 - myThread-3 - 21:25:40.448
11 - myThread-3 - 21:25:40.448
12 - myThread-3 - 21:25:40.448
concurrency = 2
public class Example4FlatMapSequentialAndConcurrency {
  public static void main(String[] args) {
      System.out.println("-- flatMapSequential with parallelism and  concurrency --");
      Flux.just(1, 5, 9)
          .flatMapSequential(integer -> {
              System.out.println("-- starting: " + integer + " --");
              return Flux.range(integer, 4)
                         .subscribeOn(Schedulers.newParallel("myThread", 8));
          }, 2)
          .subscribe(x -> System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
  }
}
-- flatMapSequential with parallelism and  concurrency --
-- starting: 1 --
-- starting: 5 --
1 - myThread-2 - 21:26:21.442
2 - myThread-2 - 21:26:21.442
3 - myThread-2 - 21:26:21.442
4 - myThread-2 - 21:26:21.442
-- starting: 9 --
5 - myThread-2 - 21:26:21.442
6 - myThread-2 - 21:26:21.442
7 - myThread-2 - 21:26:21.442
8 - myThread-2 - 21:26:21.442
9 - myThread-3 - 21:26:21.442
10 - myThread-3 - 21:26:21.442
11 - myThread-3 - 21:26:21.442
12 - myThread-3 - 21:26:21.442

With concurrency and prefetch

public class Example5FlatMapSequentialConcurrencyAndPrefetch {
  public static void main(String[] args) {
      System.out.println("-- flatMapSequential with parallelism, concurrency and prefetch --");
      Flux.just(1, 5, 9)
          .flatMapSequential(integer -> {
              System.out.println("-- starting: " + integer + " --");
              return Flux.range(integer, 4)
                         .subscribeOn(Schedulers.newParallel("myThread", 8));
          }, 1, 4)
          .subscribe(x -> System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
  }
}
-- flatMapSequential with parallelism, concurrency and prefetch --
-- starting: 1 --
1 - myThread-1 - 21:28:42.495
2 - myThread-1 - 21:28:42.511
3 - myThread-1 - 21:28:42.511
4 - myThread-1 - 21:28:42.511
-- starting: 5 --
5 - myThread-2 - 21:28:42.511
6 - myThread-2 - 21:28:42.511
7 - myThread-2 - 21:28:42.511
8 - myThread-2 - 21:28:42.511
-- starting: 9 --
9 - myThread-3 - 21:28:42.511
10 - myThread-3 - 21:28:42.511
11 - myThread-3 - 21:28:42.511
12 - myThread-3 - 21:28:42.511

Example Project

Dependencies and Technologies Used:

  • reactor-core 3.3.3.RELEASE: Non-Blocking Reactive Foundation for the JVM.
  • JDK 8
  • Maven 3.5.4

Reactor - flatMapSequential() example Select All Download
  • reactor-flat-map-sequential-operation
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • Example1FlatMapSequentialAndParallelism.java

    See Also