/** Lock that dowait acquires before it reads or updates the barrier state. */
private final ReentrantLock lock = new ReentrantLock();

/** Condition (bound to {@link #lock}) on which arriving threads wait until the barrier trips or breaks. */
private final Condition trip = lock.newCondition();

/** The number of parties, fixed at construction. */
private final int parties;

/** Task run by dowait just before the barrier is opened; may be {@code null}. */
private final Runnable barrierCommand;

/** The current generation; replaced by nextGeneration each time the barrier trips. */
private Generation generation = new Generation();

/**
 * Countdown of arrivals for the current generation: starts at {@link #parties},
 * is decremented by every call to dowait, and is reset to {@link #parties}
 * by nextGeneration and breakBarrier.
 */
private int count;
/**
 * Creates a barrier for the given number of parties with no barrier action.
 *
 * @param parties number of arrivals required before the barrier trips
 * @throws IllegalArgumentException if {@code parties} is less than 1
 */
public CyclicBarrier(int parties) {
    this(parties, null);
}

/**
 * Creates a barrier for the given number of parties that runs
 * {@code barrierAction} when the barrier trips.
 *
 * @param parties       number of arrivals required before the barrier trips
 * @param barrierAction task to run when the barrier trips, or {@code null} for none
 * @throws IllegalArgumentException if {@code parties} is less than 1
 */
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties < 1) {
        throw new IllegalArgumentException();
    }
    this.parties = parties;
    // the countdown starts full: nobody has arrived yet
    this.count = parties;
    this.barrierCommand = barrierAction;
}
private void nextGeneration() { //唤醒所有阻塞的线程 trip.signalAll(); //产生下一代 count = parties; generation = new Generation(); } private static class Generation { //标识栅栏是否被破坏 boolean broken = false; }
private void breakBarrier() { //当前栅栏已经破坏 generation.broken = true; count = parties; //唤醒所有阻塞的线程 trip.signalAll(); }
//有2个await方法区别是是否超时 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //每次调用count-- int index = --count; //index==0时说明需要等待的线程都到齐了 if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; //在开启栅栏之前,同步执行传入的任务 if (command != null) command.run(); ranAction = true; //产生下一代唤醒等待线程并返回 nextGeneration(); return 0; } finally { //执行传入的任务报错则破坏栅栏 if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out //还没有全部到齐时 for (;;) { try { //未设置超时 if (!timed) trip.await(); //设置超时 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; //超时处理 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
/**
 * Demo task: a person "gets ready" for one second, waits at the shared
 * barrier, and prints a final message once the barrier has tripped.
 */
class Person implements Runnable {
    String name;
    CyclicBarrier cyclicBarrier;

    /**
     * @param name          label prefixed to every message this task prints
     * @param cyclicBarrier barrier this task waits on after getting ready
     */
    public Person(String name, CyclicBarrier cyclicBarrier) {
        this.name = name;
        this.cyclicBarrier = cyclicBarrier;
    }

    /**
     * Prints the "ready" messages, waits at the barrier and, only if the
     * barrier tripped normally, prints the "together" message. If the thread
     * is interrupted, the interrupt flag is restored and the task stops; if
     * the barrier is broken, the task stops without printing the final message.
     */
    @Override
    public void run() {
        System.out.println(name + "ready");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // sleep() cleared the flag when it threw; restore it for the caller
            Thread.currentThread().interrupt();
            System.err.println(name + " interrupted while getting ready: " + e);
            return;
        }
        System.out.println(name + "end ready");

        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println(name + " interrupted while waiting at the barrier: " + e);
            return;
        } catch (BrokenBarrierException e) {
            // the barrier never tripped, so there is nothing to do "together"
            System.err.println(name + " gave up, barrier is broken: " + e);
            return;
        }
        System.out.println(name + "do something together");
    }
}
/**
 * Demo entry point: creates a four-party barrier whose barrier action sleeps
 * three seconds and prints a message, then starts five {@code Person} threads
 * that all wait on it.
 *
 * <p>NOTE(review): five threads are started for a four-party barrier, so after
 * the first four pass, the remaining thread stays blocked in {@code await()}
 * (the next generation never fills) and the JVM does not exit on its own.
 * Confirm whether this is intentional for the demo.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final int parties = 4;
    final int personCount = 5;

    CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // sleep() cleared the flag when it threw; restore it so the
                // thread that ran this action can still observe the interrupt
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            System.out.println("all person ready");
        }
    });

    for (int i = 0; i < personCount; i++) {
        new Thread(new Person("person" + i, cyclicBarrier)).start();
    }
}
person0ready
person1ready
person2ready
person3ready
person4ready
person4end ready
person3end ready
person2end ready
person1end ready
person0end ready
执行all person ready
person1do something together
person4do something together
person2do something together
person3do something together
- CyclicBarrier 可重复使用:当 count 减到 0(最后一个线程到达)时,会唤醒所有等待的线程并产生下一代(Generation),同时把 count 重新设置为 parties。
- 传入的 barrierAction 由最后一个到达的线程在放开栅栏之前同步执行;如果执行时抛出异常,栅栏会被破坏(breakBarrier),其他正在等待的线程会收到 BrokenBarrierException。