前言

本节我们来学习Semaphore,字面意思为信号灯、信号量。在多线程场景下,主要控制可以同时访问某种资源的线程个数,下面一起从案例及源码的角度学习下。


系列文章
一、并发编程系列-同步器 AQS
二、并发编程系列-同步器实现一 ReentrantLock
三、并发编程系列-同步器实现二 ReentrantLock Condition
四、并发编程系列-同步器实现三 CountDownLatch
五、并发编程系列-同步器实现四 Semaphore
六、并发编程系列-同步器实现五 CyclicBarrier


一个例子:
大学食堂吃饭,学生打饭在食堂吃,食堂是 提供桌椅的,位置的多少是固定的,人多时就只能等待其他同学吃好后离开座位,等待的同学便可以有座位吃饭了(不考虑打包带走和站着吃的)。
栗子中:
食堂的座位比喻成一种资源或许可证
学生比喻竞争该资源的线程
在同一时刻,只能固定的学生有座位吃饭,多余同学需要进行等待。


Semaphore

先了解下Semaphore类结构

首先看下Semaphore的内部类
和ReentrantLock一样的套路,内部定义了名为Sync的抽象静态内部类

// 抽象的类,看到下面便会知道存在公平与非公平的实现的
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
	// 构造器,传入许可证
        Sync(int permits) {
            setState(permits);
        }
        final int getPermits() {
            return getState();
        }
	// 关键方法:尝试非公平获取资源
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
		// 取目前的资源数
                int available = getState();
                int remaining = available - acquires;
		// 减去申请的资源,如果剩余的大于等于0,尝试CAS更新 
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

	// 关键方法:尝试释放共享资源,实现了AQS类的对应方法
        protected final boolean tryReleaseShared(int releases) {
            for (;;) { // 循环释放
                int current = getState();
                int next = current + releases;
		// 如果releases为负数,则报错,否则CAS更新资源
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

	// 方法与tryReleaseShared类似,减少当前资源的数量,会一直循环
	// 参数不能是负数
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
	// 清除资源,将当前剩余的资源清除为0,会一直循环
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
}

/**
* NonFair version
*/
// 非公平锁
static final class NonfairSync extends Sync {
        NonfairSync(int permits) {
            super(permits);
        }
	// 关键方法,尝试获取资源(许可证),实现了AQS的方法
        protected int tryAcquireShared(int acquires) {
	    // 调用Sync的方法
            return nonfairTryAcquireShared(acquires);
        }
}

/**
* Fair version
*/
// 公平锁
static final class FairSync extends Sync {
        FairSync(int permits) {
            super(permits);
        }
	// 关键方法,尝试公平获取资源(许可证),实现了AQS的方法
	// 与非公平锁的区别,会先判断,队列是不是有节点,或者头节点之后的节点是不是当前线程节点(有点重入的意思)
        protected int tryAcquireShared(int acquires) {
            for (;;) {
		// 队列是否有节点,判断
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))		
		    // 返回减去申请资源后剩余的资源数
                    return remaining;
            }
        }
}

看下hasQueuedPredecessors()方法
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
	// 头节点不等与尾节点,并且头节点的下节点不为null的情况下,头节点的下个节点非当前线程
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
}

Semaphore的变量及方法如下

// Sync作为成员变量,核心的逻辑方法都在该抽象类或其实现类下面
private final Sync sync;

// 构造器,传入可被使用的资源
public Semaphore(int permits) {
	// 默认是非公平的
	sync = new NonfairSync(permits);
}
// 构造器,传入可被使用的资源及是否公平竞争
public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

// 请求资源,默认1个,同时支持中断处理(和CountDownLatch一样样的)
public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
}
// 请求指定的资源,同时支持中断处理
public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
}

// 请求资源,默认1个(非中断)
public void acquireUninterruptibly() {
        sync.acquireShared(1);
}
// 请求指定的资源(非中断)
public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
}

// 尝试请求资源,仅仅是尝试,不会进队列等待,非阻塞
public boolean tryAcquire() {
	return sync.nonfairTryAcquireShared(1) >= 0;
}
// 尝试请求指定的资源,仅仅是尝试
public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
}


// 尝试请求资源,会进行指定睡眠时长的那种
public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 尝试请求指定的资源,仅仅是尝试,会进行超时睡眠的那种
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

// 释放资源,默认1个,共享方式,会传播唤醒
public void release() {
        sync.releaseShared(1);
}
// 释放指定的资源,共享方式,会传播唤醒
public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
}

// 获取当前可用的资源
public int availablePermits() {
        return sync.getPermits();
}
// 清除“目前”的资源
public int drainPermits() {
        return sync.drainPermits();
}
// 减少资源,循环直至成功
protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
}

// 获取队列长度
public final int getQueueLength() {
        return sync.getQueueLength();
}
// 对应AQS的方法
public final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) {
	     // 从尾节点遍历有线程信息加1,不包括头节点的(头节点没有线程信息)
            if (p.thread != null)
                ++n;
        }
        return n;
}

至此,Semaphore的类结构分析完成,下面看例子


案例

