The following Flux methods create a Flux programmatically:
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
Programmatically create a Flux by generating signals one-by-one via a consumer callback.
public static <T,S> Flux<T> generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator)
Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state. The stateSupplier may return null.
public static <T,S> Flux<T> generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator,
Consumer<? super S> stateConsumer)
Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state, with a final cleanup callback. The stateSupplier may return null but your cleanup stateConsumer will need to handle the null case.
Interface SynchronousSink
Interface to synchronously produce "one signal" to an underlying Subscriber. Each invocation of the generator function should make at most one next(T) call and at most one terminal call, either complete() or error(Throwable).
package reactor.core.publisher;
....
public interface SynchronousSink<T> {
    void complete();
    Context currentContext();
    void error(Throwable e);
    void next(T t);
}
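To make the one-signal-per-invocation rule concrete, the following sketch deliberately breaks it by calling next twice inside a single invocation of the generator. The class and method names (TwoNextCallsSketch, runBrokenGenerator) are made up for illustration. With reactor-core 3.4.x the subscriber is expected to receive the first value and then an onError signal carrying an IllegalStateException; treat that exact exception type as observed behaviour rather than something the interface description promises.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class TwoNextCallsSketch {

    // Subscribes to a generator that (incorrectly) calls next() twice per
    // invocation. Values seen by the subscriber are added to 'received';
    // the error seen by the subscriber, if any, is returned.
    public static Throwable runBrokenGenerator(List<Integer> received) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Flux<Integer> flux = Flux.generate(sink -> {
            sink.next(1);
            sink.next(2); // second next() in the same invocation breaks the contract
        });
        // generate runs synchronously on the subscribing thread, so the
        // signals have been delivered by the time subscribe() returns.
        flux.subscribe(received::add, failure::set);
        return failure.get();
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        Throwable failure = runBrokenGenerator(received);
        System.out.println("received = " + received);
        System.out.println("failure = " + failure);
    }
}
```

The fix for a generator like this is to emit one value per invocation and carry whatever is needed for the next value in the state, as the stateful generate variants do.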
Examples
generate(Consumer<SynchronousSink<T>> generator)
package com.logicbig.example;

import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;

public class GenerateViaConsumerSyncSink {
    public static void main(String[] args) {
        // Counter kept outside the generator.
        AtomicInteger ai = new AtomicInteger(0);
        Flux<Integer> flux = Flux.generate(
                sink -> {
                    // Emit exactly one value per invocation.
                    sink.next(ai.incrementAndGet());
                    if (ai.get() == 5) {
                        sink.complete();
                    }
                }
        );
        flux.subscribe(System.out::println);
    }
}
Output
1
2
3
4
5
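The AtomicInteger in the example above lives outside the generator, so every subscriber of that Flux shares it, and a second subscription would continue from the shared counter instead of starting again at 1. As a hedged alternative, the sketch below produces the same 1 to 5 sequence with the stateful generate variant, whose stateSupplier is called for each subscriber. The class and method names (PerSubscriberStateSketch, oneToFive) are hypothetical.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

public class PerSubscriberStateSketch {

    // Counts from 1 to 5 using generator state instead of an external counter.
    public static Flux<Integer> oneToFive() {
        return Flux.generate(
                () -> 1,                    // initial state, supplied per subscriber
                (state, sink) -> {
                    sink.next(state);       // one value per invocation
                    if (state == 5) {
                        sink.complete();
                    }
                    return state + 1;       // state for the next invocation
                });
    }

    public static void main(String[] args) {
        Flux<Integer> flux = oneToFive();
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        // Two independent subscriptions to the same Flux.
        flux.subscribe(first::add);
        flux.subscribe(second::add);
        System.out.println("first = " + first);
        System.out.println("second = " + second);
    }
}
```

Both lists should contain 1 to 5, because each subscription starts from its own initial state.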
public static <T,S> Flux<T> generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator)
package com.logicbig.example;

import reactor.core.publisher.Flux;

public class GenerateViaSyncSink {
    public static void main(String[] args) {
        Flux<String> flux = Flux.generate(
                () -> 1,                              // initial state
                (state, sink) -> {
                    sink.next("state = " + state);
                    if (state > 10) {
                        sink.complete();
                    }
                    return state + 2;                 // state for the next invocation
                });
        flux.subscribe(System.out::println);
    }
}
Output
state = 1
state = 3
state = 5
state = 7
state = 9
state = 11
package com.logicbig.example;

import reactor.core.publisher.Flux;

public class GenerateViaSyncSink2 {
    public static void main(String[] args) {
        Flux<String> flux = Flux.generate(
                () -> "apple",
                (state, sink) -> {
                    sink.next("other " + state);
                    if (state.length() > 10) {
                        sink.complete();
                    }
                    return state + " more";
                });
        flux.subscribe(System.out::println);
    }
}
Output
other apple
other apple more
other apple more more
public static <T,S> Flux<T> generate(Callable<S> stateSupplier,
BiFunction<S,SynchronousSink<T>,S> generator,
Consumer<? super S> stateConsumer)
package com.logicbig.example;

import reactor.core.publisher.Flux;
import java.util.function.Consumer;

public class GenerateViaSyncSinkWithLastConsumer {
    public static void main(String[] args) {
        Flux<String> flux = Flux.generate(
                () -> "apple",
                (state, sink) -> {
                    sink.next("other " + state);
                    if (state.length() > 10) {
                        sink.complete();
                    }
                    return state + " more";
                },
                // Cleanup callback: receives the last state returned by the generator.
                new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println("state consumer-> " + s);
                    }
                });
        flux.subscribe(System.out::println);
    }
}
Output
other apple
other apple more
other apple more more
state consumer-> apple more more more
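The cleanup callback is most useful when the state holds a resource that must be released. The following is a minimal sketch under that assumption: the state is a BufferedReader over an in-memory string, the generator emits one line per invocation, and the stateConsumer closes the reader. The class name, the lines method and the closed flag are hypothetical and exist only to make the cleanup observable.

```java
import reactor.core.publisher.Flux;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.atomic.AtomicBoolean;

public class GenerateWithResourceCleanupSketch {

    // Emits the lines of 'text' one by one. The reader is the generator state
    // and is closed by the cleanup callback; 'closed' records that this happened.
    public static Flux<String> lines(String text, AtomicBoolean closed) {
        return Flux.generate(
                () -> new BufferedReader(new StringReader(text)),
                (reader, sink) -> {
                    try {
                        String line = reader.readLine();
                        if (line == null) {
                            sink.complete();   // end of input
                        } else {
                            sink.next(line);   // one line per invocation
                        }
                    } catch (IOException e) {
                        sink.error(e);
                    }
                    return reader;             // the same reader is the next state
                },
                reader -> {
                    try {
                        reader.close();
                        closed.set(true);
                    } catch (IOException e) {
                        // Ignored in this sketch; real code should log or propagate.
                    }
                });
    }

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean(false);
        lines("alpha\nbeta\ngamma", closed).subscribe(System.out::println);
        System.out.println("closed = " + closed.get());
    }
}
```

If the stateSupplier could return null in a variation of this sketch, the cleanup lambda would need a null check before calling close().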
Example Project
Dependencies and Technologies Used:
- reactor-core 3.4.0: Non-Blocking Reactive Foundation for the JVM.
- JDK 8
- Maven 3.5.4