Java并发编程——抽象同步队列AQS

PunkLu 2020年01月14日 195次浏览
抽象同步队列AQS

抽象同步队列AQS

AQS——锁的底层支持

AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的。

AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。其中Node中的thread变量用来存放进入AQS队列里面的线程;Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的,EXCLUSIVE用来标志线程是获取独占资源时被挂起后放入AQS队列的;waitStatus记录当前线程等待状态,可以为CANCELED(线程被取消了)、SIGNAL(线程需要被唤醒)、CONDITION(线程在条件队列里面等待)、PROPAGATE(释放共享资源时需要通知其他节点);prev记录当前节点的前驱节点,next记录当前节点的后继节点。

在AQS中维持了一个单一的状态信息state,可以通过getState、setState、compareAndSetState函数修改其值。对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入次数;对于读写锁ReentrantReadWriteLock来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数;对于Semaphore来说,state用来表示当前可用信号的个数;对于CountDownLatch来说,state用来表示计数器当前的值。

AQS有个内部类ConditionObject,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程。这个条件队列的头、尾元素分别为firstWaiter和lastWaiter。

在共享方式下获取和释放资源的方法为:void acquireShared(int arg) void acquireSharedInterruptibly(int arg) boolean releaseShared(int arg)。

使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁ReentrantLock的实现,当一个线程获取了ReentantLock的锁后,在AQS内部会首先使用CAS操作把state状态值从0变为1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从1变为2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入AQS阻塞队列后挂起。

对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可。比如Semaphore信号量,当一个线程线程通过acquire()方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS获取信号量。

独占方式下,获取与释放资源的流程如下:

1、当一个线程调用acquire(int arg)方法获取独占资源时,会首先使用tryAcquire方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)挂起自己。

public final void acquire(int arg){
	if(!tryAcquire(arg) && 
		acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
		selfInterrupt();
}

2、当一个线程调用release(int arg)方法时会尝试使用tryRelease操作释放资源,这里是设置状态变量state的值,然后调用LockSupport.unpark(thread)方法激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryAcquire尝试,看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

需要注意的是,AQS类并没有提供可用的tryAcquire和tryRelease方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquire和tryRelease需要由具体的子类来实现。子类在实现tryAcquire和tryRelease时要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。子类还需要定义,在调用acquire和release方法时state状态值的增减代表什么含义。

比如继承自AQS实现的独占锁ReentrantLock,定义当status为0时表示锁空闲,为1时表示锁已经被占用。在重写tryAcquire时,在内部需要使用CAS算法查看当前state是否为0,如果为0则使用CAS设置为1,并设置当前锁的持有者为当前线程,而后返回true,如果CAS失败则返回false。相应的,继承自AQS实现的独占锁在实现tryRelease时,在内部需要使用CAS算法把当前state的值从1修改为0,并设置当前锁的持有者为null,然后返回true,如果CAS失败则返回false。

共享方式下,获取与释放资源的流程如下:

1、当线程调用acquireShared(int arg)获取共享资源时,会首先使用tryAcquireShared尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.SHARED的Node节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)方法挂起自己。

public final void acquireShared(int arg){
	if(tryAcquiredShared(arg) < 0 ){
		doAcquireShared(arg);
	}
}

2、当一个线程调用releaseShared(int arg)时会尝试使用tryReleaseShared操作释放资源,这里是设置状态变量state的值,然后使用LockSupport.unpark(thread)激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryReleaseShared查看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
}

AQS类并没有提供可用的tryAcquireShared和tryReleaseShared方法,需要由具体的子类来实现。子类在实现tryAcquireShared和tryReleaseShared要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。

比如继承自AQS实现的读写锁ReentrantReadWriteLock里面的读锁在重写tryAcquireShared时,首先查看写锁是否被其他线程持有,如果是则直接返回false,否则使用CAS递增state的高16位(高16位为获取读锁的次数)。而读锁在重写tryReleaseShared时,在内部需要使用CAS算法把当前state值的高16位减1,然后返回true,如果CAS失败则返回false。

基于AQS实现的锁除了还需要重写isHeldExclusively方法,来判断锁是被当前线程独占还是被共享。

独占方式下的void acquire(int arg)和 void acquireInterruptibly(int arg)与共享方式下的void acquireShared(int arg)和void acquireSharedInterruptibly(int arg),这两套函数中都有一个带有Interruptibly关键字的函数,不带Interruptibly关键字的方法的意思是不对中断进行响应,也就是忽略中断。而带Interruptibly关键字的方法要对中断进行响应,其他线程中断了该线程,该线程会抛出InterruptedException异常而返回。

AQS提供的队列的入队操作:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;  // 1
            if (t == null) { // Must initialize 
                if (compareAndSetHead(new Node()))   // 2
                    tail = head;
            } else {
                node.prev = t;  // 3
                if (compareAndSetTail(t, node)) { // 4
                    t.next = node;
                    return t;
                }
            }
        }
}