public static void test() throws InterruptedException {
        // 定义1个permits许可证
        Semaphore semaphore = new Semaphore(1);
        new Thread(() -> {
            try {
                System.out.println(DateUtils.getCurrentDateTime() + " 进入线程" + 1 + "");
                // 请求资源 见下 代码块1
                semaphore.acquire();
                System.out.println(DateUtils.getCurrentDateTime() + " 线程" + 1 + "获得锁 开始执行任务了");
                Thread.sleep(10 * 1000);
                System.out.println(DateUtils.getCurrentDateTime() + " 线程" + 1 + " 执行结束 释放锁");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
		// 请求资源 见下 代码块3
                semaphore.release();
            }
        }, "1").start();

        new Thread(() -> {
            try {
                System.out.println(DateUtils.getCurrentDateTime() + " 进入线程" + 2 + "");
                // 请求资源 见下 代码块2
                semaphore.acquire();
                System.out.println(DateUtils.getCurrentDateTime() + " 线程" + 2 + "获得锁 开始执行任务了");
                Thread.sleep(10 * 1000);
                System.out.println(DateUtils.getCurrentDateTime() + " 线程" + 2 + " 执行结束 释放锁");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }, "2").start();

	// 主线程睡眠
        Thread.sleep(1000 * 1000);
}

不同的地方debug可以看获取资源不等待、入队等待、释放资源过程。

输出结果:

2021-11-20 22:00:40 进入线程1
2021-11-20 22:00:40 进入线程2
2021-11-20 22:00:40 线程1获得锁 开始执行任务了
2021-11-20 22:00:50 线程1 执行结束 释放锁
2021-11-20 22:00:50 线程2获得锁 开始执行任务了
2021-11-20 22:01:00 线程2 执行结束 释放锁


案例源码分析

同样分析前,先整个流程看看,为了简单的体现,以两个线程来说明,线程1和线2
过程分析


具体步骤:
如下 代码块1

进入代码块1的时候,此时是第一个线程进行抢占资源
java.util.concurrent.Semaphore
public void acquire() throws InterruptedException {
	// 继续往下看
        sync.acquireSharedInterruptibly(1);
}

// 进入QAS的实现方法
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
	// tryAcquireShared会调用AQS的实现类方法,即Semaphore实现的方法
        if (tryAcquireShared(arg) < 0) // 为负数才继续执行
            doAcquireSharedInterruptibly(arg);
}

继续跟进,进入 java.util.concurrent.Semaphore#Sync(非公平锁的实现)
final int nonfairTryAcquireShared(int acquires) {
            for (;;) { // 循环
                int available = getState();
                int remaining = available - acquires;
		// 第一个线程调用刚好,此时remaining=0,所以执行cas更新state,并返回0
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
}

回头看上一步acquireSharedInterruptibly方法,if判断false,会直接返回,节点获取资源/许可证
if (tryAcquireShared(arg) < 0) // 为负数才继续执行
	doAcquireSharedInterruptibly(arg);

一个如果

如果Semaphore是公平的,在循环内开始会判断如果队列有节点则返回-1,则会进入等待队列,详见 代码块2
if (hasQueuedPredecessors())
return -1;


继续看线程2调用 代码块2

// 方法和上面代码块分析的一样,那么直接看不同的地方
public void acquire() throws InterruptedException {
	// 继续往下看
        sync.acquireSharedInterruptibly(1);
}
...
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
	// 此处tryAcquireShared同样进入Semaphore的实现方法
	// java.util.concurrent.Semaphore#Sync,但此时会返回-1
        if (tryAcquireShared(arg) < 0) 
            doAcquireSharedInterruptibly(arg);
}

继续看,关键方法doAcquireSharedInterruptibly();
// 看过本系列文章CountDownLatch的分析的话,是同一个方法,此处不单独分析了,大致过程如下:
// 1、创建共享节点,增加到头节点后面(没有头节点会创建一个出来)
// 2、获取前节点,如果是头节点的话,再一次尝试获取资源
// 3、获取资源失败后,更新头节点的waitStatus为-1,表示后节点后面需要唤醒
// 4、当前节点进入线程等待,等待“前节点唤醒”
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

至此,线程2获取资源失败,进入等待队列


继续看 代码块3,此时线程1任务执行完成,释放资源了

java.util.concurrent.Semaphore
public void release() {
	// 释放不分是不是公平的
	sync.releaseShared(1);
}

继续跟踪
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
	// tryReleaseShared会调用AQS子类的实现方法
        if (tryReleaseShared(arg)) {
	    // 释放共享资源
            doReleaseShared();
            return true;
        }
        return false;
}

继续看实现tryReleaseShared
java.util.concurrent.Semaphore.Sync
protected final boolean tryReleaseShared(int releases) {
            for (;;) { // 循环
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
		// 如果当前资源数加预释放数小于当前的资源数,抛异常,否则执行CAS更新,并返回true
                if (compareAndSetState(current, next))
                    return true;
            }
}

// 往后看调用地方,会进行释放共享资源
// 方法在本系列上一篇CountDownLatch分析过,大致过程:
// 1、如果头节点不为null,并且头节点不等于尾节点
// 2、cas节点状态为0,进行唤醒后续节点
// 3、关键:直到头节点没有改变(被唤醒的节点会更新为head节点,head会改变),否则一直for循环传播唤醒
java.util.concurrent.locks.AbstractQueuedSynchronizer
private void doReleaseShared() {
        for (;;) { 
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
}

至此,线程归还了资源,同时唤醒了等待的线程2

被唤醒的线程2继续执行,尝试获取资源/许可证,当然会获取成功了


private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
		// b、此时为true,进入 
                if (p == head) {
		    // c、调用Semaphore返回0,进入if(r >= 0)里面
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
			// d、关键方法,设置node节点为头节点,同时传播唤醒后续节点,如果有的话(当前是最后一个节点 head=tail则不会再传播了)
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // a、线程2在此处睡眠的,唤醒后继续执行
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

至此,线程1和线程2完成了等待入队及唤醒的整个过程,同时只能一个线程进行资源的访问。

总结几个点

1、Semaphore是创建的共享节点加入同步队列的,因此唤醒时是传播唤醒的。
2、Semaphore支持中断和非中断两种获取资源的方式。