目前卷文化盛行,为了增强面试能力,开始了无边无际的学习,无边界不是重点,重点是要深入1万米。言归正传,本文打算做一个多线程学习的系列文章,沉淀自我。


前言

本文主要是讲解atomic的概念,基本用法,使用场景,底层代码原理剖析。

一、atomic是什么?

在java.util.concurrent.atomic包下atomic一般指原子操作类,主要分为四种类型的原子更新类:原子更新基本类型、原子更新数组类型、原子更新引用和原子更新属性。

原子更新基本类型 说明
AtomicBoolean 原子更新布尔变量
AtomicInteger 原子更新整型变量
AtomicLong 原子更新长整型变量
原子更新数组 说明
AtomicIntegerArray 原子更新整型数组的某个元素
AtomicLongArray 原子更新长整型数组的某个元素
AtomicReferenceArray 原子更新引用类型数组的某个元素
原子更新引用类型 说明
AtomicReference 原子更新引用类型
AtomicReferenceFieldUpdater 原子更新引用类型里的字段
AtomicMarkableReference 原子更新带有标记位的引用类型
原子更新字段类 说明
AtomicIntegerFieldUpdater 原子更新整型字段
AtomicLongFieldUpdater 原子更新长整型字段
AtomicStampedReference 原子更新带有版本号的引用类型

二、基本应用场景

原子更新基本类型:使用原子方式更新基本类型;
原子更新数组:通过原子更新数组里的某个元素;
原子更新引用类型:需要更新引用类型往往涉及多个变量;
原子更新字段类:需要原子更新某个类的某个字段。

2.1原子更新基本类型

以AtomicLong为例,主要方法如下

