Java并发编程——Semaphore原理探究

PunkLu 2020年01月16日 177次浏览
信号量Semaphore原理探究

信号量Semaphore

Semaphore信号量也是Java中的一个同步器,与CountDownLatch和CycleBarrier不同的是,它内部的计数器是递增的,并且在一开始初始化Semaphore时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。

案例介绍

public class SemaphoreTest {

    // 创建一个Semaphore实例
    private static Semaphore semaphore = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException{
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 将线程A添加到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try{
                    System.out.println(Thread.currentThread() +  " over");
                    semaphore.release();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });

        // 将线程B添加到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + " over");
                    semaphore.release();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });

        // 等待子线程执行完毕,返回
        semaphore.acquire(2);
        System.out.println("all child thread over!");

        // 关闭线程池
        executorService.shutdown();
    }
}

如上代码首先创建了一个信号量实例,构造函数的入参为0,说明当前信号量计数器的值为0。然后main函数向线程池添加两个线程任务,在每个线程内部调用信号量的release方法,这相当于让计数器值递增1.最后在main线程里面调用信号量的acquire方法,传参为2说明调用acquire方法的线程会一直阻塞,直到信号量的计数变为2才会返回。

模拟CyclicBarrier复用的功能:

public class SemaphoreTest2 {

    // 创建一个Semaphore实例
    private static volatile Semaphore semaphore = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException{
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 将线程A添加到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + " A task over");
                    semaphore.release();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });

        // 将线程B添加到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + " A task over");
                    semaphore.release();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });

        // 1、等待子线程执行任务A完毕,返回
        semaphore.acquire(2);

        // 将线程C添加到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try{
                    System.out.println(Thread.currentThread() + " B task over");
                    semaphore.release();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });

        // 将线程d添加到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + " B task over");
                    semaphore.release();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });

        // 2、等待子线程执行B完毕,返回
        semaphore.acquire(2);

        System.out.println("task is over");

        // 关闭线程池
        executorService.shutdown();
    }
}

如上代码首先将线程A和线程B加入到线程池,主线程执行代码1后被阻塞。线程A和线程B调用release方法后信号量的值变为了2,这时候主线程的acquire方法会在获取到2个信号量后返回(返回后当前信号量值为0)。然后主线程添加线程C和线程D到线程池,之后主线程执行代码2后被阻塞(因为主线程要获取2个信号量,而当前信号量个数为0)。

实现原理探究

Semaphore也是使用AQS实现的。Sync只是对AQS的一个修饰,并且Sync有两个实现类,用来指定获取信号量时是否采用公平策略。例如,下面的代码在创建Semaphore时会使用一个变量指定是否使用公平策略。

public Semaphore(int permits){
	sync = new NonfairSync(permits);
}

public Semaphore(int permits,boolean fair){
	sync = fair ? new FairSync(permits) : new NonfairSync(permits);
	NonfairSync(permits);
}

Sync(int permits){
	setState(permits);
}

在如上代码中,Semaphore默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造Semaphore对象。另外,如CountDownLatch构造函数传递的初始化信号量个数permits被赋给了AQS的state状态变量一样,这里AQS的state值也表示当前持有的信号量个数。

1、void acquire()方法

当前线程调用该方法的目的是希望获取一个信号量资源。如果当前信号量个数大于0,则当前信号量的计数会减1,然后该方法直接返回。否则如果当前信号量个数等于0,则当前线程会被放入AQS的阻塞队列。当其他线程调用了当前线程的interrupt()方法中断了当前线程时,则当前线程会抛出InterruptedException异常返回。

public void acquire() throws InterruptedException{
	// 传递参数为1,说明要获取1个信号量资源
	sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException
	// 1、如果线程被中断,则抛出中断异常
	if(Thread.interrupted())
		throw new InterruptedException();
	// 2、否则调用Sync子类方法尝试获取,这里根据构造函数确定使用公平策略
	if(tryAcquireShared(arg) < 0)
		// 如果获取失败则放入阻塞队列。然后再次尝试,如果失败则调用park方法挂起当前线程
		doAcquireSharedInterruptibly(arg);
}

