【Java并发编程】Atomic原子类及CAS相关源码分析

苡仁 2020年04月12日 46次浏览

1. J.U.C下的原子类

1.1 原子类介绍

  • 原子类,对象的数据操作不可分割的。具有原子性
  • 作用:原子类的作用和锁类似,是为了保证并发情况下线程的安全问题,不过原子类相比于锁,有一定优势:
    1. 粒度更细:原子类把竞争范围缩小到了变量级别
    2. 效率较高:通常情况下更高,但是高度竞争的情况下效率更低
  • J.U.C下的原子类,大都由CAS实现(最后会有源码分析)

1.2 Atomic*基本类型原子类

  • 以AtomicInteger为例
  1. 常用方法
    • int get() 获取当前值
    • int getAndSet(int) 获取当前值并设置新的值
    • int getAndIncrement() 获取当前值并自增
    • int incrementAndGet() 获取自增后的值 (同一个方法相比,get在前就获取自增前的值,get在后就获取自增后的值)
    • int getAndDecrement() 获取当前值并自减
    • int getAndAdd(int) 获取当前值并加上一个值
    • boolean compareAndSet(int expect, int update) 判断当前值是否符合预期值expect,如果符合就设置更新值update
  2. 使用示例
/**
 * 使用AtomicInteger 对比 非原子类,演示线程安全问题
 *
 * @author yiren
 */
public class AtomicIntegerExample01 {
    private static AtomicInteger atomicInteger = new AtomicInteger();
    private static volatile Integer count = 0;

    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = () -> {
            for (int i = 0; i < 10000; i++) {
                atomicInteger.getAndIncrement();
                count++;
            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println("atomicInteger=" + atomicInteger);
        System.out.println("count=" + count);
    }
}
atomicInteger=20000
count=13409

Process finished with exit code 0
  • 我们可以看到此时AtomicInteger结果是正确的

  • Integer的结果则不正确,如果要保证线程安全,则需要加锁

  • 只要在线程中不多次调用,都可以保证线程安全,单个方法原子类都是保证线程安全的

  • 注意:原子操作+原子操作!= 原子操作

1.3 Atomic*Array数组类型分析

  • AtomicIntegerArray为例

  • 数组类型的时候,它会保证每个元素的的操作都是线程安全的

  • AtomicIntegerArray的方法和AtomicInteger的方法都类似,不过AtomicIntegerArray的方法需要指定数组的index

  • 代码演示:

/**
 * @author yiren
 */
public class AtomicIntegerArrayExample {
    private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);

    public static void main(String[] args) throws InterruptedException {
        Runnable incrRunnable = () -> {
            for (int i = 0; i < atomicIntegerArray.length(); i++) {
                atomicIntegerArray.incrementAndGet(i);
            }
        };
        Runnable decrRunnable = () -> {
            for (int i = 0; i < atomicIntegerArray.length(); i++) {
                atomicIntegerArray.decrementAndGet(i);
            }
        };

        ExecutorService executorService = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(incrRunnable);
        }
        for (int i = 0; i < 1000; i++) {
            executorService.execute(decrRunnable);
        }
        TimeUnit.SECONDS.sleep(5);
        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            System.out.print(atomicIntegerArray.get(i) + " ");
        }
    }
}
0 0 0 0 0 0 0 0 0 0

1.4 AtomicReference应用类型分析

  • AtomicReference类的作用,和AtomicInteger并没有太多区别,只是作用对象变了,AtomicInteger是保证一个整数的原子性,而AtomicReference是让一个对象保证原子性
  • AtomicReference会比AtomicInteger更强大,因为对象中会包含很多属性,用法是类似的
  • 类对象的方法

  • 案例
/**
 * @author yiren
 */
public class SpinLock {
    private static AtomicReference<Thread> sign = new AtomicReference<>();

    private static void lock() {
        Thread current = Thread.currentThread();
        while (!sign.compareAndSet(null, current)) {
            System.out.println("fail to set!");
        }
    }

    private static void unlock() {
        Thread thread = Thread.currentThread();
        sign.compareAndSet(thread, null);
    }

    public static void main(String[] args) {
        Runnable runnable = () -> {
            System.out.println("start to get lock");
            SpinLock.lock();
            System.out.println("got lock successfully!");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                SpinLock.unlock();
            }
        };

        Thread thread = new Thread(runnable);
        Thread thread1 = new Thread(runnable);

        thread.start();
        thread1.start();
    }
}
  • 我们利用AtomicReference来实现一个自旋锁,通过compareAndSet方法去先比较然后赋值来避免使用锁

