Reactor - Understanding Flux/Mono's retryWhen()

[Last Updated: Jun 14, 2020]

In this tutorial we will look at the retryWhen() method, which is a more advanced form of the retry() method.

The method retryWhen() is defined in both Flux and Mono classes.

Flux method

public final Flux<T> retryWhen(Retry retrySpec)

Mono method

public final Mono<T> retryWhen(Retry retrySpec)

Both methods take a single parameter of type Retry.

The class Retry

The class Retry is the abstract base class for a strategy that decides when to retry. It has only one abstract method, generateCompanion(...):

package reactor.util.retry;
 ....
public abstract class Retry {
    ....
    public abstract Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals);
    ....
}

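The method generateCompanion() must return a Publisher (the "companion") that controls the retries: each value it emits triggers a new attempt, so the companion can delay or limit retries by delaying or withholding those values. If the companion signals an error or completes, the retried sequence terminates with that signal.

The parameter retrySignals (a Flux of Retry.RetrySignal) gives information about each failure and the number of errors so far.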

The interface Retry.RetrySignal

public interface RetrySignal {
     //The ZERO BASED index number of this error (how many retries have occurred so far)
     long totalRetries();

     //The ZERO BASED index number of this error since the beginning of the current burst of errors.
     // This is reset to zero whenever a retry is made that is followed by at least one Subscriber#onNext(Object).
     long totalRetriesInARow();
      
     //The current Throwable that needs to be evaluated for retry.
     Throwable failure();

     //Return an immutable copy of this RetrySignal which is guaranteed to give a consistent view
     default RetrySignal copy() {....}
}
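
To make the difference between the two counters concrete, here is a minimal sketch, assuming reactor-core (a version that ships reactor.util.retry.Retry) is on the classpath. The class RetrySignalDemo and the helper observedSignals() are hypothetical names used only for illustration. The source emits one item before each failure, so totalRetriesInARow() is reset every time while totalRetries() keeps growing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class RetrySignalDemo {

    // Hypothetical helper: records "totalRetries/totalRetriesInARow" for every error.
    static List<String> observedSignals() {
        List<String> seen = new ArrayList<>();
        AtomicInteger subscriptions = new AtomicInteger();

        // The first three subscriptions emit one item and then fail;
        // the fourth emits one item and completes.
        Flux<Integer> source = Flux.defer(() -> {
            int n = subscriptions.incrementAndGet();
            Flux<Integer> item = Flux.just(n);
            return n <= 3
                    ? item.concatWith(Flux.error(new IllegalStateException("boom " + n)))
                    : item;
        });

        source.retryWhen(Retry.from(signals -> signals.map(rs -> {
                    seen.add(rs.totalRetries() + "/" + rs.totalRetriesInARow());
                    return rs.totalRetries(); // any emitted value triggers a retry
                })))
              .blockLast();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observedSignals()); // prints [0/0, 1/0, 2/0]
    }
}
```

The signal is read inside map() right away; if it has to be kept for later use, copy() gives a stable snapshot.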

Quick Example

Assuming that the method process() can throw an exception:

 .....
 public <T> void doSomething(Flux<T> flux) {
     flux.map(this::process)
         .retryWhen(new Retry() {
             @Override
             public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
                 return retrySignals.map(rs -> getNumberOfTries(rs));
             }
         })
         .subscribe();
 }

 private Long getNumberOfTries(Retry.RetrySignal rs) {
     if (rs.totalRetries() < 3) {
         return rs.totalRetries();
     } else {
         System.err.println("retries exhausted");
         throw Exceptions.propagate(rs.failure());
     }
 }
 ....

The above code will retry 3 times before propagating the exception.
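
The "3 retries" claim can be checked with a small self-contained sketch, assuming reactor-core is on the classpath. The class CustomRetryDemo, the constant THREE_RETRIES and the always-failing source are hypothetical stand-ins for the elided parts of the example above; only the retry strategy itself is taken from it.

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class CustomRetryDemo {

    // Same strategy as above: retry while fewer than 3 retries have happened.
    static final Retry THREE_RETRIES = new Retry() {
        @Override
        public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
            return retrySignals.map(rs -> {
                if (rs.totalRetries() < 3) {
                    return rs.totalRetries();
                }
                throw Exceptions.propagate(rs.failure());
            });
        }
    };

    // Hypothetical always-failing source that counts how often it is subscribed to.
    static int countAttempts() {
        AtomicInteger attempts = new AtomicInteger();
        Flux<String> failing = Flux.defer(() -> {
            attempts.incrementAndGet();
            return Flux.<String>error(new IllegalStateException("always fails"));
        });
        try {
            failing.retryWhen(THREE_RETRIES).blockLast();
        } catch (IllegalStateException expected) {
            // the original failure is propagated once the retries are used up
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        // one initial attempt plus three retries
        System.out.println("attempts = " + countAttempts()); // prints attempts = 4
    }
}
```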

Retry.from() method

Retry.from() wraps a Function so that a Retry can be provided as a lambda expression.

public static final Retry from(Function<Flux<RetrySignal>, Publisher<?>> function)

Quick Example

 ....
 public <T> void doSomething(Flux<T> flux) {
     flux.map(this::process)
         .retryWhen(Retry.from((retrySignals) -> {
             return retrySignals.map(rs -> getNumberOfTries(rs));
         }))
         .subscribe();
 }

 private Long getNumberOfTries(Retry.RetrySignal rs) {
     System.out.println("retries so far: " + rs.totalRetries() + ", failure: " + rs.failure());
     if (rs.totalRetries() < 3) {
         return rs.totalRetries();
     } else {
         System.err.println("retries exhausted");
         throw Exceptions.propagate(rs.failure());
     }
 }
 ....

Concrete sub classes of Retry

The class Retry has two concrete subclasses:

  • RetrySpec
  • RetryBackoffSpec

java.lang.Object
  reactor.util.retry.Retry
    reactor.util.retry.RetrySpec
    reactor.util.retry.RetryBackoffSpec

Class RetrySpec

It is a simple count-based Retry strategy with configurable features.

Factory methods of Retry that return RetrySpec:

package reactor.util.retry;
 ....
public abstract class Retry {

    //For a simple strategy with maximum number of retry attempts.
    public static RetrySpec max(long max) {....}

    //For a simple strategy with maximum number of retry attempts over subsequent transient errors.
    //A Subscriber#onNext(Object) between errors resets the counter.
    public static RetrySpec maxInARow(long maxInARow) {......}

    //For retrying immediately and indefinitely, similar to Flux#retry().
    public static RetrySpec indefinitely() {.....}
}
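
As a rough illustration of Retry.max(), the sketch below runs one source that recovers within the allowed retries and one that never recovers. It assumes reactor-core on the classpath; RetryMaxDemo, flaky(), recovers() and exhausts() are hypothetical names made up for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class RetryMaxDemo {

    // Hypothetical source: fails on the first `failures` subscriptions, then emits "ok".
    static Flux<String> flaky(AtomicInteger attempts, int failures) {
        return Flux.defer(() -> attempts.incrementAndGet() <= failures
                ? Flux.<String>error(new IllegalStateException("transient"))
                : Flux.just("ok"));
    }

    // Two failures with up to three retries allowed: the third attempt succeeds.
    static String recovers() {
        return flaky(new AtomicInteger(), 2).retryWhen(Retry.max(3)).blockLast();
    }

    // A source that keeps failing with two retries allowed: the retries run out.
    static boolean exhausts() {
        try {
            flaky(new AtomicInteger(), Integer.MAX_VALUE).retryWhen(Retry.max(2)).blockLast();
            return false;
        } catch (RuntimeException e) {
            // By default the last failure is wrapped and stays available as the cause.
            return Exceptions.isRetryExhausted(e) && e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println(recovers()); // prints ok
        System.out.println(exhausts()); // prints true
    }
}
```

Retry.maxInARow(...) and Retry.indefinitely() can be passed to retryWhen() in the same position.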

Customizing RetrySpec

The following methods of RetrySpec can be used for customization/configuration:

package reactor.util.retry;
..
public final class RetrySpec extends Retry {
    ....
    //Sets the maximum number of retry attempts allowed
    public RetrySpec maxAttempts(long maxAttempts) {...}
	
    //Sets the Predicate that will filter which errors can be retried. Exceptions
    //that don't pass the predicate will be propagated downstream and terminate the retry sequence. 
    public RetrySpec filter(Predicate<? super Throwable> errorFilter) {....}

    //Allows augmenting a previously configured filter(Predicate) with
    //a new condition that allows or rejects retries of some exceptions.
    public RetrySpec modifyErrorFilter(......}

    //Adds synchronous behavior to be executed before the retry trigger is emitted
    public RetrySpec doBeforeRetry(.....}

    //Adds synchronous behavior to be executed after the retry trigger is emitted
    public RetrySpec doAfterRetry(Consumer<RetrySignal> doAfterRetry) {.....}

    //Adds asynchronous behavior to be executed before the current retry trigger in the companion publisher,
    //thus delaying the resulting retry trigger with the additional Mono.
    public RetrySpec doBeforeRetryAsync(......}
	
    //Adds asynchronous behavior to be executed after the current retry trigger in the companion publisher,
    //thus delaying the resulting retry trigger with the additional Mono.
    public RetrySpec doAfterRetryAsync(.....}

    //Sets the generator for the Exception to be propagated when the maximum amount of retries is exhausted. 
    public RetrySpec onRetryExhaustedThrow(
                           BiFunction<RetrySpec, RetrySignal, Throwable> retryExhaustedGenerator) {.....}

    //Sets the transient error mode, indicating that the strategy being built should use
    //RetrySignal#totalRetriesInARow() rather than RetrySignal#totalRetries()
    public RetrySpec transientErrors(boolean isTransientErrors) {.....}
    ....
}

All of the above methods return a new copy of the RetrySpec, which can either be configured further or used as a Retry.
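
The following sketch chains a few of these methods, assuming reactor-core on the classpath. RetrySpecDemo, spec() and attemptsFor() are hypothetical names. The strategy retries only IllegalStateException, logs before each retry, and rethrows the last failure itself instead of the default wrapper exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

public class RetrySpecDemo {

    // Each call in the chain returns a new RetrySpec built from the previous one.
    static RetrySpec spec(List<String> log) {
        return Retry.max(2)
                .filter(e -> e instanceof IllegalStateException)
                .doBeforeRetry(rs -> log.add("retry #" + (rs.totalRetries() + 1)))
                .onRetryExhaustedThrow((retrySpec, rs) -> rs.failure());
    }

    // Hypothetical helper: subscribes to a source that always fails with `failure`
    // and reports how many times the source was subscribed to.
    static int attemptsFor(RuntimeException failure, List<String> log) {
        AtomicInteger attempts = new AtomicInteger();
        Flux<String> source = Flux.defer(() -> {
            attempts.incrementAndGet();
            return Flux.<String>error(failure);
        });
        try {
            source.retryWhen(spec(log)).blockLast();
        } catch (RuntimeException expected) {
            // the failure reaches the subscriber once retrying stops
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Passes the filter: one initial attempt plus two retries.
        System.out.println(attemptsFor(new IllegalStateException("retryable"), log)); // prints 3
        System.out.println(log); // prints [retry #1, retry #2]
        // Rejected by the filter: propagated after the first attempt.
        System.out.println(attemptsFor(new IllegalArgumentException("fatal"), log)); // prints 1
    }
}
```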

Class RetryBackoffSpec

It is a Retry strategy based on exponential backoffs, with configurable features.

Factory methods of Retry that return RetryBackoffSpec:

package reactor.util.retry;
...
public abstract class Retry {

   //For exponential backoff strategy with jitter, given a maximum number of retry attempts
   //and a minimum Duration for the backoff.
   public static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) {.....}

   //For fixed delays (min backoff equals max backoff, no jitter), given a maximum number of retry attempts
   //and the fixed Duration for the backoff.
   public static RetryBackoffSpec fixedDelay(long maxAttempts, Duration fixedDelay) {.....}
...
}
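
A minimal sketch of both factory methods on a Mono, assuming reactor-core on the classpath. BackoffDemo, flaky() and attemptsWith() are hypothetical names, and the short durations are chosen only to keep the example fast.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class BackoffDemo {

    // Hypothetical source: fails on the first `failures` subscriptions, then emits "ok".
    static Mono<String> flaky(AtomicInteger attempts, int failures) {
        return Mono.defer(() -> attempts.incrementAndGet() <= failures
                ? Mono.<String>error(new IllegalStateException("transient"))
                : Mono.just("ok"));
    }

    // Runs a source that fails twice with the given strategy and counts the attempts.
    static int attemptsWith(Retry retry) {
        AtomicInteger attempts = new AtomicInteger();
        flaky(attempts, 2).retryWhen(retry).block(Duration.ofSeconds(5));
        return attempts.get();
    }

    public static void main(String[] args) {
        // Waits a fixed 20 ms before each retry.
        System.out.println(attemptsWith(Retry.fixedDelay(3, Duration.ofMillis(20)))); // prints 3
        // Waits at least 10 ms before the first retry, with growing, jittered delays after that.
        System.out.println(attemptsWith(Retry.backoff(3, Duration.ofMillis(10))));    // prints 3
    }
}
```

Because the delays run on a scheduler, the retries happen asynchronously; block() is used here only to wait for the result in a demo.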

Customizing RetryBackoffSpec

The following methods of RetryBackoffSpec can be used for customization/configuration:

package reactor.util.retry;
 ....
public final class RetryBackoffSpec extends Retry {

   //Sets the maximum number of retry attempts allowed.
   public RetryBackoffSpec maxAttempts(long maxAttempts) {.....}

   //Sets the Predicate that will filter which errors can be retried
   public RetryBackoffSpec filter(Predicate<? super Throwable> errorFilter) {....}

   //Allows augmenting a previously configured filter(Predicate) with a new condition
   //that allows or rejects retries of some exceptions.
   public RetryBackoffSpec modifyErrorFilter(.....}

   //Add synchronous behavior to be executed before the retry trigger is emitted
   public RetryBackoffSpec doBeforeRetry(.......}

   //Add synchronous behavior to be executed after the retry trigger is emitted
   public RetryBackoffSpec doAfterRetry(Consumer<RetrySignal> doAfterRetry) {.....}

   //Add asynchronous behavior to be executed before the current retry trigger in the companion publisher,
   //thus delaying the resulting retry trigger with the additional Mono.
   public RetryBackoffSpec doBeforeRetryAsync(.....}

   //Add asynchronous behavior to be executed after the current retry trigger in the companion publisher,
   //thus delaying the resulting retry trigger with the additional Mono.
   public RetryBackoffSpec doAfterRetryAsync(
			Function<RetrySignal, Mono<Void>> doAsyncAfterRetry) {.....}

   //Sets the generator for the Exception to be propagated when the maximum amount of retries is exhausted. 
   public RetryBackoffSpec onRetryExhaustedThrow(
                           BiFunction<RetryBackoffSpec, RetrySignal, Throwable> retryExhaustedGenerator) {...}

   //Sets the transient error mode, indicating that the strategy being built should use RetrySignal#totalRetriesInARow()
   //rather than RetrySignal#totalRetries().
   public RetryBackoffSpec transientErrors(boolean isTransientErrors) {.....}

   //Sets the minimum Duration for the first backoff. This method switches to an exponential backoff strategy
   //if that has not already been done.
   public RetryBackoffSpec minBackoff(Duration minBackoff) {.....}

   //Sets a hard maximum Duration for exponential backoffs. 
   public RetryBackoffSpec maxBackoff(Duration maxBackoff) {.....}

   //Sets a jitter factor for exponential backoffs that adds randomness to each backoff. This can
   //be helpful in reducing cascading failure due to retry-storms. 
   public RetryBackoffSpec jitter(double jitterFactor) {....}

   //Sets a Scheduler on which to execute the delays computed by the exponential backoff strategy. 
   public RetryBackoffSpec scheduler(@Nullable Scheduler backoffScheduler) {......}
}

All of the above methods return a new copy of the RetryBackoffSpec, which can either be configured further or used as a Retry.
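
The effect of transientErrors() is easiest to see side by side. The sketch below assumes reactor-core on the classpath; BackoffSpecDemo and run() are hypothetical names. The source emits one item before each of its three failures, and the strategy allows only one retry. In transient mode every failure counts as the first of a new burst, so the sequence recovers; in the default mode the second failure exhausts the retries.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

public class BackoffSpecDemo {

    static List<Integer> run(boolean transientErrors) {
        AtomicInteger subscriptions = new AtomicInteger();
        // The first three subscriptions emit one item and then fail; the fourth completes.
        Flux<Integer> source = Flux.defer(() -> {
            int n = subscriptions.incrementAndGet();
            Flux<Integer> item = Flux.just(n);
            return n <= 3
                    ? item.concatWith(Flux.error(new IllegalStateException("boom " + n)))
                    : item;
        });

        RetryBackoffSpec spec = Retry.backoff(1, Duration.ofMillis(5)) // one retry allowed
                .maxBackoff(Duration.ofMillis(20))   // upper bound for the computed backoff
                .jitter(0.25)                        // randomization factor for each delay
                .transientErrors(transientErrors);   // count retries per burst of errors

        return source.retryWhen(spec).collectList().block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(run(true)); // prints [1, 2, 3, 4]
        try {
            run(false);
        } catch (RuntimeException e) {
            System.out.println("exhausted: " + Exceptions.isRetryExhausted(e)); // prints exhausted: true
        }
    }
}
```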


In upcoming tutorials we will go through various examples involving retryWhen().
