SynchronousQueue没有容量(A synchronous queue does not have any internal capacity, not even a capacity of one.)。所以其内部方法:isEmpty,size,clear,contains,remove都是默认实现。
// 最大空旋时间 staticfinalint maxTimedSpins = (NCPUS < 2) ? 0 : 32; // 无限时的等待的最大空旋时间 staticfinalint maxUntimedSpins = maxTimedSpins * 16; // 超时空旋等待阈值 staticfinallong spinForTimeoutThreshold = 1000L; privatetransientvolatile Transferer<E> transferer; //非公平策略实现类 staticfinalclassTransferStack<E> extendsTransferer<E> { // 表示消费数据的消费者 staticfinalint REQUEST = 0; // 表示生产数据的生产者 staticfinalint DATA = 1; //表示该操作节点处于真正匹配状态 staticfinalint FULFILLING = 2; //内部节点类 staticfinalclassSNode{} } //公平策略实现类 staticfinalclassTransferQueue<E> extendsTransferer<E> { //队列头 transientvolatile QNode head; //队列尾 transientvolatile QNode tail; //当要删除的节点为队列中最后一个元素时,会引用到这个节点 transientvolatile QNode cleanMe; //内部节点类 staticfinalclassQNode{} } //内部抽象类,其公平/非公平策略都是基于该类 abstractstaticclassTransferer<E> { /** * Performs a put or take. * * @param e if non-null, the item to be handed to a consumer; * if null, requests that transfer return an item * offered by producer. * @param timed if this operation should timeout * @param nanos the timeout, in nanoseconds * @return if non-null, the item provided or received; if null, * the operation failed due to timeout or interrupt -- * the caller can distinguish which of these occurred * by checking Thread.interrupted. */ // 转移数据,put或者take操作 abstract E transfer(E e, boolean timed, long nanos); } }
publicbooleanoffer(E e){ if (e == null) thrownew NullPointerException(); return transferer.transfer(e, true, 0) != null; }
//超时offer源码 publicbooleanoffer(E e, long timeout, TimeUnit unit) throws InterruptedException { //元素e为空,抛异常 if (e == null) thrownew NullPointerException(); //进行转移操作 if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) returntrue; if (!Thread.interrupted()) // 当前线程没有被中断 returnfalse; thrownew InterruptedException(); }
offer的源码实现与put的源码实现很类似。不在分析。
take源码
1 2 3 4 5 6 7 8
// 获取并移除此队列的头,如有必要则等待另一个线程插入它 public E take()throws InterruptedException { E e = transferer.transfer(null, false, 0); // 进行转移操作 if (e != null) return e; Thread.interrupted(); thrownew InterruptedException(); }
在使用 take 从阻塞队列中获取元素时,也是通过Transfer对象去处理。在获取的元素不为null时,直接返回;否则,中断并抛出异常。
poll源码
1 2 3 4 5 6 7 8 9 10 11 12 13
// 如果另一个线程当前正要使用某个元素,则获取并移除此队列的头 public E poll(){ return transferer.transfer(null, true, 0); }
//超时poll源码 // 获取并移除此队列的头,如有必要则等待指定的时间,以便另一个线程插入它 public E poll(long timeout, TimeUnit unit)throws InterruptedException { E e = transferer.transfer(null, true, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) // 元素不为null或者当前线程没有被中断 return e; thrownew InterruptedException(); }
//单元节点类 staticfinalclassQNode{ volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark finalboolean isData; } }
/** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos){ /* Basic algorithm is to loop trying to take either of * two actions: * * 1. If queue apparently empty or holding same-mode nodes, * try to add node to queue of waiters, wait to be * fulfilled (or cancelled) and return matching item. * * 2. If queue apparently contains waiting items, and this * call is of complementary mode, try to fulfill by CAS'ing * item field of waiting node and dequeuing it, and then * returning matching item. * * In each case, along the way, check for and try to help * advance head and tail on behalf of other stalled/slow * threads. * * The loop starts off with a null check guarding against * seeing uninitialized head or tail values. This never * happens in current SynchronousQueue, but could if * callers held non-volatile/final ref to the * transferer. The check is here anyway because it places * null checks at top of loop, which is usually faster * than having them implicitly interspersed. */
QNode s = null; // constructed/reused as needed // 确定此次转移的类型(put or take) boolean isData = (e != null);
for (;;) { QNode t = tail; // 获取尾结点 QNode h = head; //获取头结点
//初始化的队头和队尾都是指向的一个"空" 节点,不会是null if (t == null || h == null) // 看到未初始化的头尾结点 // saw uninitialized value continue; // spin
if (h == t || t.isData == isData) { //入队操作。 队列为空 或者 尾结点的模式与当前结点模式相同,即上次和本次是同样的操作:入队或者出队。每一个入队和出队是互相匹配的 // empty or same-mode // 获取尾结点的next域 QNode tn = t.next;
//用于移除已经被取消的结点。pred 是s的前驱 voidclean(QNode pred, QNode s){ s.waiter = null; // forget thread /* * At any given time, exactly one node on list cannot be * deleted -- the last inserted node. To accommodate this, * if we cannot delete s, we save its predecessor as * "cleanMe", deleting the previously saved version * first. At least one of node s or the node previously * saved can always be deleted, so this always terminates. */ while (pred.next == s) { // pred的next域为s // Return early if already unlinked QNode h = head; // 获取头结点 // 获取头结点的next域 QNode hn = h.next; // Absorb cancelled first node as head
//有需要删除的节点 if (dp != null) { // dp不为null,断开前面被取消的结点 // Try unlinking previous cancelled node QNode d = dp.next; QNode dn; /** * cleamMe 失效的情况有: * (1)cleanMe的后继而空(cleanMe 标记的是需要删除节点的前驱) * (2)cleanMe的后继等于自身(这个前面有分析过) * (3)需要删除节点的操作没有被取消 * (4)被删除的节点不是尾节点且其后继节点有效,并将待删除节点删除 */ if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced
//清除cleanMe节点 casCleanMe(dp, null);
//dp==pred 表示已经被设置过了 if (dp == pred) return; // s is already saved node } elseif (casCleanMe(null, pred)) // 原来的 cleanMe 是 null, 则将 pred 标记为 cleamMe 为下次 清除 s 节点做标识 return; // Postpone cleaning s } }
E transfer(E e, boolean timed, long nanos){ /* * Basic algorithm is to loop trying one of three actions: * * 1. If apparently empty or already containing nodes of same * mode, try to push node on stack and wait for a match, * returning it, or null if cancelled. * * 2. If apparently containing node of complementary mode, * try to push a fulfilling node on to stack, match * with corresponding waiting node, pop both from * stack, and return matched item. The matching or * unlinking might not actually be necessary because of * other threads performing action 3: * * 3. If top of stack already holds another fulfilling node, * help it out by doing its match and/or pop * operations, and then continue. The code for helping * is essentially the same as for fulfilling, except * that it doesn't return the item. */
SNode s = null; // constructed/reused as needed // 根据e确定此次转移的模式(是put or take) int mode = (e == null) ? REQUEST : DATA;
for (;;) { //无限循环 SNode h = head; //保存头结点
//相同的操作模式 if (h == null || h.mode == mode) { // 头结点为null或者头结点的模式与此次转移的模式相同 empty or same-mode
if (timed && nanos <= 0) { // 设置了timed并且等待时间小于等于0,表示不能等待,需要立即操作 can't wait if (h != null && h.isCancelled()) // 头结点不为null并且头结点被取消 casHead(h, h.next); // 重新设置头结点(弹出之前的头结点) pop cancelled node else// 头结点为null或者头结点没有被取消 returnnull; } elseif (casHead(h, s = snode(s, e, h, mode))) { // 生成一个SNode结点;将原来的head头结点设置为该结点的next结点;将head头结点设置为该结点。入队 //等待匹配操作 SNode m = awaitFulfill(s, timed, nanos); // 空旋或者阻塞直到s结点被FulFill操作所匹配 if (m == s) { // 匹配的结点为s结点(s结点被取消) wait was cancelled clean(s); returnnull; }
//s 还没有离开栈,帮助其离开 if ((h = head) != null && h.next == s) // h重新赋值为head头结点,并且不为null;头结点的next域为s结点,表示有结点插入到s结点之前,完成了匹配 casHead(h, s.next); // 比较并替换head域(移除插入在s之前的结点和s结点) // help s's fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); // 根据此次转移的类型返回元素 } } elseif (!isFulfilling(h.mode)) { // 不同的模式,并且没有处于正在匹配状态,则进行匹配 // try to fulfill //节点取消,更新head if (h.isCancelled()) // already cancelled // 比较并替换head域(弹出头结点) casHead(h, h.next); // pop and retry
elseif (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { //入队一个新节点,并且处于匹配状态(表示h正在匹配) for (;;) { // loop until matched or waiters disappear // s.next 是真正的操作节点 SNode m = s.next; // m is s's match
// next域为null if (m == null) { // all waiters are gone // 比较并替换head域 casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop }
// m结点的next域 SNode mn = m.next; if (m.tryMatch(s)) { // 尝试匹配,并且成功 // 比较并替换head域(弹出s结点和m结点) casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); // 根据此次转移的类型返回元素 } else// lost match // 没有匹配成功,说明有其他线程已经匹配了,把m移出 s.casNext(m, mn); // help unlink } } } else { // 头结点是真正匹配的状态,那么就帮助它匹配 // help a fulfiller // 保存头结点的next域 SNode m = h.next; // m is h's match if (m == null) // waiter is gone // 比较并替换head域(m被其他结点匹配了,需要弹出h) casHead(h, null); // pop fulfilling node else { // 获取m结点的next域 SNode mn = m.next; if (m.tryMatch(h)) // 帮助匹配 / help match // 比较并替换head域(弹出h和m结点) casHead(h, mn); // pop both h and m else// 匹配不成功 // lost match // 比较并替换next域(移除m结点) h.casNext(m, mn); // help unlink } } } }
SNode awaitFulfill(SNode s, boolean timed, long nanos){ /* * When a node/thread is about to block, it sets its waiter * field and then rechecks state at least one more time * before actually parking, thus covering race vs * fulfiller noticing that waiter is non-null so should be * woken. * * When invoked by nodes that appear at the point of call * to be at the head of the stack, calls to park are * preceded by spins to avoid blocking when producers and * consumers are arriving very close in time. This can * happen enough to bother only on multiprocessors. * * The order of checks for returning out of main loop * reflects fact that interrupts have precedence over * normal returns, which have precedence over * timeouts. (So, on timeout, one last check for match is * done before giving up.) Except that calls from untimed * SynchronousQueue.{poll/offer} don't check interrupts * and don't wait at all, so are trapped in transfer * method rather than calling awaitFulfill. */ // 根据timed标识计算截止时间 finallong deadline = timed ? System.nanoTime() + nanos : 0L; // 获取当前线程 Thread w = Thread.currentThread(); // 根据s确定空旋等待的时间 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 无限循环,确保操作成功 if (w.isInterrupted()) // 当前线程被中断 // 取消s结点,原理是将节点s的match域设置为this s.tryCancel(); // 获取s结点的match域 SNode m = s.match; if (m != null) // m不为null,存在匹配结点 return m; if (timed) { // 确定继续等待的时间 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { // 继续等待的时间小于等于0,等待超时 s.tryCancel(); // 取消s结点 continue; } } if (spins > 0) // 空旋等待的时间大于0 spins = shouldSpin(s) ? (spins-1) : 0; // 确定是否还需要继续空旋等待 elseif (s.waiter == null) // 等待线程为null // 设置waiter线程为当前线程 s.waiter = w; // establish waiter so can park next iter elseif (!timed) // 禁用当前线程并设置了阻塞者 LockSupport.park(this); elseif (nanos > spinForTimeoutThreshold) // 继续等待的时间大于阈值 LockSupport.parkNanos(this, nanos); // 禁用当前线程,最多等待指定的等待时间,除非许可可用 } }
/* * At worst we may need to traverse entire stack to unlink * s. If there are multiple concurrent calls to clean, we * might not see s if another thread has already removed * it. But we can stop when we see any node known to * follow s. We use s.next unless it too is cancelled, in * which case we try the node one past. We don't check any * further because we don't want to doubly traverse just to * find sentinel. */ //被删除节点的后继 SNode past = s.next;
//后继节点操作被取消,直接移除该节点 if (past != null && past.isCancelled()) // next域不为null并且next域被取消 past = past.next; // 重新设置past
// Absorb cancelled nodes at head SNode p; //如果栈顶是取消了的操作节点,则移除 while ((p = head) != null && p != past && p.isCancelled()) // 从栈顶头结点开始到past结点(不包括),将连续的取消结点移除 casHead(p, p.next);
// Unsplice embedded nodes //因为是单向链表,因此需要从head 开始,遍历到被删除节点的后继 while (p != null && p != past) { // 移除上一步骤没有移除的非连续的取消结点 SNode n = p.next; // 获取p的next域 if (n != null && n.isCancelled()) // n不为null并且n被取消 p.casNext(n, n.next); // 比较并替换next域 else p = n; } }
booleantryMatch(SNode s){ if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // waiters need at most one unpark waiter = null; LockSupport.unpark(w); } returntrue; } return match == s; }
如果match不为空,则说明当前操作已经被匹配或者取消了,则直接判断 match == s 是否为true;否则CAS将当前节点的match值设置为节点s,如果成功,则通过LockSupport.unpark(w); 唤起线程。