//是否需要将堆进行有序化 boolean heapify = true; // true if not known to be in heap order //扫描null 值,保证队列中不会有null 元素 boolean screen = true; // true if must screen for nulls //如果必须屏蔽空值,返回true
//如果已经是本身类结构,那么也无需再次堆有序化 if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; }
Object[] a = c.toArray(); //获取集合的数组形式 int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. //拷贝元素 if (a.getClass() != Object[].class) //如果c.toArray不是Object数组类型,则进行转换 a = Arrays.copyOf(a, n, Object[].class);
//扫描集合,不允许出现null if (screen && (n == 1 || this.comparator != null)) { //如果需要屏蔽空值,则遍历判断是否有空值,有,抛出异常 for (int i = 0; i < n; ++i) if (a[i] == null) thrownew NullPointerException(); }
//设置全局queue与队列大小 this.queue = a; this.size = n;
//如果不知道在堆的顺序 if (heapify) heapify(); //堆有序化 }
PriorityBlockingQueue提供了四个构造函数,但都是基于PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator)的,该构造函数主要是初始化一些内部属性。
源码解析
heapify源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
//将这个数组进行 堆化 privatevoidheapify(){ Object[] array = queue; int n = size; int half = (n >>> 1) - 1; //非叶子节点并且编号最大的节点 Comparator<? super E> cmp = comparator; // 获取 比较器, 若这里的 comparator是空, 则用元素自己实现的比较接口进行比较 if (cmp == null) { for (int i = half; i >= 0; i--) siftDownComparable(i, (E) array[i], array, n); } else { for (int i = half; i >= 0; i--) siftDownUsingComparator(i, (E) array[i], array, n, cmp); } }
//只有在持有锁的情况下才被调用。扩容 privatevoidtryGrow(Object[] array, int oldCap){ lock.unlock(); //释放锁 must release and then re-acquire main lock Object[] newArray = null;
//只有在持有锁的情况下才被调用。总体逻辑为取出堆顶元素后,将堆最后一个元素放到堆顶位置,然后通过下沉操作,将最小元素重新放在堆顶 private E dequeue(){ int n = size - 1; if (n < 0) //队列为空,返回null returnnull; else { Object[] array = queue; //堆顶元素 E result = (E) array[0];
// 在位置k上插入元素x。通过下沉元素x直到它小于或者等于它的子节点或者是叶子节点来维持堆一致性 privatestatic <T> voidsiftDownComparable(int k, T x, Object[] array, int n){ if (n > 0) { //元素自身默认比较器 Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf while (k < half) { // 2*k+1 表示的k的左孩子的位置 int child = (k << 1) + 1; // assume left child is least //获取左叶子节点元素 Object c = array[child];
//获取k的右孩子的位置 int right = child + 1;
//取左右孩子中元素值较小的值(这里的较小,是通过比较器来定义的较小) if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right];
//x 比左右孩子都小,那么不用继续下沉了 if (key.compareTo((T) c) <= 0) break;
//同siftDownComparable方法 privatestatic <T> voidsiftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp){ if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } }
下沉流程
take源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14
//会阻塞 public E take()throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { //如果队列为空,则阻塞在notEmpty条件上 while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; }
该方法与poll方法不同之处在于:当队列为空的时候,出队操作会阻塞。
超时poll源码
1 2 3 4 5 6 7 8 9 10 11 12 13
public E poll(long timeout, TimeUnit unit)throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //可中断获取锁 E result; try { while ( (result = dequeue()) == null && nanos > 0) nanos = notEmpty.awaitNanos(nanos); } finally { lock.unlock(); } return result; }
与take方法不同之处在于:提供了一种超时等待机制,不会无限等待下去。
peek源码
1 2 3 4 5 6 7 8 9 10
//返回队头元素,但是元素并不出队 public E peek(){ final ReentrantLock lock = this.lock; lock.lock(); try { return (size == 0) ? null : (E) queue[0]; } finally { lock.unlock(); } }
该方法也是获取队列头元素,不同之处是不会有出队逻辑。
remove源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14
publicbooleanremove(Object o){ final ReentrantLock lock = this.lock; lock.lock(); try { //获取元素o在队列的位置 int i = indexOf(o); if (i == -1) returnfalse; removeAt(i); returntrue; } finally { lock.unlock(); } }