Reactor - Programmatically generate Flux via Consumer<SynchronousSink<T>>

[Last Updated: Nov 1, 2020]

The following Flux methods create a Flux programmatically:

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

Programmatically create a Flux by generating signals one-by-one via a consumer callback.

public static <T,S> Flux<T> generate(Callable<S> stateSupplier,
                                     BiFunction<S,SynchronousSink<T>,S> generator)

Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state. The stateSupplier may return null.

public static <T,S> Flux<T> generate(Callable<S> stateSupplier,
                                     BiFunction<S,SynchronousSink<T>,S> generator,
                                     Consumer<? super S> stateConsumer)

Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state, with a final cleanup callback. The stateSupplier may return null but your cleanup stateConsumer will need to handle the null case.
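The null-state case can be sketched as follows. This is a minimal illustration, not a canonical pattern: the class name GenerateWithNullState and the fields emitted and cleanupLog are hypothetical names introduced here. The stateSupplier returns null, the generator treats null as "not started yet", and the stateConsumer checks for null before using the state. In this particular run the cleanup receives a non-null value, so its null branch is purely defensive.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

class GenerateWithNullState {
    // Hypothetical holders used only to make the result observable.
    static final List<Integer> emitted = new ArrayList<>();
    static final List<String> cleanupLog = new ArrayList<>();

    static void run() {
        emitted.clear();
        cleanupLog.clear();
        Flux<Integer> flux = Flux.<Integer, Integer>generate(
                () -> null, // stateSupplier is allowed to return null
                (state, sink) -> {
                    // Treat a null state as the starting point.
                    int current = (state == null) ? 0 : state;
                    sink.next(current);
                    if (current == 2) {
                        sink.complete();
                    }
                    return current + 1;
                },
                // The cleanup callback guards against a null state.
                state -> cleanupLog.add(
                        state == null ? "no state" : "last state = " + state));
        flux.subscribe(emitted::add);
    }

    public static void main(String[] args) {
        run();
        System.out.println(emitted);
        System.out.println(cleanupLog);
    }
}
```

The explicit type witness <Integer, Integer> is there because the compiler cannot infer the state type from a supplier that only returns null.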

Interface SynchronousSink

Interface to synchronously produce "one signal" to an underlying Subscriber. Each invocation of the generator function should call next at most once, and may also call complete() or error(Throwable) once.
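To see why the one-signal rule matters, here is a small sketch of a generator that breaks it by calling next twice in one invocation. The class name GenerateTwoNextCalls is hypothetical. With reactor-core 3.4.x the second call is expected to surface as an error signal (an IllegalStateException) rather than as a second value; treat the exact exception type as version-dependent behaviour, not as a documented contract.

```java
import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicReference;

class GenerateTwoNextCalls {
    // Returns the error signal (if any) seen by the subscriber.
    static Throwable run() {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Flux<Integer> flux = Flux.generate(sink -> {
            sink.next(1);
            sink.next(2); // second next() in the same invocation breaks the rule
        });
        flux.subscribe(
                value -> System.out.println("received " + value),
                failure::set); // error consumer captures the failure
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println("error signal: " + run());
    }
}
```

If more than one value is needed per step, emit one value per invocation and carry the remaining work in the state, as the stateful generate variants allow.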

package reactor.core.publisher;
  ....
public interface SynchronousSink<T> {
	void complete();
	Context currentContext();
	void error(Throwable e);
	void next(T t);
}

Examples

generate(Consumer<SynchronousSink<T>> generator)

package com.logicbig.example;

import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;

public class GenerateViaConsumerSyncSink {
  public static void main(String[] args) {
      AtomicInteger ai = new AtomicInteger(0);
      Flux<Integer> flux = Flux.generate(
              sink -> {
                  sink.next(ai.incrementAndGet());
                  if (ai.get() == 5) {
                      sink.complete();
                  }
              }
      );

      flux.subscribe(System.out::println);
  }
}

Output

1
2
3
4
5
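One caveat with the example above: the AtomicInteger lives outside the Flux, so every subscriber shares it. A second subscription would continue from 6 instead of restarting at 1, and would not complete at 5. A possible alternative, sketched here with the hypothetical class name GeneratePerSubscriberState, keeps the counter in the generator state so that each subscriber starts fresh.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

class GeneratePerSubscriberState {
    // The state supplier is invoked per subscription, so each subscriber starts at 1.
    static Flux<Integer> oneToFive() {
        return Flux.<Integer, Integer>generate(
                () -> 1,
                (state, sink) -> {
                    sink.next(state);
                    if (state == 5) {
                        sink.complete();
                    }
                    return state + 1;
                });
    }

    // Subscribes and gathers everything the Flux emits (synchronous here).
    static List<Integer> collect(Flux<Integer> flux) {
        List<Integer> out = new ArrayList<>();
        flux.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        Flux<Integer> flux = oneToFive();
        System.out.println(collect(flux)); // first subscriber
        System.out.println(collect(flux)); // second subscriber sees the same sequence
    }
}
```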

public static <T,S> Flux<T> generate(Callable<S> stateSupplier,
                                     BiFunction<S,SynchronousSink<T>,S> generator)

package com.logicbig.example;

import reactor.core.publisher.Flux;

public class GenerateViaSyncSink {
  public static void main(String[] args) {
      Flux<String> flux = Flux.generate(
              () -> 1,
              (state, sink) -> {
                  sink.next("state = " + state);
                  if (state > 10) {
                      sink.complete();
                  }
                  return state + 2;
              });
      flux.subscribe(System.out::println);
  }
}

Output

state = 1
state = 3
state = 5
state = 7
state = 9
state = 11

The same method with a String state:

package com.logicbig.example;

import reactor.core.publisher.Flux;

public class GenerateViaSyncSink2 {
  public static void main(String[] args) {
      Flux<String> flux = Flux.generate(
              () -> "apple",
              (state, sink) -> {
                  sink.next("other " + state);
                  if (state.length() > 10) {
                      sink.complete();
                  }
                  return state + " more";
              });

      flux.subscribe(System.out::println);
  }
}

Output

other apple
other apple more
other apple more more

public static <T,S> Flux<T> generate(Callable<S> stateSupplier,
                                     BiFunction<S,SynchronousSink<T>,S> generator,
                                     Consumer<? super S> stateConsumer)

package com.logicbig.example;

import reactor.core.publisher.Flux;
import java.util.function.Consumer;

public class GenerateViaSyncSinkWithLastConsumer {
  public static void main(String[] args) {
      Flux<String> flux = Flux.generate(
              () -> "apple",
              (state, sink) -> {
                  sink.next("other " + state);
                  if (state.length() > 10)
                      sink.complete();
                  return state + " more";
              }, new Consumer<String>() {
                  // stateConsumer: receives the last state returned by the generator
                  @Override
                  public void accept(String s) {
                      System.out.println("state consumer-> " + s);
                  }
              });

      flux.subscribe(System.out::println);
  }
}

Output

other apple
other apple more
other apple more more
state consumer-> apple more more more
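
The stateConsumer is also invoked when the downstream cancels, not only on completion. The sketch below, using the hypothetical class name GenerateCleanupOnCancel, builds a generator that never completes and limits it with take(2), which cancels the source after two values. The lists emitted and cleanedUp are illustrative holders, not part of the Reactor API.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

class GenerateCleanupOnCancel {
    // Hypothetical holders used only to make the result observable.
    static final List<Integer> emitted = new ArrayList<>();
    static final List<Integer> cleanedUp = new ArrayList<>();

    static void run() {
        emitted.clear();
        cleanedUp.clear();
        Flux<Integer> endless = Flux.<Integer, Integer>generate(
                () -> 0,
                (state, sink) -> {
                    sink.next(state); // never calls complete()
                    return state + 1;
                },
                cleanedUp::add); // cleanup callback records the state it receives
        // take(2) cancels the generator once two values have been delivered.
        endless.take(2).subscribe(emitted::add);
    }

    public static void main(String[] args) {
        run();
        System.out.println("emitted: " + emitted);
        System.out.println("cleanup calls: " + cleanedUp.size());
    }
}
```

This makes the cleanup callback a reasonable place to release a resource held in the state, since it runs whether the sequence completes, fails, or is cancelled.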

Example Project

Dependencies and Technologies Used:

  • reactor-core 3.4.0: Non-Blocking Reactive Foundation for the JVM.
  • JDK 8
  • Maven 3.5.4

Flux#generate() example to generate Flux programmatically
  • reactor-programmatically-generating-flux
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • GenerateViaSyncSink.java