系列文章

Reactor3 SpscLinkedArrayQueue源码分析🔥
Reactor3 MpscLinkedQueue源码分析🔥🔥
Reactor3 Flux.create与Flux.push正确打开方式🔥🔥🔥
Reactor3 Flux.create与Flux.push区别与源码分析(一)🔥🔥🔥🔥🔥
Reactor3 Flux.create与Flux.push区别与源码分析(二)🔥

版本

Reactor 3.4.9

前言

通过前面的几篇文章,我们已经将Flux的create方法与push方法不管是从使用还是原理都了解的很清楚了。但是前面我们都是停留在去熟悉这个框架它的实现上面,没有更加深层次的思考为什么这样做,以及我能从这个Reactor3框架中学习到哪些东西能应用到平时的开发过程中。本篇文章将会介绍Flux的create方法与push方法是如何去应用一些JUC的工具类去实现在高并发下技能保证性能也能保证线程安全的技巧。

约定优于限制

在Spring Boot中有一句话叫约定优于配置。我这里借鉴过来约定优于限制。在Flux.push中只是在文档中说明它是一个单线程提交任务的生产者。可是在Flux.push的实现中并没有去限制只能有一个线程来使用它的sink来下发元素。当然在多线程情况下,我们在上一篇也举例说明了,确实会有安全问题。所以这个就需要使用者注意了,它并没有去限制什么,而是在文档中说明了一下,和你就形成了一种约定。你不能将它应用在多线程的情况,如果你用了我不负责,这样在一定程度上减少了一些逻辑判断与不必要的性能消耗。

使用AtomicXXXFieldUpdater来操作Volatile变量

在之前的源码分析中,AtomicXXXFieldUpdater出境次数还是很多的,比如wip等等。不止在Reactor3中,在其他框架(比如Netty等)也有高频率的使用它。
首先要明白它的作用:保证变量的原子操作的安全性。已经有了原子类了(比如:Integer对应的原子类AtomicInteger),为什么要AtomicXXXFieldUpdater?

使用AtomicXXXFieldUpdater的好处
  • **使用AtomicXXXFieldUpdater实现的原子操作比直接使用原子类开销要小。**因为AtomicXXXFieldUpdate是一个全局的静态变量,伴随一个对应的volatile修饰的非静态成员变量。也就是每个实例共享一个AtomicXXXFieldUpdater,每个实例只用维护自己的volatile修饰的变量就可以了。如果使用原子类则每个实例都各自维护各自的原子变量,而原子类中各自除了维护一个volatile变量外还需要原子类其他的信息。
  • 不会破坏共享变量原来的结构,更灵活。如果使用原子类则在使用这个变量的时候还需要get等一些方法,但是如果使用AtomicXXXFieldUpdater则可以直接使用对应的成员变量。

JDK9 操作Volatile变量更进一步

在JDK8以及之前笔者推荐你使用AtomicXXXFieldUpdater来操作Volatile,但是由于它是通过反射来操作变量,同样它也是使用Unsafe来操作变量。因此它有两个问题:性能与安全性。Unsafe也是被JDK计划移除的类,考虑到向后方便升级的问题最好不要使用它。被移除就一定会有替代品,那就是VarHandle。VarHandle在JDK9+中已经广泛被使用,尤其是JUC中的各个类的Unsafe都被替换为了VarHandle。
下面是使用VarHandle的set与AtomicXXXFieldUpdater的set性能测试(使用JMH标准测试)。VarHandler性能好于AtomicXXXFieldUpdater不少。
在这里插入图片描述

在应用层优化掉不必要的内存屏障

这个就涉及到JMM的一些知识了,为了不影响后面内容的理解,如果不了解的可以先了解一下再继续看下面的内容。JMM为了保证内存可见性,Java编译器会在生成的指令一些必要的地方插入一些内存屏障来进制特定类型的处理器重排序。
在Flux.push与Flux.create的实现中有很多用到volatile的地方,对于volatile的写JMM会在其前后分别加上StoreStore屏障与SotreLoad屏障,来保证其对任意处理器的可见性。
在MpscLinkedQueue中有个soNext方法,它使用lazySet去设置变量。

reactor.util.concurrent.MpscLinkedQueue.LinkedQueueNode#soNext

public void soNext(@Nullable LinkedQueueNode<E> n)
		{
			NEXT_UPDATER.lazySet(this, n);
		}

操作的变量是由volatile,但是使用lazySet方法并不会再写后加StoreLoad,只有前面的StoreStore,也就是说它不保证对变量的操作能马上对其他线程可见。在某些场景这个方法会很有用处。下面就拿MpscLinkedQueue来举例说明使用场景。

再探MpscLinkedQueueoffer方法

public final boolean offer(final E e) {
        Objects.requireNonNull(e, "The offered value 'e' must be non-null");

        //创建新的节点
        final LinkedQueueNode<E> nextNode = new LinkedQueueNode<>(e);
        //这里使用CAS来控制并发,每次只有一个线程能抢占到节点
        final LinkedQueueNode<E> prevProducerNode = PRODUCER_NODE_UPDATER.getAndSet(this, nextNode);
        // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed
        // and completes the store in prev.next.
        //抢占成功后就可以偷懒使用lazySet慢悠悠的将值存放到这个节点,其他线程不会立马看到这个新的节点,
        //不过不会对结果有影响,订阅者会循环的拉取消息,过很小的一段时间就可以看到这个节点了
        prevProducerNode.soNext(nextNode); // StoreStore
        return true;
}
  • 我们要先有一个意识:最后它是用lazySet去设置nextNode,这样在并发下为什么没有安全性问题?
  • 第一步创建待插入的新节点
  • 接着使用CAS将这个新节点替换成新的生产节点,每次只有一个线程能抢占到节点,这个时候已经确保了线程的安全性。
  • 抢占成功后就可以偷懒使用lazySet慢悠悠的将值存放到这个节点,其他线程不会立马看到这个新的节点,不过不会对结果有影响,订阅者会循环的拉取消息,过很小的一段时间就可以看到这个节点了。
lazySet与set性能对比(JMH测试)

通过下图我们看到两者的差距有快6倍之多,所以在一些场景可以使用lazySet替换set。
在这里插入图片描述

参考资料

Netty为什么不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新变量呢?
Java编程方法论 知秋

Logo

尧米是由西云算力与CSDN联合运营的AI算力和模型开源社区品牌,为基于DaModel智算平台的AI应用企业和泛AI开发者提供技术交流与成果转化平台。

更多推荐