1.5 封装普通类型成原子类

  • AtomicIntegerFieldUpdater为例

  • AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");我们创建对象时候,需要指定目标类,以及属性。

  • 并且在操作的时候,需要传入操作的对象

  • 为什么会需要用这样的方式呢?而不直接在原有对象上修改?

    • 如果我们在编码中,仅有极少时候才会用到原子性操作,如果在原有对象直接使用原子类就十分浪费性能了
    • 此外,我们在使用别人定义的类的时候,有这样的需求,但是别人没有这样的需求,我们也不能破坏人家的定义,这个时候AtomicIntegerFieldUpdater的使用,就不会对原有类进行侵入式破坏了。
  • 注意:这个类不支持static修饰的变量

  • 案例如下:

/**
 * @author yiren
 */
public class AtomicFieldUpdaterExample {
    private static Counter one = new Counter();
    private static Counter two = new Counter();
    private static AtomicIntegerFieldUpdater<Counter> updater = AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");


    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = () -> {
            for (int i = 0; i < 10000; i++) {
                one.count++;
                updater.getAndIncrement(two);
            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println("one.count = " + one.count);
        System.out.println("two.count = " + two.count);
    }

    private static class Counter {
        volatile int count;
    }
}
one.count = 18417
two.count = 20000

Process finished with exit code 0
  • 可以看出,升级过后,原有的数据操作线程安全了

1.6 Adder累加器

  • 以LongAdder为例

  • LongAdder是Java8引入的新类,高并发下LongAdderAtomicLong的效率高,其本事是利用了空间换时间

  • 它其实是利用了分段锁技术,LongAdder把不同线程对应到不同的Cell上进行修改,降低了冲突的概率,提高了并发性能。

  1. 代码演示:对比AtomicLongLongAdder
/**
 * @author yiren
 */
public class AtomicLongExample {
    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong();
        ExecutorService executorService = Executors.newFixedThreadPool(16);
        Runnable task = () -> {
            for (int i = 0; i < 10000; i++) {
                counter.incrementAndGet();
            }
        };
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            executorService.execute(task);
        }
        executorService.shutdown();
        while (!executorService.isTerminated()) {
        }
        long end = System.currentTimeMillis();
        System.out.println("end-start=" + (end - start)+ "ms");

    }
}
end-start=2140ms

Process finished with exit code 0
/**
 * @author yiren
 */
public class LongAdderExample {
    public static void main(String[] args) {
        LongAdder counter = new LongAdder();
        ExecutorService executorService = Executors.newFixedThreadPool(16);
        Runnable task = () -> {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        };
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            executorService.execute(task);
        }
        executorService.shutdown();
        while (!executorService.isTerminated()) {
        }
        long end = System.currentTimeMillis();
        System.out.println("end-start=" + (end - start)+ "ms");

    }
}
end-start=157ms

Process finished with exit code 0
  • 我们从上面可以看出,我本地机器是i7处理器,两个程序唯一差别就是使用的原子类相差了10多倍。LongAdder明显比AtomicLong更快
  1. 为什么差距这么大?
  • AtomicLong的操作,首先每个线程操作完后,会需要把线程本地内存的数据刷到主内存,然后另外一个线程还得从主内存中刷新新的数据。
  • LongAdder不需要这样做,LongAdder在每个线程都会有自己的一个计数器,仅仅用来在自己的线程内计数,这样一来就不会和其他线程的计数器干扰。
  • LongAdder引入的是分段累加的概念,内部有一个base变量和一个Cell[] cells数组共同参与计算
    • base:竞争不激烈就直接累加到该变量
    • cells: 竞争激烈的时候,各个线程就分散累加到自己的cells[i]
  1. 适用场景
  • AtomicLong:在竞争低的情况加和LongAdder相似,但是它具有CAS方法,可以提供更多的功能
  • LongAdder:在并发高的情况下有明显优势,但是只适用于统计求和计数的场景,有一定的局限性

1.7 Accumulator累加器

  • LongAccumulator为例

  • 基本用法

/**
 * @author yiren
 */
