系列文章

Reactor3 SpscLinkedArrayQueue源码分析🔥
Reactor3 MpscLinkedQueue源码分析🔥🔥
Reactor3 Flux.create与Flux.push正确打开方式🔥🔥🔥

版本

Reactor 3.4.9

写在前面

为了介绍Flux的create与push的源码写了上面三篇博客做铺垫,不太容易👻。
如果不懂Flux.create与push使用请阅读:Reactor3 Flux.create与Flux.push正确打开方式👈
如果不懂SpscLinkedArrayQueue请点击:Reactor3 SpscLinkedArrayQueue源码分析👈
如果不懂MpscLinkedQueue请进入:Reactor3 MpscLinkedQueue源码分析👈


Flux.create与Flux.push区别

在这里插入图片描述
在这里插入图片描述
从上面的元素下发图能看到两者在订阅上面没有差别,都有背压功能,由订阅者通过request方法来通知生产者它能接收元素的元素,从而控制元素的下发速度。它们的区别在元素的生产上,Flux.create是可以多线程来提交任务,而Flux.push只能由单个线程来进行提交
没有错它们的区别就只有一点,从源码角度看它们公用了很多代码,Flux.create只是在Flux.push的FluxSink上多了一层封装。

源码分析

push与create方法的对比

public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter) {
		return push(emitter, OverflowStrategy.BUFFER);
	}

public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
		return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_ONLY));
	}

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
	    return create(emitter, OverflowStrategy.BUFFER);
    }
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
		return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL));
	}

相同点

  • 背压策略一致
  • 都是创建一个FluxCreate类的实例

不同点

  • push的模式是FluxCreate.CreateMode.PUSH_ONLY,而create是FluxCreate.CreateMode.PUSH_PULL

它们都接收一个Consumer函数式,消费一个FluxSink。什么?FluxSink?s是什么?那就只能先看看FluxSink了。

FluxSink用途分析

我们先看看FluxSink的类图,分析一下它所处的位置。
在这里插入图片描述
在图中我用三个红框圈出了比较重要的三个类:FluxSink,Subscription以及BaseSink。Subscription是Subscriber的onSubscribe方法的参数,在发生订阅时订阅者与生产者之间的桥梁。而BaseFlux即实现了又实现了FluxSink,则我们可以猜测FluxSink会是消费者的一个包装,在生产者下发元素时做一下操作后再传递给消费者。

FluxSink方法

在这里插入图片描述
Subscriber方法

FluxSink的next,onCancel等方法都和Subscriber类似,也进一步的验证了我们的猜测。

背压策略

细心的同学可能发现了FluxSink中有一个背压策略的枚举。那它和Reactor3说的那个背压是一个背压吗?没错,它就是那个鼎鼎大名的背压,就是在FluxSink中。
!](https://img-blog.csdnimg.cn/478a211634624a28804a64ac264a8609.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAMjHkuJbnuqrmiYvoibrkuro=,size_11,color_FFFFFF,t_70,g_se,x_16)
看名称就能看出它们各自代表的策略。

enum OverflowStrategy {
		//完全忽略下游的背压请求。
		IGNORE,
		//当消费不过来有新元素下发时,报错。
		ERROR,
		//如果下游没有准备好接收信号,则丢弃传入信号。
		DROP,
		//只会保留最新的元素。
		LATEST,
		//下发元素时会在一个队列中进行缓存。
		BUFFER
	}

FluxCreate分析

看完了FluxSink我们就要进入push与create都用到的FluxCreate的内部一探究竟了。

FluxCreate成员变量与构造方法分析
final Consumer<? super FluxSink<T>> source;

	final OverflowStrategy backpressure;

	final CreateMode createMode;

	FluxCreate(Consumer<? super FluxSink<T>> source,
			FluxSink.OverflowStrategy backpressure,
			CreateMode createMode) {
		this.source = Objects.requireNonNull(source, "source");
		this.backpressure = Objects.requireNonNull(backpressure, "backpressure");
		this.createMode = createMode;
	}

一个Consumer函数会消费一个FluxSink,一个FluxSink.OverflowStrategy背压策略,还有一个创建模式。可以说都是我们的老朋友了,在前面都有介绍了。构造函数里也都是简单的赋值,看来黑魔法不在这。

FluxCreate订阅方法

FluxCreate实现了Publisher接口所以它也是一个生产者。

@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
		BaseSink<T> sink = createSink(actual, backpressure);

		actual.onSubscribe(sink);
		try {
			source.accept(
					createMode == CreateMode.PUSH_PULL ? new SerializedFluxSink<>(sink) :
							sink);
		}
		catch (Throwable ex) {
			Exceptions.throwIfFatal(ex);
			sink.error(Operators.onOperatorError(ex, actual.currentContext()));
		}
	}
  • 先创建了一个BaseSinkpushcreate都是使用的默认的BufferAsyncSink。这里就不放代码了,里面逻辑不复杂。
  • BaseSink当做一个Subscription传给Subscriber,作为FluxCreatepushcreate都是使用的默认的BufferAsyncSink。与Subscriber的桥梁。
  • 接着就是黑魔法啦,通过CreateMode模式的不同,来对createsink使用SerializedFluxSink多进行了一层封装。pushcreate的不同点就是这造成的。

总结

我们通过元素图与源码分析了Flux.create与Flux.push之间的区别。当然还只是停留在表面,内部的源码笔者通过下一篇博客体现。

希望有帮到你,感谢阅读。

Logo

尧米是由西云算力与CSDN联合运营的AI算力和模型开源社区品牌,为基于DaModel智算平台的AI应用企业和泛AI开发者提供技术交流与成果转化平台。

更多推荐