前言 ArrayBlockingQueue是一个基于数组且有界的阻塞队列
。此队列按 FIFO(先进先出)原则对元素(元素不允许为null)进行排序。此队列一经创建,其容量不能再改变。
源码解析 内部数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class ArrayBlockingQueue <E > extends AbstractQueue <E > implements BlockingQueue <E >, java .io .Serializable { final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; }
从ArrayBlockingQueue内部数据结构中,可以看出:该并发集合其底层是使用了 ReentrantLock 和Condition来完成并发控制的。内部是基于数组。
构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public ArrayBlockingQueue (int capacity) { this (capacity, false ); } public ArrayBlockingQueue (int capacity, boolean fair) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue (int capacity, boolean fair, Collection<? extends E> c) { this (capacity, fair); final ReentrantLock lock = this .lock; lock.lock(); try { int i = 0 ; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
从源码中可知,ArrayBlockingQueue类有三个构造函数,其中,都是基于 ArrayBlockingQueue(int capacity, boolean fair)
构造函数的。该构造函数主要是初始化了内部数组,锁及Condition两个对象。
元素添加源码 add源码 1 2 3 4 public boolean add (E e) { return super .add(e); }
由源码可知,add方法内部调用了父类AbstractQueue的add方法,父类AbstractQueue的add方法为一个模板方法,最终实现元素添加是在offer方法中。
AbstractQueue的add源码 1 2 3 4 5 6 public boolean add (E e) { if (offer(e)) return true ; else throw new IllegalStateException("Queue full" ); }
由源码可知,在添加元素成功后,返回true,否则,抛出异常。
offer方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public boolean offer (E e) { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lock(); try { if (count == items.length) return false ; else { enqueue(e); return true ; } } finally { lock.unlock(); } }
代码逻辑:
元素为空判断。为空,抛出异常;否则,执行第2步
获取可重入锁对象。获取锁失败,等待;获取成功,执行第3步
如果队列已满,返回FALSE,否则,调用enqueue方法进行入队操作,并返回true
释放锁
enqueue源码 1 2 3 4 5 6 7 8 9 10 private void enqueue (E x) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); }
代码逻辑:
获取当前数组
将元素添加进数组中
增加索引putIndex
,如果队列满,则重置putIndex=0
增加count
值
唤醒通知
put源码 1 2 3 4 5 6 7 8 9 10 11 12 13 public void put (E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
代码逻辑:
元素为空判断。为空,抛出异常;否则,执行第2步
获取可重入可中断锁对象。获取锁失败,等待;获取成功,执行第3步
while循环判断队列是否已满。已满,等待;不满,调用enqueue方法进行入队操作
释放锁
备注:元素入队有add与put两种方法。二者的不同之处在于:add方法在添加元素时,如果队列已满,则抛出异常;put方法在添加元素时,如果队列已满,则是无限等待。
超时offer源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public boolean offer (E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0 ) return false ; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true ; } finally { lock.unlock(); } }
该方法与重载方法 offer(E e)
的不同之处在于:该方法提供了超时等待机制,在等待超时时,返回FALSE。
模拟put操作
队列初始态
插入元素10
获取元素 poll源码 1 2 3 4 5 6 7 8 9 10 public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { return (count == 0 ) ? null : dequeue(); } finally { lock.unlock(); } }
由源码可知,在出队列时,会首先获取锁,在获取锁成功后,如果队列为空,返回null;否则,调用dequeue方法进行出队操作。
dequeue源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private E dequeue () { final Object[] items = this .items; @SuppressWarnings ("unchecked" ) E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; }
出队dequeue方法与入队enqueue的逻辑大致一致。在出队时,首先获取索引takeIndex处的元素并将takeIndex处的值置为空,然后修改takeIndex的值,并进行通知。
可超时poll源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public E poll (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) { if (nanos <= 0 ) return null ; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
该方法提供了一种超时等待机制,在队列为空的情况下,如果等待超时,则直接返回null。
take源码 1 2 3 4 5 6 7 8 9 10 11 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
该方法与poll方法不同之处在于:在队列没有元素的情况下,poll方法直接返回null;而take方法则阻塞,直到队列有元素为止。
peek源码 1 2 3 4 5 6 7 8 9 10 11 12 13 public E peek () { final ReentrantLock lock = this .lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } } final E itemAt (int i) { return (E) items[i]; }
方法很简单,不在详述。
模拟take操作
首次take操作
remove源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public boolean remove (Object o) { if (o == null ) return false ; final Object[] items = this .items; final ReentrantLock lock = this .lock; lock.lock(); try { if (count > 0 ) { final int putIndex = this .putIndex; int i = takeIndex; do { if (o.equals(items[i])) { removeAt(i); return true ; } if (++i == items.length) i = 0 ; } while (i != putIndex); } return false ; } finally { lock.unlock(); } }
代码逻辑:
待移除元素为空判断,为空,返回FALSE
获取锁
队列不为空,执行第4步;为空,返回FALSE
从索引 takeIndex
处遍历内部数组,判断与待移除元素是否相等,相等,调用removeAt方法移除指定位置处的元素。否则,返回FALSE
removeAt源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 void removeAt (final int removeIndex) { final Object[] items = this .items; if (removeIndex == takeIndex) { items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); } else { final int putIndex = this .putIndex; for (int i = removeIndex;;) { int next = i + 1 ; if (next == items.length) next = 0 ; if (next != putIndex) { items[i] = items[next]; i = next; } else { items[i] = null ; this .putIndex = i; break ; } } count--; if (itrs != null ) itrs.removedAt(removeIndex); } notFull.signal(); }
代码逻辑:
获取原数组
如果待删除元素索引 removeIndex
等于 takeIndex
,则直接将takeIndex
的元素置为空,并修改takeIndex
值即可。否则,执行第3步
从待删除元素索引 removeIndex
开始,将其后的元素逐一向前移动,并把最末尾的元素置为空。
唤醒通知。
模拟remove操作
<a data-flickr-embed="true" href="https://www.flickr.com/photos/157389715@N05/37567430565/in/dateposted-public/" title="未命名文件"><img src="https://farm5.staticflickr.com/4523/37567430565_f271740798_z.jpg" width="524" height="395" alt="未命名文件"></a>
调用remove方法,移除元素23
首次for循环
二次for循环
三次for循环
至此,remove操作完成。
contains源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public boolean contains (Object o) { if (o == null ) return false ; final Object[] items = this .items; final ReentrantLock lock = this .lock; lock.lock(); try { if (count > 0 ) { final int putIndex = this .putIndex; int i = takeIndex; do { if (o.equals(items[i])) return true ; if (++i == items.length) i = 0 ; } while (i != putIndex); } return false ; } finally { lock.unlock(); } }
很简单,遍历数组,逐个比较。
参考文章 BlockingQueue之ArrayBlockingQueue
ArrayBlockingQueue源码分析