如上代码在第一次循环中,当要在AQS队列尾部插入元素时,队列头、尾节点都指向null,当执行代码1后节点t指向了尾部节点。这时候t为null,故执行代码2,使用CAS算法设置一个哨兵节点为头节点,如果CAS设置成功,则让尾部节点也指向哨兵节点。到现在为止只插入了一个哨兵节点,还需要插入node节点,所以在第二次循环后执行到代码1,因为这时候t!=null,所有执行到代码3,设置node的前驱节点为尾部节点,然后通过CAS算法设置node节点为尾部节点,CAS成功后在设置原来的尾部节点的后驱节点为node,这时候就完成了双向链表的插入。

AQS——条件变量的支持

如同notify和wait,是配合synchronized内置锁实现线程间同步的基础设施一样,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁)实现线程间同步的基础设施。它们的不同在于,synchronized同时只能与一个共享变量的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量。

在调用共享变量的notify和wait方法前必须先获取该共享变量的内置锁,同理,在调用条件变量的signal和await方法前也必须先获取条件变量对应的锁。

条件变量的例子:

// 代码1
ReentrantLock lock = new ReentrantLock();
// 代码2
Condition condition = lock.newCondition();

// 代码3
lock.lock();
try{
    System.out.println("begin wait");
    // 代码4
    condition.await();
    System.out.println("end wait");
}catch(Exception e){
    e.printStackTrace();
}finally{
    // 代码5
    lock.unlock():
}

// 代码6
lock.lock();
try{
    System.out.println("begin signal");
    // 代码7
    condition.signal();
    System.out.println("end signal");
}catch(Exception e){
    e.printStackTrace();
}finally{

    // 代码8
    lock.unlock();
}

代码1创建了一个独占锁ReentrantLock对象,ReentrantLock是基于AQS实现的锁。

代码2使用创建的Lock对象的newCondition()方法创建了一个ConditionObject变量,这个变量就是Lock锁对应的一个条件变量。需要注意的是,一个Lock对象可以创建多个条件变量。

代码3首先获取了独占锁,代码4则调用了条件变量的await()方法阻塞挂起了当前线程。当其他线程调用条件变量的signal方法时,被阻塞的线程才会从await处返回。需要注意的是,和调用Object的wait方法一样,如果在没有获取到锁前调用了条件变量的await方法则会抛出IllegalMonitorStateException异常。

代码5则释放了获取的锁。

其实这里的Lock对象等价于synchronized加上共享变量,调用lock.lock()方法就相当于进入synchronized块(获取了共享变量的内置锁),调用lock.unlock()方法就相当于退出synchronized块。调用条件变量的await()方法就相当于调用共享变量的wait()方法,调用条件变量的signal方法就相当于调用共享变量的notify()方法。调用条件变量的signalAll()方法就相当于调用共享变量的notifyAll()方法。

在上面代码中,lock.newCondition()的作用是new了一个在AQS内部声明的ConditionObject对象,ConditionObject是AQS的内部类,可以访问AQS内部的变量(例如状态变量和state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。这个条件队列和AQS队列不是一回事。

在如下代码中,当线程调用条件变量的await()方法时(必须先调用锁的lock()方法获取锁),在内部会构造一个类型为Node.CONDITION的内部节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。

public final void await() throws InterruptedException{
        if(Thread.interrupted){
            throw new InterruptedException();
        }
        // 9、创建新的node节点,并插入到条件队列末尾
        Node node = addConditionWaiter();
        // 10、释放当前线程获取的锁
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 11、调用park方法阻塞挂起当前线程
        while(!isOnSyncQueue(node)){
            LockSupport.part(this);
            if((interruptMode = checkInterruptWhileWaiting(node)) !=0){
                break;
            }
        }
}

在如下代码中,当另外一个线程调用条件变量的signal方法时(必须先调用锁的lock()方法获取锁),在内部会把这个条件队列里面队头的一个线程节点从条件队列里面移除并放入AQS的阻塞队列里面,然后激活这个线程。

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                // 将条件队列头元素移动到AQS队列
                doSignal(first);
}

需要注意的是,AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象,需要由AQS的子类来提供newCondition函数。

下面来看当一个线程调用条件变量的await()方法而被阻塞后,如何将其放入条件队列:

private Node addConditionWaiter(){
    Node t = lastWaiter;
    
    // 代码1
    Node node = new Node(Thread.currentThread(),Node.CONDITION);
    // 代码2
    if(t == null){
        firstWaiter = node;
    }else{
        // 代码3
        t.nextWaiter = node;
    }
    // 代码4
    lastWaiter = node;
    return node;
}

代码1首先根据当前线程创建一个类型为Node.CONDITION的节点,然后通过代码2、3、4在单向条件队列尾部插入一个元素。

当多个线程同时调用lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。

如果获取到锁的线程又调用了对应的条件变量的await()方法,则该线程会释放获取到的锁,并被转为Node节点插入到条件变量对应的条件队列里面。