由以上代码可知,acquire()在内部调用了Sync的acquireSharedInterruptibly方法,后者会对中断进行响应(如果当前线程被中断,则抛出中断异常)。尝试获取信号量资源的AQS的方法tryAcquireShared是由Semaphore中的Sync的子类实现的,先讨论非公平策略NonfairSync类的tryAcquireShared方法,代码如下:

protected int tryAcquireShared(int acquires){
	return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires){
	for(;;){
		// 获取当前信号量值
		int available = getState();
		// 计算当前剩余值
		int remaining = available - acquires;
		// 如果当前剩余值小于0或者CAS设置成功则返回
		if(remaining < 0 || compareAndSetState(available,remaining))
			return remaining;
	}
}

如上代码先获取当前信号量值(available),然后减去需要获取的值(acquires),得到剩余的信号量个数(remaining),如果剩余值小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。

由于NonfairSync是非公平获取的,也就是说先调用acquire方法获取信号量的线程不一定比后来者先获取到信号量。如果线程A先调用了acquire()方法获取信号量,但是当前信号量个数为0,那么线程A会被放入AQS的阻塞队列。过一段时间后线程C调用了release()方法释放了一个信号量,如果当前没有其他线程获取信号量,那么线程A就会被激活,然后获取该信号量,但是假如线程C释放信号量后,线程C调用了acquire方法,那么线程C就会和线程A去竞争这个信号量资源。如果采用非公平策略,由nonfairTryAcquireShared的代码可知,线程C完全可以在线程A被激活前,或者激活后先于线程A获取到该信号量。下面看公平性的FairSync类是如何保证公平性的:

protected int tryAcquireShared(int acquires){
	for(;;){
		if(hasQueuedPredecessors())
			return -1;
		int available = getState();
		int remaining = available - acquires;
		if(remaining < 0 || compareAndSetState(available,remaining))
			return remainging;
	}
}

可见公平性还是靠hasQueuedPredecessors这个函数保证的。公平策略是看当前线程节点的前驱节点是否也在等待获取该资源,如果是则自己放弃获取的权限,然后当前线程会被放入AQS阻塞队列,否则就去获取。

2、void acquire(int permits)方法

与acquire()方法的不同在于,此方法需要获取多个信号量值

public void acquire(int permits) throws InterruptedException{
	if(permits < 0) throw new IllegalArgumentException();
	sync.acquireSharedInterruptibly(permits);
}

3、void acquireUninterruptibly()方法

与acquire()方法类似,不同之处在于对中断不响应

public void acquireUninterruptibly(){
	sync.acquireShared(1);
}

4、void acquireUninterruptibly(int permits)方法

该方法与acquire(int permits)方法的不同之处在于,该方法对中断不响应

public void acquireUniterruptibly(int permits){
	if(permits < 0 ) throw new IllegalArgumentException();
	sync.acquireShared(permits);
}

5、void release()方法

该方法的作用是把当前Semaphore对象的信号量值增加1,如果当前有线程因为调用acquire方法被阻塞而被放入了AQS的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程激活,激活的线程会尝试获取刚增加的信号量

public void release(){
	// 1、arg=1
	sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
	// 2、尝试释放资源
    if (tryReleaseShared(arg)) {
    	// 3、资源释放成功则调用unpark方法唤醒AQS队列里面最先挂起的线程
       doReleaseShared();
       return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
    	// 4、获取当前信号量值
        int current = getState();
        
        // 5、将当前信号量值增加releases,这里为增加1
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        
        // 6、使用CAS保证更新信号量值的原子性
        if (compareAndSetState(current, next))
             return true;
        }
}

6、void release(int permits)方法

该方法与不带参数的release()方法的不同之处在于,前者每次调用会在信号量值原来的基础上增加permits,而后者每次增加1

public void release(int permits){
	if(permits < 0) throw new IllegalArgumentException();
	sync.releaseShared(permits);
}