public class AccumulatorExample {
    public static void main(String[] args) {
        // 累加 :此处的(left, right) -> left + right 可以替换成 Long::sum
        // left=3
        LongAccumulator longAccumulator = new LongAccumulator((left, right) -> left + right, 3);
        // left=3+right=3+2=5
        longAccumulator.accumulate(2);
        // left=5+right=5+3=8
        longAccumulator.accumulate(3);
        System.out.println(longAccumulator.getThenReset());
        // left=3
        LongAccumulator longAccumulator1 = new LongAccumulator((left, right) -> left - right, 3);
        // left=3-right=3-2=1
        longAccumulator1.accumulate(2);
        // left=1-right=1-3=-2
        longAccumulator1.accumulate(3);
        System.out.println(longAccumulator1.getThenReset());
        // 求最大值
        LongAccumulator longAccumulator2 = new LongAccumulator(Math::max, -1);
        longAccumulator2.accumulate(14);
        longAccumulator2.accumulate(3);
        System.out.println(longAccumulator2.getThenReset());

    }
}
8
-2
14

Process finished with exit code 0
  • 详细过程可以看注释
  • 有人或许会觉得这个麻烦。这个只是单线程,原子类是保证多线程操作的。也就是说我们可以在不同的线程直接调用
/**
 * @author yiren
 */
public class AccumulatorExample01 {
    public static void main(String[] args) {
        LongAccumulator accumulator = new LongAccumulator((left, right) -> {
            long y = left;
            long x = right;
            return x + y;
        }, 0);

        ExecutorService executorService = Executors.newFixedThreadPool(100);
        IntStream.range(1, 100).forEach(item -> executorService.execute(() -> accumulator.accumulate(item)));
        executorService.shutdown();
        while (!executorService.isTerminated()) {

        }
        System.out.println(accumulator.get());
    }
}
4950

Process finished with exit code 0
  • 使用场景:
    • 需要并行计算,数据量大的
    • 没有顺序要求的

2. CAS原理

2.1 CAS是什么?

  • CAS,全称是compare and swap

  • CAS有三个值。内存值Value、预期值Expect、要修改的值Target,当且仅当Expect==Value时,才能将内存值改为Target,否则什么都不做。最后返回当前的Value

  • CAS在现代处理器中,是有特殊指令可以实现的,而JVM在实现的也会利用汇编指令: cmpxchg

2.2 案例演示

  • CAS的等价代码:
/**
 * @author yiren
 */
public class CasExample {
    private static volatile int value;

    public static synchronized int compareAndSwap(int expect, int target) {
        int oldValue = value;
        if (expect == oldValue) {
            value = target;
        }
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        value = 0;
        Runnable runnable = () -> {
            compareAndSwap(0, 1);
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);

        thread1.start();
        thread2.start();

        thread1.join();
        thread2.join();

        System.out.println(value);
    }
}

2.3 源码分析CAS

  • 以原子类AtomicInteger的源码为案例

  • 以下是AtomicInteger中的属性定义:

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
	
    private volatile int value;
  • 其中value是我们主要保存数据的属性;而valueOffset则表示变量value值在内存地址中当前对象的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的原值的,这样我们就能通过unsafe来实现CAS了

  • 我们看一下AtomicInteger的具体的操作方法

   public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }
  • 可以看出里面主要相关的方法就是unsafecompareAndSwapIntgetAndAddInt且每个方法调用都传入了当前对象、value的偏移地址、和操作数

  • Unsafe类:它是CAS实现的核心类,Java无法直接访问底层操作系统,而是通过本地native方法来访问,不过JVM还是提供了一个途径,JDK中的Unsafe类,它就提供了硬件级别的原子操作.

  • 我们看下int getAndAddInt(Object var1, long var2, int var4)方法

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }
  • var1是当前AtomicInteger的对象,var2是value的偏移地址,通过偏移地址用UnsafegetIntVolatile获取到当前AtomicInteger对象的value值,然后调用UnsafecompareAndSwapInt方法来做CAS。
  • 看下compareAndSwapInt的定义
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
  • 它是一个native方法,实际调用的JVM的C实现的方法,而在C代码中又调用的是Atomic::cmpxchg,而在现代处理器中,实际是可以对应汇编指令集中的 比较并交换指令CMPXCHG

2.5 CAS的缺点

  1. ABA问题

比如

  • 初始值为0,线程1将它改成1,然后又将它改回0
  • 线程2在线程1修改成1之前拿到了0,然后又在线程1改回0后去比较。
  • 此时线程2就会修改成功,但是线程2并不知道线程1对里面的数进行过修改
  • 对于此,可以使用版本号来解决 比如1A->2B->3A-4B这样,每个操作都有一个版本号作为记录。在比较的时候就是用版本号去比较
  • Java中有一个AtomicStampedReference可用于解决ABA问题
  1. 并发度高的情况下,效率很低,可能需要比较很多次

  • 文章内容来源:
  • 《Java并发编程艺术》、JDK1.8版本源码、慕课网悟空JUC课程