publicstaticvoidmain(String[] args){ CountDownLatch latch = new CountDownLatch(2); // 创建倒计时闩并指定倒计时次数为2 Worker w1 = new Worker("xxx", getRandomWorkDuration(MIN_WORK_DURATION, MAX_WORK_DURATION)); Worker w2 = new Worker("王大锤", getRandomWorkDuration(MIN_WORK_DURATION, MAX_WORK_DURATION));
new Thread(new WorkerTestThread(w1, latch)).start(); new Thread(new WorkerTestThread(w2, latch)).start();
try { latch.await(); // 等待倒计时闩减到0 System.out.println("All jobs have been finished!"); } catch (InterruptedException e) { e.printStackTrace(); } } }
CountDownLatch源码学习
内部数据结构
1 2 3 4 5 6 7 8 9 10 11
publicclassCountDownLatch{
privatefinal Sync sync;
/** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{} }
由类的实现可知,CountDownLatch 持有一个 Sync 对象(Sync 继承自 AbstractQueuedSynchronizer),所有的同步机制都由它提供。
构造函数
1 2 3 4
/**
 * Constructs a latch whose {@code Sync} is created with the given count.
 *
 * @param count the value passed to {@code new Sync(count)}; must not be negative
 * @throws IllegalArgumentException if {@code count} is negative
 */
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
privatevoidsetHeadAndPropagate(Node node, int propagate){ Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
privatevoiddoReleaseShared(){ /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //如果当前节点是SIGNAL意味着,它正在等待一个信号, //或者说,它在等待被唤醒,因此做两件事,1:重置waitStatus标志位,2:重置成功后,唤醒下一个节点。 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } //如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。 //意味着需要将状态向后一个节点传播。 elseif (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
countDown源码
1 2 3
publicvoidcountDown(){ sync.releaseShared(1); }
countDown() 调用了 AQS 的 releaseShared 方法,并传入参数 1。
AQS中releaseShared源码
1 2 3 4 5 6 7
publicfinalbooleanreleaseShared(int arg){ if (tryReleaseShared(arg)) { doReleaseShared(); returntrue; } returnfalse; }
protectedbooleantryReleaseShared(int releases){ // Decrement count; signal when transition to zero for (;;) { //死循环更新state的值,实现state的减1操作,之所以用死循环是为了确保state值的更新成功。 int c = getState(); if (c == 0) returnfalse; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }