前言 DelayQueue是一个支持延迟获取元素的无界阻塞队列。 队列内部使用priorityQueue实现相应操作。存储的元素必须要继承 Delayed接口
。PriorityQueue 和PriorityBlockingQueue队列一样,都是一种优先级的队列,内部实现原理也是使用的二叉堆。
数据结构 1 2 3 4 5 6 7 8 9 10 11 public class DelayQueue <E extends Delayed > extends AbstractQueue <E > implements BlockingQueue <E > { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); private Thread leader = null ; private final Condition available = lock.newCondition(); }
由DelayQueue类的内部数据结构可知,其内部存储的元素要实现 Delayed接口
。在DelayQueue类中,还维持了一个 PriorityQueue类
的对象,用于实现DelayQueue队列的相应操作。
从Delayed接口
的数据结构可以看出,它继承了 Comparable接口
,这为优先级队列提供了一种排序机制。Delayed接口
内部还有一个 getDelay()
方法,用于返回剩余的延迟时间:零值或负值表示这个延迟时间已经过去了
。DelayQueue队列就是根据这个条件来控制延时获取元素的。
Delayed接口
1 2 3 4 5 6 7 8 9 10 11 12 public interface Delayed extends Comparable <Delayed > { long getDelay (TimeUnit unit) ; }
构造函数 1 2 3 4 5 public DelayQueue () {}public DelayQueue (Collection<? extends E> c) { this .addAll(c); }
DelayQueue 内部组合PriorityQueue,对元素的操作都是通过PriorityQueue 来实现的,DelayQueue 的构造方法很简单,对于PriorityQueue 都是使用的默认参数,不能通过DelayQueue 来指定PriorityQueue的初始大小,也不能使用指定的Comparator,元素本身就需要实现Comparable ,因此不需要指定的Comparator。
源码解析 入队 add源码 1 2 3 public boolean add (E e) { return offer(e); }
offer源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public boolean offer (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null ; available.signal(); } return true ; } finally { lock.unlock(); } }
代码逻辑:
获取锁。成功,执行第2步;否则,循环等待
通过PriorityQueue对象将元素入队。
在入队成功后,如果队列头元素与被插入元素相同,则available唤醒,并将leader置空
释放锁
put源码 1 2 3 public void put (E e) { offer(e); }
超时offer源码 1 2 3 public boolean offer (E e, long timeout, TimeUnit unit) { return offer(e); }
与PriorityBlockingQueue队列一样,由于是无界队列,所以没有元素满的情况,但实际当到达队列最大值后,就抛oom异常。所以put和超时offer方法都不会阻塞或者返回FALSE。
出队 poll源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0 ) return null ; else return q.poll(); } finally { lock.unlock(); } }
代码逻辑:
获取锁。成功,执行第2步;否则,循环获取锁
获取队列头元素。
如果队列为空或者延时还没到,则返回null;否则,执行第4步
元素出队
释放锁
take源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null ) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0 ) return q.poll(); first = null ; if (leader != null ) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null ; } } } } } finally { if (leader == null && q.peek() != null ) available.signal(); lock.unlock(); } }
代码逻辑:
获取可中断锁
进入无限循环。
获取队列头元素
如果头元素为null,available
等待;否则执行第下步
获取头元素的延迟时间。如果延迟时间到期,返回头元素;否则,执行第下步
如果有其它线程在等待获取元素,则当前线程不用去竞争,直接等待;否则,执行第下步
获取当前线程,并设置leader
,然后等待
如果没有线程在等待获取元素并且队列头元素不为null,则 available
唤醒,并释放锁
出队方法 take()
与 poll()
方法不同之处在于:在队列为空或者延迟未到,poll()
是直接返回null;而 take()
则是阻塞等待。
超时poll 源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public E poll (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null ) { if (nanos <= 0 ) return null ; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0 ) return q.poll(); if (nanos <= 0 ) return null ; first = null ; if (nanos < delay || leader != null ) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null ; } } } } } finally { if (leader == null && q.peek() != null ) available.signal(); lock.unlock(); } }
超时 poll()
方法的大致逻辑与 take()
方法一致。只是有细微区别。
DelayQueue队列的其他方法都是直接使用PriorityQueue来进行操作的。没什么好说的了
参考文章 Java 并发 — 阻塞队列之DelayQueue源码分析
java 之DelayQueue实际运用示例