Reactor3 Flux.create与Flux.push区别与源码分析(一)
系列文章Reactor3 SpscLinkedArrayQueue源码分析????Reactor3 MpscLinkedQueue源码分析????????Reactor3 Flux.create与Flux.push正确打开方式????????????版本Reactor 3.4.9写在前面为了介绍Flux的create与push的源码写了上面三篇博客做铺垫,不太容易????。如果不懂Flux.cre
系列文章
Reactor3 SpscLinkedArrayQueue源码分析🔥
Reactor3 MpscLinkedQueue源码分析🔥🔥
Reactor3 Flux.create与Flux.push正确打开方式🔥🔥🔥
版本
Reactor 3.4.9
写在前面
为了介绍Flux的create与push的源码写了上面三篇博客做铺垫,不太容易👻。
如果不懂Flux.create与push使用请阅读:Reactor3 Flux.create与Flux.push正确打开方式👈。
如果不懂SpscLinkedArrayQueue请点击:Reactor3 SpscLinkedArrayQueue源码分析👈
如果不懂MpscLinkedQueue请进入:Reactor3 MpscLinkedQueue源码分析👈
Flux.create与Flux.push区别
从上面的元素下发图能看到两者在订阅上面没有差别,都有背压功能,由订阅者通过request方法来通知生产者它能接收元素的元素,从而控制元素的下发速度。它们的区别在元素的生产上,Flux.create是可以多线程来提交任务,而Flux.push只能由单个线程来进行提交。
没有错它们的区别就只有一点,从源码角度看它们公用了很多代码,Flux.create只是在Flux.push的FluxSink上多了一层封装。
源码分析
push与create方法的对比
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter) {
return push(emitter, OverflowStrategy.BUFFER);
}
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_ONLY));
}
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
return create(emitter, OverflowStrategy.BUFFER);
}
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL));
}
相同点
- 背压策略一致
- 都是创建一个FluxCreate类的实例
不同点
- push的模式是FluxCreate.CreateMode.PUSH_ONLY,而create是FluxCreate.CreateMode.PUSH_PULL
它们都接收一个Consumer函数式,消费一个FluxSink。什么?FluxSink?s是什么?那就只能先看看FluxSink了。
FluxSink用途分析
我们先看看FluxSink的类图,分析一下它所处的位置。
在图中我用三个红框圈出了比较重要的三个类:FluxSink,Subscription以及BaseSink。Subscription是Subscriber的onSubscribe方法的参数,在发生订阅时订阅者与生产者之间的桥梁。而BaseFlux即实现了又实现了FluxSink,则我们可以猜测FluxSink会是消费者的一个包装,在生产者下发元素时做一下操作后再传递给消费者。
FluxSink方法
FluxSink的next,onCancel等方法都和Subscriber类似,也进一步的验证了我们的猜测。
背压策略
细心的同学可能发现了FluxSink中有一个背压策略的枚举。那它和Reactor3说的那个背压是一个背压吗?没错,它就是那个鼎鼎大名的背压,就是在FluxSink中。
看名称就能看出它们各自代表的策略。
enum OverflowStrategy {
//完全忽略下游的背压请求。
IGNORE,
//当消费不过来有新元素下发时,报错。
ERROR,
//如果下游没有准备好接收信号,则丢弃传入信号。
DROP,
//只会保留最新的元素。
LATEST,
//下发元素时会在一个队列中进行缓存。
BUFFER
}
FluxCreate分析
看完了FluxSink我们就要进入push与create都用到的FluxCreate的内部一探究竟了。
FluxCreate成员变量与构造方法分析
final Consumer<? super FluxSink<T>> source;
final OverflowStrategy backpressure;
final CreateMode createMode;
FluxCreate(Consumer<? super FluxSink<T>> source,
FluxSink.OverflowStrategy backpressure,
CreateMode createMode) {
this.source = Objects.requireNonNull(source, "source");
this.backpressure = Objects.requireNonNull(backpressure, "backpressure");
this.createMode = createMode;
}
一个Consumer
函数会消费一个FluxSink
,一个FluxSink.OverflowStrategy
背压策略,还有一个创建模式。可以说都是我们的老朋友了,在前面都有介绍了。构造函数里也都是简单的赋值,看来黑魔法不在这。
FluxCreate订阅方法
FluxCreate实现了Publisher接口所以它也是一个生产者。
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
BaseSink<T> sink = createSink(actual, backpressure);
actual.onSubscribe(sink);
try {
source.accept(
createMode == CreateMode.PUSH_PULL ? new SerializedFluxSink<>(sink) :
sink);
}
catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
sink.error(Operators.onOperatorError(ex, actual.currentContext()));
}
}
- 先创建了一个
BaseSink
。push
与create
都是使用的默认的BufferAsyncSink
。这里就不放代码了,里面逻辑不复杂。 - 将
BaseSin
k当做一个Subscription
传给Subscriber
,作为FluxCreatepush
与create
都是使用的默认的BufferAsyncSink
。与Subscriber
的桥梁。 - 接着就是黑魔法啦,通过
CreateMode
模式的不同,来对create
的sink
使用SerializedFluxSink
多进行了一层封装。push
与create
的不同点就是这造成的。
总结
我们通过元素图与源码分析了Flux.create与Flux.push之间的区别。当然还只是停留在表面,内部的源码笔者通过下一篇博客体现。
希望有帮到你,感谢阅读。
更多推荐
所有评论(0)