Java并发编程——原子操作类原理解析

PunkLu 2020年01月12日 216次浏览
AtomicLong原子类原理解析

原子变量操作类

JUC包提供了一系列的原子性操作类,这些类都是使用非阻塞算法CAS实现的,相比使用锁实现原子性操作在性能上有了很大提高。

原子操作类AtomicLong使用

public class AtomicDemo {

    // 10 创建Long型原子计数器
    private static AtomicLong atomicLong = new AtomicLong();
    // 11 创建数据源
    private static Integer[] arrayOne = new Integer[]{0,1,2,3,0,5,6,0,56,0};
    private static Integer[] arrayTwo = new Integer[]{10,1,2,3,0,5,6,0,56,0};

    public static void main(String[] args) throws InterruptedException{
        // 12 线程One统计数组arrayOne中0的个数
        Thread threadOne = new Thread(new Runnable() {
            public void run() {
                int size = arrayOne.length;
                for (int i = 0;i<size;++i){
                    if (arrayOne[i].intValue() == 0){
                        atomicLong.incrementAndGet();
                    }
                }
            }
        });

        // 13 线程two统计数组arrayTwo中0的个数
        Thread threadTwo = new Thread(new Runnable() {
            public void run() {
                int size = arrayTwo.length;
                for (int i = 0;i<size;++i){
                    if (arrayTwo[i].intValue() == 0){
                        atomicLong.incrementAndGet();
                    }
                }
            }
        });

        // 14 启动子线程
        threadOne.start();
        threadTwo.start();

        // 15 等待线程执行完毕
        threadOne.join();
        threadTwo.join();

        System.out.println("count 0:" + atomicLong.get());
    }
}

运行结果:

count 0:7

如上代码中的两个线程各自统计自己所持数据中0的个数,每当找到一个0就会调用AtomicLong的原子性递增方法。

原子变量操作类AtomicLong原理分析

AtomicLong是原子性递增或者递减类,其内部使用Unsafe来实现。

public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;

    // 1、获取Unsafe实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    
    // 2、存放变量value的偏移量
    private static final long valueOffset;

   	// 3、判断JVM是否支持Long类型无锁CAS
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
    private static native boolean VMSupportsCS8();

    static {
        try {
        	// 4、获取value在AtomicLong中的偏移量
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

	// 5、实际变量值
    private volatile long value;


    public AtomicLong(long initialValue) {
        value = initialValue;
    }
}

代码1通过Unsafe.getUnsafe()方法获取到Unsafe类的实例,之所以能通过Unsafe.getUnsafe()方法获取到Unsafe类的实例,是因为AtomicLong类也是在rt.jar包下的,AtomicLong类就是通过Bootstarp类加载器进行加载的。

代码5中的value被声明为volatile的,是为了在多线程下保证内存可见性,value是具体存放计数的变量。

代码2、4获取value变量在AtomicLong类中的偏移量。

AtomicLong中的主要函数:

1、递增和递减操作代码

// 6、调用unsafe方法,原子性设置value值为原始值+1,返回值为递增后的值
public final long incrementAndGet(){
	return unsafe.getAndAddLong(this,valueOffset,1L) + 1L;
}

// 7、调用unsafe方法,原子性设置value值为原始值-1,返回值为递减之后的值
public final long decrementAndGet(){
	return unsafe.getAndAddLong(this,valueOffset,-1L) - 1L;
}

// 8、调用unsafe方法,原子性设置value值为原始值+1,返回值为原始值
public final long getAndIncrement(){
	return unsafe.getAndAddLong(this,valueOffset,1L);
}

// 9、调用unsafe方法,原子性设置value值为原始值-1,返回值为原始值
public final long getAndDecrement(){
	return unsafe.getAndAddLong(this,valueOffset,-1L);
}

在如上代码内部是通过调用Unsafe的getAndAddLong方法来实现操作,这个函数是个原子性操作,这里第一个参数是AtomicLong实例的引用,第二个参数是value变量在AtomicLong中的偏移值,第三个参数是要设置的第二个变量的值。

2、boolean compareAndSet(long expect,long update)方法

public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

由如上代码可知,在内部还是调用了unsafe.compareAndSwapLong方法。如果原子变量中的value值等于expect,则使用update值更新该值并返回true,否则返回false。

原子操作类LongAdder

AtomicLong通过CAS提供了非阻塞的原子性操作,相比使用阻塞算法的同步器来说它的性能已经很好了,但是使用AtomicLong时,在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的CAS操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS的操作,这会白白浪费CPU资源。

因此JDK8新增了一个原子性递增或者递减类LongAdder用来克服AtomicLong的缺点。AtomicLong的性能瓶颈是由于过多线程同时去竞争一个变量的更新而产生的,把一个变量分解为多个变量,让同样多的线程去竞争多个资源,就解决了性能问题。

LongAdder是在内部维护多个Cell变量,每个Cell里面有一个初始值为0的long型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相的减少了争夺共享资源的并发量。另外,多个线程在争夺同一个Cell原子变量时如果失败了,它并不是在当前Cell变量上一直自旋CAS重试,而是尝试在其他Cell的变量上进行CAS尝试,这个改变增加了成功的可能性。最后,在获取LongAdder当前值时,是把所有Cell变量的value值累加后再加上base返回的。

LongAdder代码分析

LongAdder类继承自Striped64类,在Striped64内部维护着三个变量(base、cellsBusy、cells数组)。LongAdder的真实值是base的值与Cell数组里所有Cell元素中的value值的累加,base是个基础值,默认为0。cellsBusy用来实现自旋锁,状态值只有0和1,当创建Cell元素,扩容Cell数组或者初始化Cell数组时,使用CAS操作该变量来保证同时只有一个线程可以进行其中之一的操作。

Cell的构造:

@sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
}

