Java并发 - CyclicBarrier详解 CyclicBarrier 是 Java 中的一个同步工具类,用于实现多个线程之间的同步点。它允许一组线程等待彼此到达某个共同点,然后继续执行后续任务。CyclicBarrier 的作用是在多个线程并行计算中,它们各自计算完成后等待其他线程,当所有线程都到达同一个同步点时,它们才能继续执行后续的任务。
1. 主要特点和用途 同步点: CyclicBarrier 提供了一个同步点,多个线程可以在这个点等待,直到所有线程都到达了这个点,然后它们才能继续执行。
循环使用: CyclicBarrier 可以被循环使用,一旦所有等待线程都到达同步点,它就会重置,可以被再次使用。
构造方法: CyclicBarrier 的构造方法接受一个整数参数,表示需要等待的线程数量。当等待的线程数量达到指定数量时,所有线程才能继续执行。
await 方法: 线程通过调用 await 方法来等待其他线程,当调用 await 方法的线程数量达到构造方法中指定的数量时,所有线程将会被释放。
2. CyclicBarrier源码解析 2.1 await方法和类变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 public class CyclicBarrier { private final ReentrantLock lock = new ReentrantLock (); private final Condition trip = lock.newCondition(); private final int parties; private final Runnable barrierCommand; private Generation generation = new Generation (); private int count; public CyclicBarrier (int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException (); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; } public CyclicBarrier (int parties) { this (parties, null ); } public int await () throws InterruptedException, BrokenBarrierException { try { return dowait(false , 0L ); } catch (TimeoutException toe) { throw new Error (toe); } } private int dowait (boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException (); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException (); } int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException (); if (g != generation) return index; if (timed && nanos <= 0L ) { breakBarrier(); throw new TimeoutException (); } } } finally { lock.unlock(); } } }
2.2 nextGeneration 在上述dowait中,所有线程成功进入屏障时会被调用。即生成下一个版本,所有线程又可以重新进入线程中。
1 2 3 4 5 6 7 8 9 10 11 private void nextGeneration () { trip.signalAll(); count = parties; generation = new Generation (); }
在本函数中会调用AQS中的singalAll方法,唤醒当前所有的等待线程。其源码如下
1 2 3 4 5 6 7 8 9 public final void signalAll () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignalAll(first); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
以下函数尝试把节点加入到等待队列中,并返回前驱节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
2.3 breakBarrier函数 本函数的主要作用就是破坏屏障
1 2 3 4 5 6 7 8 private void breakBarrier () { generation.broken = true ; count = parties; trip.signalAll(); }
3. CyclicBarrier示例
以下示例为七龙珠召唤神龙,共21个线程,每七个线程调用一次await方法召唤一次神龙。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class CyclicBarrierDemo { public static void main (String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier (7 ,()->{ System.out.println("======召唤神龙======" ); }); for (int i = 0 ; i < 21 ; i++) { final int temp = i+1 ; new Thread (()->{ try { System.out.println(Thread.currentThread().getName() + "收到" ); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "结束" ); }catch (Exception e){ e.printStackTrace(); } },String.valueOf(temp)).start(); } } }
运行结果(共召唤三次神龙):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 Connected to the target VM, address: '127.0.0.1:50305' , transport: 'socket' 1 收到4 收到9 收到5 收到8 收到7 收到6 收到18 收到2 收到11 收到 ======召唤神龙10 收到12 收到6 结束9 结束16 收到3 收到14 收到7 结束 ======召唤神龙19 收到13 收到14 结束20 收到11 结束18 结束2 结束10 结束12 结束16 结束21 收到5 结束8 结束1 结束17 收到4 结束15 收到 ======召唤神龙15 结束3 结束21 结束17 结束20 结束13 结束19 结束 Disconnected from the target VM, address: '127.0.0.1:50305' , transport: 'socket'