这时候因为调用lock.lock()方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的await()方法则该线程也会被放入条件变量的条件队列里面。

当另外一个线程调用条件变量的signal()或者signalAll()方法时,会把条件队列里面的一个或者全部Node节点移动到AQS阻塞队列里面,等待时机获取锁。

一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。

基于AQS实现自定义同步器

基于AQS实现一个不可重入的独占锁,自定义AQS需要重写一系列函数,还需要定义原子变量state的含义。这里定义state为0表示目前锁没有被线程持有,state为1表示锁已经被某一个线程持有,由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。另外,自定义的锁支持条件变量。

1、代码实现

如下代码是基于AQS实现的不可重入的独占锁

class NonReentrantLock implements Lock,java.io.Serializable{
            
            // 内部帮助类
            private static class Sync extends AbstractQueuedSynchronizer{
                // 是否锁已经被持有
                protected boolean isHeldExclusively(){
                    return getState() == 1;
                }
                
                // 如果state为0 则尝试获取锁
                public boolean tryAcquire(int acquire){
                    assert acquires == 1;
                    if(compareAndSetState(0,1)){
                        setExclusiveOwnerThread(Thread.currentThread());
                        return true;
                    }
                    return false;
                }
                
                // 尝试释放锁,设置state为0
                protected boolean tryRelease(int release){
                    assert releases ==1;
                    if(getState() == 0){
                        throw new IllegalMonitorStateException();
                    } 
                    setExclusiveOwnerThread(null);
                    setState(0);
                    return true;
                }
                
                // 提供条件变量接口
                Condition newCondition(){
                    return new ConditionObject();
                } 
            }
            
            // 创建一个Sync来做具体的工作
            private final Sync sync = new Sync();
            
            public void lock(){
                sync.acquire(1);
            }
            
            public boolean tryLock(){
                return sync.tryAcquire(1);
            }
            
            public void unlock(){
                sync.release(1);
            }
            
            public Condition newCondition(){
                return sync.newCondition();
            }
            
            public boolean isLocked(){
                return sync.isHeldExclusively();
            }
            
            public void lockInterruptibly() throw InterruptedException{
                sync.acquireInterruptibly(1);
            }
            
            public boolean tryLock(long timeout,TimeUnit unit) throws InterruptedException{
                sync.acquireInterrupptibly(1);
            }
            
            public boolean tryLock(long timeout,TimeUnit unit) throws InterruptedException{
                return sync.tryAcquireNanos(1,unit.toNanos(timeout));
            }
}

在如上代码中,NonReentrantLock定义了一个内部类Sync用来实现具体的锁的操作,Sync则继承了AQS。由于要实现的是独占模式的锁,所以Sync重写了tryAcquire、tryRelease和isHeldExclusively3个方法。另外,Sync提供了newCondition这个方法用来支持条件变量。

下面使用上面的自定义锁实现一个简单的生产-消费模型,代码如下:

final static NonReentrantLock lock = new NonReentrantLock();
final static Condition notFull = lock.newCondition();
final static Condition notEmpty = lock.newCondition();

final static Queue<String> queue = new LinkedBlcokingQueue<String>();
final static int queueSize = 10;

public static void main(String[] args){
    Thread producer = new Thread(new Runnable(){
        public void run(){
            // 获取独占锁
            lock.lock();
            try{
                // 1、如果队列满了,则等待
                while(queue.size() == queueSize){
                    notEmpty.await();
                }
                
                // 2、添加元素到队列
                queue.add("ele");
                
                // 3、唤醒消费线程
                notFull.signalAll();
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                // 释放锁
                lock.unlock();
            }
        }
    });
    
    Thread consumer = new Thread(new Runnable(){
        public void run(){
            // 获取独占锁
            lock.lock();
            try{
                // 队列空,则等待
                while(0 == queue.size()){
                    notFull.await();
                }
                
                // 消费一个元素
                String ele = queue.poll();
                
                // 唤醒生产线程
                notEmpty.signalAll();
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                // 释放锁
                lock.unlock();
            }
        }
    });
    
    // 启动线程
    producer.start();
    consumer.start();
}

如上代码首先创建了NonReentrantLock的一个对象lock,然后调用lock.newCondition创建了两个条件变量,用来进行生产者和消费者线程之间的同步。

在main函数里面,首先创建了producer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列是否已经满了,如果满了则调用notEmpty.await()阻塞挂起当前线程。需要注意的是,这里使用while而不是if是为了避免虚假唤醒。如果队列不满则直接向队列里面添加元素,然后调用notFull.signalAll()唤醒所有因为消费元素而被阻塞的消费线程,最后释放获取的锁。

然后在main函数里创建了consumer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列里面是不是有元素,如果对列为空则调用notFull.await()阻塞挂起当前线程。如果队列不为空则直接从队列里面获取并移除元素,然后唤醒因为队列满而被阻塞的生产线程,最后释放获取的锁。