Close

Reactor - Transforming into Publishers and Delaying any Error with flatMapDelayError() and flatMapSequentialDelayError()

[Last Updated: Jun 7, 2020]

This variant of flatMap delays any error until after the rest of the flatMap backlog has been processed (see also flatMap()).

public final <V> Flux<V> flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
                                           int concurrency,
                                           int prefetch)

Sequential Processing with delaying error (see also flatMapSequential()):

public final <R> Flux<R> flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper,
                                                     int maxConcurrency,
                                                     int prefetch)

Examples

public class Example1FlatMapDelayError {
  public static void main(String[] args) {
      Flux.just(1, 3, 5, 7)
          .flatMapDelayError(integer -> {
              System.out.println("-----------");
              if (integer == 5) {
                  throw new IllegalArgumentException("test exception");
              }
              return Flux.range(integer, integer * 2)
                         .subscribeOn(Schedulers.newParallel("myThread", 8));
          }, 4, 3).onErrorResume(throwable -> {
          System.out.println("-- error --");
          System.out.println(throwable);
          return Flux.just(20);
      })
          .subscribe(e -> System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
  }
}
-----------
-----------
-----------
1 - myThread-1 - 20:32:16.460
3 - myThread-1 - 20:32:16.469
4 - myThread-1 - 20:32:16.469
5 - myThread-1 - 20:32:16.469
2 - myThread-1 - 20:32:16.471
6 - myThread-1 - 20:32:16.472
7 - myThread-1 - 20:32:16.473
8 - myThread-1 - 20:32:16.473
-- error --
java.lang.IllegalArgumentException: test exception
20 - myThread-1 - 20:32:16.474

flatMap() Without Delaying Error

public class Example2FlatMapWithoutDelayError {
  public static void main(String[] args) {
      Flux.just(1, 3, 5, 7)
          .flatMap(integer -> {
              System.out.println("-----------");
              if (integer == 5) {
                  throw new IllegalArgumentException("test exception");
              }
              return Flux.range(integer, integer * 2)
                         .subscribeOn(Schedulers.newParallel("myThread", 8));
          }, 4, 3).onErrorResume(throwable -> {
          System.out.println("-- error --");
          System.out.println(throwable);
          return Flux.just(20);
      })
          .subscribe(e -> System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
  }
}
-----------
-----------
-----------
1 - myThread-1 - 20:33:30.757
-- error --
java.lang.IllegalArgumentException: test exception
20 - myThread-1 - 20:33:30.766

Sequential Variant

public class Example3FlatMapSequentialDelayError {
  public static void main(String[] args) {
      Flux.just(1, 3, 5, 7)
          .flatMapSequentialDelayError(integer -> {
              System.out.println("-----------");
              if (integer == 5) {
                  throw new IllegalArgumentException("test exception");
              }
              return Flux.range(integer, integer * 2)
                         .subscribeOn(Schedulers.newParallel("myThread", 8));
          }, 4, 3).onErrorResume(throwable -> {
          System.out.println("-- error --");
          System.out.println(throwable);
          return Flux.just(200);
      })
          .subscribe(e -> System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
  }
}
-----------
-----------
-----------
1 - myThread-1 - 20:34:28.181
2 - myThread-1 - 20:34:28.188
3 - myThread-1 - 20:34:28.188
4 - myThread-1 - 20:34:28.188
5 - myThread-1 - 20:34:28.188
6 - myThread-2 - 20:34:28.191
7 - myThread-2 - 20:34:28.191
8 - myThread-2 - 20:34:28.191
-- error --
java.lang.IllegalArgumentException: test exception
200 - myThread-2 - 20:34:28.192

Example Project

Dependencies and Technologies Used:

  • reactor-core 3.3.3.RELEASE: Non-Blocking Reactive Foundation for the JVM.
  • JDK 8
  • Maven 3.5.4

Reactor - flatMapDelayError() and flatMapSequentialDelayError() examples Select All Download
  • reactor-flat-map-delay-error
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • Example1FlatMapDelayError.java

    See Also