可以看到,Cell的构造很简单,其内部维护一个被声明为volatile的变量,声明为volatile是因为线程操作value变量时没有使用锁,为了保证变量的内存可见性将其声明为volatile的。cas函数通过CAS操作,保证了当前线程更新时被分配的Cell元素中value值的原子性。另外,Cell类使用@sun.misc.Contended修饰是为了避免伪共享。

long sum()方法:

public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
}

long sum()返回当前的值,内部操作是累加所有Cell内部的value值后再累加base。由于计算总和时没有对Cell数组进行加锁,所以在累加过程中可能有其他线程对Cell中的值进行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是一个调用sum方法时的原子快照值。

void reset()方法:

public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
}

把base置为0,如果Cell数组有元素,则元素值被重置为0。

add方法:

public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {  // 1 
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||  // 2
                (a = as[getProbe() & m]) == null ||       // 3
                !(uncontended = a.cas(v = a.value, v + x)))  // 4
                longAccumulate(x, null, uncontended);  // 5
        }
}

如果cells不为null或者线程执行代码1的CAS操作失败了,则会去执行代码2。代码2、3决定当前线程应该访问cells数组里面的哪一个Cell元素,如果当前线程映射的元素存在则执行代码4,使用CAS操作去更新分配的Cell元素的value值,如果当前线程映射的元素不存在或者存在但是CAS操作失败则执行代码5。将代码2、3、4合起来看就是获取当前线程应该访问的cells数组的Cell元素,然后进行CAS更新操作,只是在获取期间如果有些条件不满足则会执行代码5。另外线程应该访问cells数组的哪一个Cell元素是通过getProbe()&m进行计算的,m是cells数组元素个数-1,getProbe()则用于获取当前线程中变量threadLocalRandomProbe的值,这个值一开始为0,在代码5里面会对其进行初始化。并且当前线程通过分配的Cell元素的cas函数来保证对Cell元素的value值更新的原子性。

void longAccumulate(long x,LongBinaryOperator fn,boolean wasUncontended)方法:

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        // 6、初始化当前线程的变量threadLocalRandomProbe的值
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) { // 7
                if ((a = as[(n - 1) & h]) == null) {           // 8
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                // 9、当前Cell存在,则执行CAS设置
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                // 10、当前Cell数组元素个数大于CPU个数
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                // 11、是否有冲突
                else if (!collide)
                    collide = true;
               	// 12、如果当前元素个数没有达到CPU个数并且有冲突则扩容。
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                        	// 12.1
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                    	// 12.2
                        cellsBusy = 0;
                    }
                    // 12.3
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                // 13、为了能够找到一个空闲的Cell,重新计算hash值,xorshift算法生成随机数
                h = advanceProbe(h);
            }
            // 14、初始化Cell数组
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                    	// 14.1
                        Cell[] rs = new Cell[2];
                        // 14.2
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                	// 14.3
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
}

当每个线程第一次执行到代码6时,会初始化当前线程变量threadLocalRandomProbe的值,这个变量在计算当前线程应该被分配到cells数组的哪一个Cell元素时会用到。

cells数组的初始化是在代码14中进行的,其中cellsBusy是一个标示,为0说明当前cells数组没有在被初始化或者扩容,也没有在新建Cell元素,为1则说明cells数组在被初始化或者扩容,或者当前在创建新的Cell元素、通过CAS操作来进行0或1状态的切换,这里使用casCellsBusy函数。假设当前线程通过CAS设置cellsBusy为1,则当前线程开始初始化操作,那么这时候其他线程就不能进行扩容了。如代码14.1初始化cells数组元素个数为2,然后使用h&1计算当前线程应该访问cell数组的哪个位置,也就是使用当前线程的threadLocalRandomProbe变量值&(cells数组元素个数-1),然后标示cells数组已经被初始化,最后代码14.3重置了cellsBusy标记。这里没有使用CAS操作,却是线程安全的,原因是原因是cellsBusy是volatile类型的,这保证了变量的内存可见性。另外此时其他地方的代码没有机会修改cellsBusy的值。在这里初始化的cells数组里面的两个元素的值目前还是null。

cell数组的扩容是在代码12中进行的,对cells扩容是有条件的,也就是代码10、11的条件都不满足的时候。具体就是当前cells的元素个数小于当前机器CPU个数并且当前多个线程访问了cells中同一个元素,从而导致冲突使其中一个线程CAS失败时才会进行扩容操作。涉及CPU个数是因为只有当每个CPU都运行一个线程时才会使多线程的效果最佳,也就是当cells数组个数与CPU个数一致时,每个Cell都使用一个CPU处理时性能最佳。代码12中的扩容操作也是先通过CAS设置cellsBusy为1,然后才能进行扩容。假设CAS成功则执行代码12.1将容量扩充为之前的2倍,并复制Cell元素到扩容后数组。另外,扩容后cells数组里面除了包含复制过来的元素外,还包含其他新元素,这些元素的值目前还是null。

在代码7、8中,当前线程调用add方法并根据当前线程的随机数threadLocalRandomProbe和cells元素个数计算要访问的Cell元素下标,然后如果发现对应下标元素的值为null,则新增一个Cell元素到cells数组,并且在将其添加到cells数组之前要竞争设置cellsBusy为1。

代码13对CAS失败的线程重新计算当前线程的随机值threadLocalRandomProbe,以减少下次访问cells元素时的冲突机会。