public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;

    // setup to use Unsafe.compareAndSwapLong for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    /**
     * Records whether the underlying JVM supports lockless
     * compareAndSwap for longs. While the Unsafe.compareAndSwapLong
     * method works in either case, some constructions should be
     * handled at Java level to avoid locking user-visible locks.
     */
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

    /**
     * Returns whether underlying JVM supports lockless CompareAndSet
     * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
     */
    private static native boolean VMSupportsCS8();

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile long value;

    /**
     * Creates a new AtomicLong with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicLong(long initialValue) {
        value = initialValue;
    }

    /**
     * Creates a new AtomicLong with initial value {@code 0}.
     */
    public AtomicLong() {
    }

    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final long get() {
        return value;
    }

    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(long newValue) {
        value = newValue;
    }

    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(long newValue) {
        unsafe.putOrderedLong(this, valueOffset, newValue);
    }

    /**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final long getAndSet(long newValue) {
        return unsafe.getAndSetLong(this, valueOffset, newValue);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * <p><a href="package-summary.html#weakCompareAndSet">May fail
     * spuriously and does not provide ordering guarantees</a>, so is
     * only rarely an appropriate alternative to {@code compareAndSet}.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful
     */
    public final boolean weakCompareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final long getAndIncrement() {
        return unsafe.getAndAddLong(this, valueOffset, 1L);
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the previous value
     */
    public final long getAndDecrement() {
        return unsafe.getAndAddLong(this, valueOffset, -1L);
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final long getAndAdd(long delta) {
        return unsafe.getAndAddLong(this, valueOffset, delta);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final long incrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the updated value
     */
    public final long decrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final long addAndGet(long delta) {
        return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function, returning the previous value. The
     * function should be side-effect-free, since it may be re-applied
     * when attempted updates fail due to contention among threads.
     *
     * @param updateFunction a side-effect-free function
     * @return the previous value
     * @since 1.8
     */
    public final long getAndUpdate(LongUnaryOperator updateFunction) {
        long prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsLong(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function, returning the updated value. The
     * function should be side-effect-free, since it may be re-applied
     * when attempted updates fail due to contention among threads.
     *
     * @param updateFunction a side-effect-free function
     * @return the updated value
     * @since 1.8
     */
    public final long updateAndGet(LongUnaryOperator updateFunction) {
        long prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsLong(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function to the current and given values,
     * returning the previous value. The function should be
     * side-effect-free, since it may be re-applied when attempted
     * updates fail due to contention among threads.  The function
     * is applied with the current value as its first argument,
     * and the given update as the second argument.
     *
     * @param x the update value
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @return the previous value
     * @since 1.8
     */
    public final long getAndAccumulate(long x,
                                       LongBinaryOperator accumulatorFunction) {
        long prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsLong(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function to the current and given values,
     * returning the updated value. The function should be
     * side-effect-free, since it may be re-applied when attempted
     * updates fail due to contention among threads.  The function
     * is applied with the current value as its first argument,
     * and the given update as the second argument.
     *
     * @param x the update value
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @return the updated value
     * @since 1.8
     */
    public final long accumulateAndGet(long x,
                                       LongBinaryOperator accumulatorFunction) {
        long prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsLong(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }

}

使用示例代码如下:

package com.valley.juc.atomic;

import java.util.concurrent.atomic.AtomicLong;

/**
 * @author valley
 * @date 2022/6/23
 * @Description AtomicLong
 */
public class AtomicLongDemo {
    static AtomicLong cnt = new AtomicLong(0l);

    static void m() {
        for (int i = 0; i < 10000; i++) {
            cnt.incrementAndGet();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[1000];

        for (int i = 0; i < threads.length; i++)
            threads[i] = new Thread(() -> {
                AtomicLongDemo.m();
            });

        long start = System.currentTimeMillis();

        for(int i=0;i<threads.length;i++){
            threads[i].start();
        }
        for(int i=0;i<threads.length;i++){
            threads[i].join();
        }

        System.out.println(System.currentTimeMillis()-start);
        System.out.println(cnt.get());

    }

}

2.2原子更新数组

以AtomicIntegerArray为例,主要方法如下

/**
     * Creates a new AtomicIntegerArray of the given length, with all
     * elements initially zero.
     *
     * @param length the length of the array
     */
    public AtomicIntegerArray(int length) {
        array = new int[length];
    }

    /**
     * Creates a new AtomicIntegerArray with the same length as, and
     * all elements copied from, the given array.
     *
     * @param array the array to copy elements from
     * @throws NullPointerException if array is null
     */
    public AtomicIntegerArray(int[] array) {
        // Visibility guaranteed by final field guarantees
        this.array = array.clone();
    }
    /**
     * Returns the length of the array.
     *
     * @return the length of the array
     */
    public final int length() {
        return array.length;
    }
    /**
     * Atomically increments by one the element at index {@code i}.
     *
     * @param i the index
     * @return the previous value
     */
    public final int getAndIncrement(int i) {
        return getAndAdd(i, 1);
    }

    /**
     * Atomically decrements by one the element at index {@code i}.
     *
     * @param i the index
     * @return the previous value
     */
    public final int getAndDecrement(int i) {
        return getAndAdd(i, -1);
    }

    /**
     * Atomically adds the given value to the element at index {@code i}.
     *
     * @param i the index
     * @param delta the value to add
     * @return the previous value
     */
    public final int getAndAdd(int i, int delta) {
        return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
    }

    /**
     * Atomically increments by one the element at index {@code i}.
     *
     * @param i the index
     * @return the updated value
     */
    public final int incrementAndGet(int i) {
        return getAndAdd(i, 1) + 1;
    }

    /**
     * Atomically decrements by one the element at index {@code i}.
     *
     * @param i the index
     * @return the updated value
     */
    public final int decrementAndGet(int i) {
        return getAndAdd(i, -1) - 1;
    }

    /**
     * Atomically adds the given value to the element at index {@code i}.
     *
     * @param i the index
     * @param delta the value to add
     * @return the updated value
     */
    public final int addAndGet(int i, int delta) {
        return getAndAdd(i, delta) + delta;
    }

使用示例代码如下:展示了数组每个元素都是安全性操作

package com.valley.juc.atomic;

import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * @author valley
 * @date 2022/6/24
 * @Description AtomicArray
 */
public class AtomicArrayDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(1000);
        Thread[] threads1 = new Thread[100];
        Thread[] threads2 = new Thread[100];
        Decrement decrement = new Decrement(atomicIntegerArray);
        Increment increment = new Increment(atomicIntegerArray);
        for (int i = 0; i < 100; i++) {
            threads1[i] = new Thread(decrement);
            threads2[i] = new Thread(increment);

            threads1[i].start();
            threads2[i].start();
        }

        for (int i = 0; i < 100; i++) {
            threads1[i].join();
            threads2[i].join();
        }

        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            if(atomicIntegerArray.get(i) != 0){
                System.out.println("发现了错误:错误的索引:"+i);
            }
        }
        System.out.println("运行结束");
    }

    static class Decrement implements Runnable{

        private AtomicIntegerArray array;

        public Decrement(AtomicIntegerArray array) {
            this.array = array;
        }

        @Override
        public void run() {
            for (int i = 0; i < array.length(); i++) {
                array.getAndDecrement(i);
            }
        }
    }

    static class Increment implements Runnable{

        private AtomicIntegerArray array;

        public Increment(AtomicIntegerArray array) {
            this.array = array;
        }

        @Override
        public void run() {
            for (int i = 0; i < array.length(); i++) {
                array.getAndIncrement(i);
            }
        }
    }
}


2.3原子更新引用类型

以AtomicReference为例,主要方法如下

   /**
     * Creates a new AtomicReference with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicReference(V initialValue) {
        value = initialValue;
    }

    /**
     * Creates a new AtomicReference with null initial value.
     */
    public AtomicReference() {
    }

    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final V get() {
        return value;
    }

    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(V newValue) {
        value = newValue;
    }

    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(V newValue) {
        unsafe.putOrderedObject(this, valueOffset, newValue);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }

使用示例代码如下:AtomicReference可以让一个对象保证原子性,当然,AtomicReference的功能明显比AtomicInteger强,因为一个对象里可以包含很多属性

package com.valley.juc.atomic;

import java.util.concurrent.atomic.AtomicReference;

/**
 * @author valley
 * @date 2022/7/1
 * @Description TODO
 */
public class AtomicReferenceDemo {


        private AtomicReference<Thread> sign = new AtomicReference<>();

        public void lock(){
            Thread current = Thread.currentThread();
            while(!sign.compareAndSet(null,current)){
                System.out.println(Thread.currentThread().getName() + "自旋锁获取失败,再次尝试");
            }
        }

        public void unlock(){
            Thread current = Thread.currentThread();
            sign.compareAndSet(current,null);
        }

        public static void main(String[] args) {
            AtomicReferenceDemo spinLock = new AtomicReferenceDemo();
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "尝试获取自旋锁");
                    spinLock.lock();
                    System.out.println(Thread.currentThread().getName() + "获取到了自旋锁");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        spinLock.unlock();
                        System.out.println(Thread.currentThread().getName() + "释放了自旋锁");
                    }
                }
            };

            Thread thread1 = new Thread(runnable);
            Thread thread2 = new Thread(runnable);
            thread1.start();
            thread2.start();
        }


}

2.4原子更新引用类型

  • 需要更新某个类里的某个字段
  • Atomic包提供了以下三个类:

AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
AtomicLongFieldUpdater:原子更新长整型字段的更新器。
AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更数据和数据的版本号,可以解决使用CAS进行原子更新时,可能出现的ABA问题。
原子更新字段类都是抽象类,每次使用都时候必须使用静态方法newUpdater创建一个更新器。原子更新类的字段的必须使用public volatile修饰符。

使用上述类是必须遵循以下原则:
字段必须是voliatile类型的,在线程之间共享变量时能保持可见性。
字段的描述类型是与调用者的操作对象字段保持一致。
也就是说调用者可以直接操作对象字段,那么就可以反射进行原子操作。
对父类的字段,子类不能直接操作的,尽管子类可以访问父类的字段
只能是实例变量,不能是类变量,也就是说不能加static关键字
只能是可修改变量,不能使用final修饰变量,final的语义,不可更改。

以AtomicIntegerFieldUpdater为例,主要方法如下

  @CallerSensitive
    public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                              String fieldName) {
        return new AtomicIntegerFieldUpdaterImpl<U>
            (tclass, fieldName, Reflection.getCallerClass());
    }

    /**
     * Protected do-nothing constructor for use by subclasses.
     */
    protected AtomicIntegerFieldUpdater() {
    }

    public int addAndGet(T obj, int delta) {
        int prev, next;
        do {
            prev = get(obj);
            next = prev + delta;
        } while (!compareAndSet(obj, prev, next));
        return next;
    }


    public int getAndAdd(T obj, int delta) {
        int prev, next;
        do {
            prev = get(obj);
            next = prev + delta;
        } while (!compareAndSet(obj, prev, next));
        return prev;
    }

    public int decrementAndGet(T obj) {
        int prev, next;
        do {
            prev = get(obj);
            next = prev - 1;
        } while (!compareAndSet(obj, prev, next));
        return next;
    }
    public int getAndDecrement(T obj) {
        int prev, next;
        do {
            prev = get(obj);
            next = prev - 1;
        } while (!compareAndSet(obj, prev, next));
        return prev;
    }

使用示例代码如下:

package com.valley.juc.atomic;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
 * @author valley
 * @date 2022/7/1
 * @Description AtomicIntegerFieldUpdater
 */
public class AtomicIntegerFieldUpdaterDemo implements Runnable{


        static Candidate tom;
        static Candidate peter;

        public static AtomicIntegerFieldUpdater<Candidate> scoreUpdater =
                AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                peter.score++;
                scoreUpdater.getAndIncrement(tom);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            tom = new Candidate();
            peter = new Candidate();
            AtomicIntegerFieldUpdaterDemo updaterDemo = new AtomicIntegerFieldUpdaterDemo();
            Thread thread1 = new Thread(updaterDemo);
            Thread thread2 = new Thread(updaterDemo);
            thread1.start();
            thread2.start();
            thread1.join();
            thread2.join();
            System.out.println("普通变量自增:"+peter.score);
            System.out.println("AtomicIntegerFieldUpdater操作的变量:"+tom.score);
        }

        public static class Candidate{
            volatile int score;
        }

}

三、和高性能原子类比较

  • 高性能原子类,是java8中增加的原子类,它们使用分段的思想,把不同的线程hash到不同的段上去更新,最后再把这些段的值相加得到最终的值,这些类主要有:
  • Striped64: LongAccumulator,LongAdder,DoubleAccumulator,DoubleAdder的父类。
  • LongAccumulator:long类型的聚合器,需要传入一个long类型的二元操作,可以用来计算各种聚合操作,包括加乘等。
  • LongAdder:long类型的累加器,LongAccumulator的特例,只能用来计算加法,且从0开始计算。
  • DoubleAccumulator:double类型的聚合器,需要传入一个double类型的二元操作,可以用来计算各种聚合操作,包括加乘等。
  • DoubleAdder:double类型的累加器,DoubleAccumulator的特例,只能用来计算加法,且从0开始计算。
  • 这几个类操作基本类似,其中DoubleAccumulator和DoubleAdder底层其实也是用long来实现的。Accumulator和Adder非常相似,Accumulator就是一个更通用版本的Adder。LongAddr与LongAccumulator类都是使用非阻塞算法CAS实现的,这相比于使用锁实现原子性操作在性能上有很大的提高。LongAddr类是LongAccumulator类的一个特例,只是LongAccumulator提供了更强大的功能,可以让用户自定义计算规则。
  • 以高并发场景下LongAdder比AtomicLong性能好举例子

高并发下LongAdder比AtomicLong效率高,本质是空间换时间
在竞争激烈的时候,LongAdder把不同线程对应到不同的Cell上进行修改,降低了冲突的概率,是多段锁的理念,提高了并发性
AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。由于竞争很激烈,每一次操作,都要flush和refresh,导致很多资源浪费
LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

点击阅读全文
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