Java锁(五)CyclicBarrier分析

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int
parties)
,其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

1CyclicBarrier使用实例

public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2);
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                }
                System.out.println(1);
            }
        }).start();
        try {
            c.await();
        } catch (Exception e) {
        }
        System.out.println(2);
    }
}

 输出1,2或者2,1

 如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)则主线程和子线程会永远等待,因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

 CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。代码如下:

public class CyclicBarrierTest2 {
    static CyclicBarrier c = new CyclicBarrier(2, new A());
    public static void main(String[] args) {
        new Thread(new Runnable() {
 
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
 
                }
                System.out.println(1);
            }
        }).start();
 
        try {
            c.await();
        } catch (Exception e) {
 
        }
        System.out.println(2);
    }
 
    static class A implements Runnable {
 
        @Override
        public void run() {
            System.out.println(3);
        }
 
    }
}

输出1、3、2

2、CyclicBarrier源码分析

 CyclicBarrier底层是基于ReentrantLockAbstractQueuedSynchronizerConditionObject来实现的,实现相对比较简单。了解前面的ReentrantLock,对AQS的分析中已经指出了其数据结构,在这里不再累赘。

CyclicBarrier的几个标志性的成员变量

/**
 * 循环栅栏的当前代
 */
private static class Generation {
    boolean broken = false;
}
 
/** 屏障的重入锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 等待状态直到触发*/
private final Condition trip = lock.newCondition();
/**  parties 数量 */
private final int parties;
/** 到达屏障时先触发的操作 */
private final Runnable barrierCommand;
 
/** 一个generation对象代表一代的屏障,
 * 就是说,如果generation对象不同,就代表进入了下一次的屏障,
 * 所以说,这个线程屏障是可循环的(Cyclic)
 */
private Generation generation = new Generation();
 
/**
 * count是计数器,如果有线程到达了屏障点,count就减1;
 * 直到count=0时,其它线程才可以向下执行
 */
private int count;

线程等待所有线程到达,触发栅栏

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

/**
 * 主要屏障代码,负责各种策略
 */
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取屏障的当前代信息
        final Generation g = generation;
 
        if (g.broken)
            throw new BrokenBarrierException();
 
        // 线程中断,中断屏障
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
 
        // count-1,到达0的时候,所有的线程向下执行
        int index = --count;
        if (index == 0) {  // 触发屏障的栅栏
            boolean ranAction = false;
            try {
                // 如果设置了barrierCommand,优先执行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 所有线程都到的屏障点
                // 更新屏障状态,唤醒其他线程,生成下一代屏障
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
 
        // 循环直到触发屏障栅栏,或者中断,超时
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 被中断,设置栅栏中断标志
                if (g == generation && ! g.broken) {
                    // 设置broken中断标示
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
 
            // 屏障被中断,抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
 
            // 不是栅栏的当前代(所有线程都到达,已经生成下一代的generation)
            if (g != generation)
                return index;
 
            // 超时后,中断屏障
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
 
/**
 * 中断屏障,唤醒其他线程
 */
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

/**
 * 更新屏障状态,唤醒其他线程,生成下一代屏障
 */
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

3CyclicBarrierCountDownLatch的区别

CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。

CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

4CyclicBarrier的应用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。