前言

继续学习基于同步器的另一个并发编程的场景 CyclicBarrier 的实现。字面意思是循环(cyclic)的屏障(barrier)。整体就是多线程的同步屏障

CyclicBarrier使一定数量的线程到达屏障时进行等待,等到最后一个线程达到时,再一起继续执行,支持再次使用。


系列文章
一、并发编程系列-同步器 AQS
二、并发编程系列-同步器实现一 ReentrantLock
三、并发编程系列-同步器实现二 ReentrantLock Condition
四、并发编程系列-同步器实现三 CountDownLatch
五、并发编程系列-同步器实现四 Semaphore
六、并发编程系列-同步器实现五 CyclicBarrier


CyclicBarrier大致如下:
CyclicBarrier


CyclicBarrier

先看下CyclicBarrier的类结构

CyclicBarrier的成员变量

// 屏障,使用条件队列Condition
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();

// 阻塞的线程数,final不可改变,记录下后以便轮训使用
private final int parties;

// 所有线程到达后,执行的任务,可有可无
private final Runnable barrierCommand;

// 当前屏障标识
private Generation generation = new Generation();
// The generation changes whenever the barrier is tripped, or is reset
private static class Generation {
	boolean broken = false;
}

// 还需要等待的线程数
private int count;

Generation说明一下

Generation用来控制屏障的循环使用的,如果generation的broken为true的话,表示屏障损坏,后续的线程调用await方法时,都会抛出BrokenBarrierException异常。

导致屏障损坏的原因有可能:
1、如果有线程中断,抛出线程中断之前,会置breakBarrierd的generation.broken=true;
2、手动调用reset方法,中断当前屏障,开启下一次的同步屏障
3、某个线程等待超时


继续了解 CyclicBarrier的方法

// 构造器,屏蔽的数量
public CyclicBarrier(int parties) {
        this(parties, null);
}

// 构造器,屏蔽拦截的线程数量及屏蔽结束之后的执行的任务
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
}

// 下一代,为下次循环屏蔽做准备
private void nextGeneration() {
        // 唤醒当前屏蔽的线程
        trip.signalAll();
        // 初始化 set up next generation
        count = parties;
        generation = new Generation();
}

// 破坏屏障
private void breakBarrier() {
        generation.broken = true; // 更新屏障标识
        count = parties;
        trip.signalAll();	// 唤醒等待在当前屏障的线程
}

// 关键方法:
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
	// 获取锁
        final ReentrantLock lock = this.lock;
	// 获取排它锁资源,才能继续
        lock.lock();
        try {
	    // 当前代
            final Generation g = generation;
	    // 如果当前代的屏障之前别破坏了,则抛异常
            if (g.broken)
                throw new BrokenBarrierException();
	    // 如果线程被中断过,则破坏屏障(唤醒等待线程、重置count),且后续线程均抛BrokenBarrierException
	    // 当前线程再抛InterruptedException异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped, 拦截到最后一个线程
                boolean ranAction = false;
                try {
			// 如果有Runnable任务,则执行
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
			// 开启下一代/魂环的屏障
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) { // 循环屏障拦截,所有线程
                try {
                    if (!timed)
			// 关键方法,进入条件队列,trip为当前锁的condition
                        trip.await();
                    else if (nanos > 0L)
			// 进入条件队列,支持超时(一旦线程是超时醒的,则破坏屏障,抛出异常)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
		    // 如果等待中被中断,且仍未当前代,则进行屏障失效
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // 如果已经不是原来那代了,则不要影响当前代哈
                        Thread.currentThread().interrupt();
                    }
                }
		// 屏障破坏了,等待被唤醒的线程同样抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();

		// 关键关键:正常会执行此处,以为index=0时会调用nextGeneration();进行唤醒屏障等待的线程,
		// 此时会返回当前等待时的“下标”。
                if (g != generation)
                    return index;
		
		// 等待超时,也是会触发屏障失效或破坏
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
		// 释放排它锁
            lock.unlock();
        }
}

// 关键方法,重载
public int await() throws InterruptedException, BrokenBarrierException {
        try { // 非超时
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
}
// 关键方法,重载,支持超时
public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
}

// 屏障是否损坏
public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
}
// 重置屏障,唤醒当前等待的线程,开启下一次
public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
}

// 获取到达屏障等待的线程数量
public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
}

至此,该类分析完毕,核心方法: private int dowait(boolean timed, long nanos);


案例

// 为了方便debug,3个线程分开定义,修改睡眠时间进行不同状态debug
private static void test() throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        // 为了方便进入debug,所以分开 new Thread
        new Thread(() -> {
            System.out.println(DateUtils.getCurrentDateTime() + " 线程1进入等待");
            try {
                cyclicBarrier.await();
                System.out.println(DateUtils.getCurrentDateTime() + " 线程1开始执行任务");
                Thread.sleep(1 * 1000);
                System.out.println(DateUtils.getCurrentDateTime() + " 线程1执行完成");
            } catch (Exception e) { e.printStackTrace(); }
        }, "1").start();

        new Thread(() -> {
            System.out.println(DateUtils.getCurrentDateTime() + " 线程2进入等待");
            try {
                Thread.sleep(3 * 1000);
                cyclicBarrier.await();
                System.out.println(DateUtils.getCurrentDateTime() + " 线程2开始执行任务");
                Thread.sleep(2 * 1000);
                System.out.println(DateUtils.getCurrentDateTime() + " 线程2执行完成");
            } catch (Exception e) { e.printStackTrace(); }
        }, "2").start();

        new Thread(() -> {
            System.out.println(DateUtils.getCurrentDateTime() + " 线程3进入等待");
            try {
                Thread.sleep(5 * 1000);
                cyclicBarrier.await();
                System.out.println(DateUtils.getCurrentDateTime() + " 线程3开始执行任务");
                Thread.sleep(3 * 1000);
                System.out.println(DateUtils.getCurrentDateTime() + " 线程3执行完成");
            } catch (Exception e) { e.printStackTrace(); }
        }, "3").start();

        Thread.sleep(1000 * 1000);
}

输出结果:

2021-11-21 14:26:01 线程1进入等待
2021-11-21 14:26:01 线程2进入等待
2021-11-21 14:26:01 线程3进入等待
2021-11-21 14:26:06 线程3开始执行任务
2021-11-21 14:26:06 线程2开始执行任务
2021-11-21 14:26:06 线程1开始执行任务
2021-11-21 14:26:07 线程1执行完成
2021-11-21 14:26:08 线程2执行完成
2021-11-21 14:26:09 线程3执行完成

案例分析

大致流程图,简化为两个线程的同步屏障
流程


总结几个点

1、只要屏障被某一个线程破坏,其它所有的线程都会抛出异常,同时等待中的被唤醒后也抛出异常。
2、是结合ReentrantLock排它锁加上Condition条件队列进行实现处理的。
3、因为使用排它锁进行await等待,性能不高。