publicclassLinkedBlockingQueue<E> extendsAbstractQueue<E> implementsBlockingQueue<E>, java.io.Serializable{ //链接列表节点类。所有的元素都通过Node这个静态内部类来进行存储,这与LinkedList的处理方式完全一样 staticclassNode<E> { //使用item来保存元素本身 E item;
/** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ //保存当前节点的后继节点 Node<E> next;
publicLinkedBlockingQueue(int capacity){ if (capacity <= 0) thrownew IllegalArgumentException(); //参数校验 this.capacity = capacity; //初始化链表大小 last = head = new Node<E>(null); //初始化链表头尾节点,指向一个dummy节点 }
publicLinkedBlockingQueue(Collection<? extends E> c){ this(Integer.MAX_VALUE); //调用重载构造函数,进行基础属性的初始化 final ReentrantLock putLock = this.putLock; //获取锁 putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) thrownew NullPointerException(); if (n == capacity) thrownew IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
publicvoidput(E e)throws InterruptedException { if (e == null) thrownew NullPointerException(); //待插入元素为空判断,为空, 抛异常 // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); //将待插入元素组装成节点 final ReentrantLock putLock = this.putLock; //获取锁 final AtomicInteger count = this.count; //获取元素个数 /* 执行可中断的锁获取操作,即意味着如果线程由于获取 锁而处于Blocked状态时,线程是可以被中断而不再继 续等待,这也是一种避免死锁的一种方式,不会因为 发现到死锁之后而由于无法中断线程最终只能重启应用。 */ putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ //如果链表已满,等待。使用while判断依旧是为了放置线程被"伪唤醒”而出现的情况,即当线程被唤醒时而队列的大小依旧等于capacity时,线程应该继续等待。 while (count.get() == capacity) { notFull.await(); } enqueue(node); //入链表 c = count.getAndIncrement(); //获取链表当前数量 if (c + 1 < capacity) //链表没有满,唤醒通知。c+1得到的结果是新元素入队列之后队列元素的总和。 notFull.signal(); } finally { putLock.unlock(); } /* 当c=0时,即意味着之前的队列是空队列,出队列的线程都处于等待状态, 现在新添加了一个新的元素,即队列不再为空,因此它会唤醒正在等待获取元素的线程。 */ if (c == 0) //c == 0,表明是第一次往链表中添加数据 signalNotEmpty(); }
public E poll(){ final AtomicInteger count = this.count; if (count.get() == 0) returnnull; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); //获取锁 try { if (count.get() > 0) { //链表不为空,从链表中取出元素 x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
poll源码很简单,在队列为空时,直接返回null,在不为空时,获取头节点元素并返回。
remove源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
publicbooleanremove(Object o){ if (o == null) returnfalse; //如果待移除元素为null,直接返回FALSE fullyLock(); //获取putLock和takeLock两个锁 try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //从头结点开始比那里链表 if (o.equals(p.item)) { //如果遍历到待删除元素,则调用unlink方法,去除指定结点 unlink(p, trail); returntrue; } } returnfalse; } finally { fullyUnlock(); } }
voidunlink(Node<E> p, Node<E> trail){ // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next; if (last == p) last = trail; if (count.getAndDecrement() == capacity) notFull.signal(); }
该方法的功能是将节点P从前继节点trail中移除。即将trail的next节点指向P的next节点。从代码里的注释: p.next is not changed, to allow iterators that are traversing p to maintain their weak-consistency guarantee. 可知,这种移除是一种弱移除。
contains源码
1 2 3 4 5 6 7 8 9 10 11 12
publicbooleancontains(Object o){ if (o == null) returnfalse; fullyLock(); try { for (Node<E> p = head.next; p != null; p = p.next) //从头结点开始遍历链表,判断元素o是否在链表中 if (o.equals(p.item)) returntrue; returnfalse; } finally { fullyUnlock(); } }