并发编程系列-同步器实现四 Semaphore
前言
本节我们来学习Semaphore,字面意思为信号灯、信号量。在多线程场景下,主要控制可以同时访问某种资源的线程个数,下面一起从案例及源码的角度学习下。
系列文章
一、并发编程系列-同步器 AQS
二、并发编程系列-同步器实现一 ReentrantLock
三、并发编程系列-同步器实现二 ReentrantLock Condition
四、并发编程系列-同步器实现三 CountDownLatch
五、并发编程系列-同步器实现四 Semaphore
六、并发编程系列-同步器实现五 CyclicBarrier
一个例子:
大学食堂吃饭,学生打饭在食堂吃,食堂是 提供桌椅的,位置的多少是固定的,人多时就只能等待其他同学吃好后离开座位,等待的同学便可以有座位吃饭了(不考虑打包带走和站着吃的)。
栗子中:
食堂的座位比喻成一种资源或许可证
学生比喻竞争该资源的线程
在同一时刻,只能固定的学生有座位吃饭,多余同学需要进行等待。
Semaphore
先了解下Semaphore类结构
首先看下Semaphore的内部类
和ReentrantLock一样的套路,内部定义了名为Sync的抽象静态内部类
// 抽象的类,看到下面便会知道存在公平与非公平的实现的
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 构造器,传入许可证
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
// 关键方法:尝试非公平获取资源
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 取目前的资源数
int available = getState();
int remaining = available - acquires;
// 减去申请的资源,如果剩余的大于等于0,尝试CAS更新
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 关键方法:尝试释放共享资源,实现了AQS类的对应方法
protected final boolean tryReleaseShared(int releases) {
for (;;) { // 循环释放
int current = getState();
int next = current + releases;
// 如果releases为负数,则报错,否则CAS更新资源
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 方法与tryReleaseShared类似,减少当前资源的数量,会一直循环
// 参数不能是负数
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 清除资源,将当前剩余的资源清除为0,会一直循环
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
/**
* NonFair version
*/
// 非公平锁
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
// 关键方法,尝试获取资源(许可证),实现了AQS的方法
protected int tryAcquireShared(int acquires) {
// 调用Sync的方法
return nonfairTryAcquireShared(acquires);
}
}
/**
* Fair version
*/
// 公平锁
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
// 关键方法,尝试公平获取资源(许可证),实现了AQS的方法
// 与非公平锁的区别,会先判断,队列是不是有节点,或者头节点之后的节点是不是当前线程节点(有点重入的意思)
protected int tryAcquireShared(int acquires) {
for (;;) {
// 队列是否有节点,判断
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
// 返回减去申请资源后剩余的资源数
return remaining;
}
}
}
看下hasQueuedPredecessors()方法
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 头节点不等与尾节点,并且头节点的下节点不为null的情况下,头节点的下个节点非当前线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
Semaphore的变量及方法如下
// Sync作为成员变量,核心的逻辑方法都在该抽象类或其实现类下面
private final Sync sync;
// 构造器,传入可被使用的资源
public Semaphore(int permits) {
// 默认是非公平的
sync = new NonfairSync(permits);
}
// 构造器,传入可被使用的资源及是否公平竞争
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// 请求资源,默认1个,同时支持中断处理(和CountDownLatch一样样的)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 请求指定的资源,同时支持中断处理
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// 请求资源,默认1个(非中断)
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// 请求指定的资源(非中断)
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
// 尝试请求资源,仅仅是尝试,不会进队列等待,非阻塞
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 尝试请求指定的资源,仅仅是尝试
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
// 尝试请求资源,会进行指定睡眠时长的那种
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 尝试请求指定的资源,仅仅是尝试,会进行超时睡眠的那种
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
// 释放资源,默认1个,共享方式,会传播唤醒
public void release() {
sync.releaseShared(1);
}
// 释放指定的资源,共享方式,会传播唤醒
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
// 获取当前可用的资源
public int availablePermits() {
return sync.getPermits();
}
// 清除“目前”的资源
public int drainPermits() {
return sync.drainPermits();
}
// 减少资源,循环直至成功
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
// 获取队列长度
public final int getQueueLength() {
return sync.getQueueLength();
}
// 对应AQS的方法
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
// 从尾节点遍历有线程信息加1,不包括头节点的(头节点没有线程信息)
if (p.thread != null)
++n;
}
return n;
}
至此,Semaphore的类结构分析完成,下面看例子
案例
public static void test() throws InterruptedException {
// 定义1个permits许可证
Semaphore semaphore = new Semaphore(1);
new Thread(() -> {
try {
System.out.println(DateUtils.getCurrentDateTime() + " 进入线程" + 1 + "");
// 请求资源 见下 代码块1
semaphore.acquire();
System.out.println(DateUtils.getCurrentDateTime() + " 线程" + 1 + "获得锁 开始执行任务了");
Thread.sleep(10 * 1000);
System.out.println(DateUtils.getCurrentDateTime() + " 线程" + 1 + " 执行结束 释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 请求资源 见下 代码块3
semaphore.release();
}
}, "1").start();
new Thread(() -> {
try {
System.out.println(DateUtils.getCurrentDateTime() + " 进入线程" + 2 + "");
// 请求资源 见下 代码块2
semaphore.acquire();
System.out.println(DateUtils.getCurrentDateTime() + " 线程" + 2 + "获得锁 开始执行任务了");
Thread.sleep(10 * 1000);
System.out.println(DateUtils.getCurrentDateTime() + " 线程" + 2 + " 执行结束 释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, "2").start();
// 主线程睡眠
Thread.sleep(1000 * 1000);
}
不同的地方debug可以看获取资源不等待、入队等待、释放资源过程。
输出结果:
2021-11-20 22:00:40 进入线程1
2021-11-20 22:00:40 进入线程2
2021-11-20 22:00:40 线程1获得锁 开始执行任务了
2021-11-20 22:00:50 线程1 执行结束 释放锁
2021-11-20 22:00:50 线程2获得锁 开始执行任务了
2021-11-20 22:01:00 线程2 执行结束 释放锁
案例源码分析
同样分析前,先整个流程看看,为了简单的体现,以两个线程来说明,线程1和线2
具体步骤:
如下 代码块1
进入代码块1的时候,此时是第一个线程进行抢占资源
java.util.concurrent.Semaphore
public void acquire() throws InterruptedException {
// 继续往下看
sync.acquireSharedInterruptibly(1);
}
// 进入QAS的实现方法
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquireShared会调用AQS的实现类方法,即Semaphore实现的方法
if (tryAcquireShared(arg) < 0) // 为负数才继续执行
doAcquireSharedInterruptibly(arg);
}
继续跟进,进入 java.util.concurrent.Semaphore#Sync(非公平锁的实现)
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 循环
int available = getState();
int remaining = available - acquires;
// 第一个线程调用刚好,此时remaining=0,所以执行cas更新state,并返回0
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
回头看上一步acquireSharedInterruptibly方法,if判断false,会直接返回,节点获取资源/许可证
if (tryAcquireShared(arg) < 0) // 为负数才继续执行
doAcquireSharedInterruptibly(arg);
一个如果
如果Semaphore是公平的,在循环内开始会判断如果队列有节点则返回-1,则会进入等待队列,详见 代码块2
if (hasQueuedPredecessors())
return -1;
继续看线程2调用 代码块2
// 方法和上面代码块分析的一样,那么直接看不同的地方
public void acquire() throws InterruptedException {
// 继续往下看
sync.acquireSharedInterruptibly(1);
}
...
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 此处tryAcquireShared同样进入Semaphore的实现方法
// java.util.concurrent.Semaphore#Sync,但此时会返回-1
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
继续看,关键方法doAcquireSharedInterruptibly();
// 看过本系列文章CountDownLatch的分析的话,是同一个方法,此处不单独分析了,大致过程如下:
// 1、创建共享节点,增加到头节点后面(没有头节点会创建一个出来)
// 2、获取前节点,如果是头节点的话,再一次尝试获取资源
// 3、获取资源失败后,更新头节点的waitStatus为-1,表示后节点后面需要唤醒
// 4、当前节点进入线程等待,等待“前节点唤醒”
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
至此,线程2获取资源失败,进入等待队列
继续看 代码块3,此时线程1任务执行完成,释放资源了
java.util.concurrent.Semaphore
public void release() {
// 释放不分是不是公平的
sync.releaseShared(1);
}
继续跟踪
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
// tryReleaseShared会调用AQS子类的实现方法
if (tryReleaseShared(arg)) {
// 释放共享资源
doReleaseShared();
return true;
}
return false;
}
继续看实现tryReleaseShared
java.util.concurrent.Semaphore.Sync
protected final boolean tryReleaseShared(int releases) {
for (;;) { // 循环
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 如果当前资源数加预释放数小于当前的资源数,抛异常,否则执行CAS更新,并返回true
if (compareAndSetState(current, next))
return true;
}
}
// 往后看调用地方,会进行释放共享资源
// 方法在本系列上一篇CountDownLatch分析过,大致过程:
// 1、如果头节点不为null,并且头节点不等于尾节点
// 2、cas节点状态为0,进行唤醒后续节点
// 3、关键:直到头节点没有改变(被唤醒的节点会更新为head节点,head会改变),否则一直for循环传播唤醒
java.util.concurrent.locks.AbstractQueuedSynchronizer
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
至此,线程归还了资源,同时唤醒了等待的线程2
被唤醒的线程2继续执行,尝试获取资源/许可证,当然会获取成功了
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// b、此时为true,进入
if (p == head) {
// c、调用Semaphore返回0,进入if(r >= 0)里面
int r = tryAcquireShared(arg);
if (r >= 0) {
// d、关键方法,设置node节点为头节点,同时传播唤醒后续节点,如果有的话(当前是最后一个节点 head=tail则不会再传播了)
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // a、线程2在此处睡眠的,唤醒后继续执行
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
至此,线程1和线程2完成了等待入队及唤醒的整个过程,同时只能一个线程进行资源的访问。
总结几个点
1、Semaphore是创建的共享节点加入同步队列的,因此唤醒时是传播唤醒的。
2、Semaphore支持中断和非中断两种获取资源的方式。