This variant of flatMap delays any error until after the rest of the flatMap backlog has been processed (see also flatMap()).
public final <V> Flux<V> flatMapDelayError(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency,
int prefetch)
Sequential Processing with delaying error (see also flatMapSequential()):
public final <R> Flux<R> flatMapSequentialDelayError(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
Examples
public class Example1FlatMapDelayError {
public static void main(String[] args) {
Flux.just(1, 3, 5, 7)
.flatMapDelayError(integer -> {
System.out.println("-----------");
if (integer == 5) {
throw new IllegalArgumentException("test exception");
}
return Flux.range(integer, integer * 2)
.subscribeOn(Schedulers.newParallel("myThread", 8));
}, 4, 3).onErrorResume(throwable -> {
System.out.println("-- error --");
System.out.println(throwable);
return Flux.just(20);
})
.subscribe(e -> System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
}
}
-----------
-----------
-----------
1 - myThread-1 - 20:32:16.460
3 - myThread-1 - 20:32:16.469
4 - myThread-1 - 20:32:16.469
5 - myThread-1 - 20:32:16.469
2 - myThread-1 - 20:32:16.471
6 - myThread-1 - 20:32:16.472
7 - myThread-1 - 20:32:16.473
8 - myThread-1 - 20:32:16.473
-- error --
java.lang.IllegalArgumentException: test exception
20 - myThread-1 - 20:32:16.474
flatMap() Without Delaying Error
public class Example2FlatMapWithoutDelayError {
public static void main(String[] args) {
Flux.just(1, 3, 5, 7)
.flatMap(integer -> {
System.out.println("-----------");
if (integer == 5) {
throw new IllegalArgumentException("test exception");
}
return Flux.range(integer, integer * 2)
.subscribeOn(Schedulers.newParallel("myThread", 8));
}, 4, 3).onErrorResume(throwable -> {
System.out.println("-- error --");
System.out.println(throwable);
return Flux.just(20);
})
.subscribe(e -> System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
}
}
-----------
-----------
-----------
1 - myThread-1 - 20:33:30.757
-- error --
java.lang.IllegalArgumentException: test exception
20 - myThread-1 - 20:33:30.766
Sequential Variant
public class Example3FlatMapSequentialDelayError {
public static void main(String[] args) {
Flux.just(1, 3, 5, 7)
.flatMapSequentialDelayError(integer -> {
System.out.println("-----------");
if (integer == 5) {
throw new IllegalArgumentException("test exception");
}
return Flux.range(integer, integer * 2)
.subscribeOn(Schedulers.newParallel("myThread", 8));
}, 4, 3).onErrorResume(throwable -> {
System.out.println("-- error --");
System.out.println(throwable);
return Flux.just(200);
})
.subscribe(e -> System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
}
}
-----------
-----------
-----------
1 - myThread-1 - 20:34:28.181
2 - myThread-1 - 20:34:28.188
3 - myThread-1 - 20:34:28.188
4 - myThread-1 - 20:34:28.188
5 - myThread-1 - 20:34:28.188
6 - myThread-2 - 20:34:28.191
7 - myThread-2 - 20:34:28.191
8 - myThread-2 - 20:34:28.191
-- error --
java.lang.IllegalArgumentException: test exception
200 - myThread-2 - 20:34:28.192
Example ProjectDependencies and Technologies Used: - reactor-core 3.3.3.RELEASE: Non-Blocking Reactive Foundation for the JVM.
- JDK 8
- Maven 3.5.4
|