DelayQueue源码学习

前言

DelayQueue是一个支持延迟获取元素的无界阻塞队列。队列内部使用priorityQueue实现相应操作。存储的元素必须要继承 Delayed接口。PriorityQueue 和PriorityBlockingQueue队列一样,都是一种优先级的队列,内部实现原理也是使用的二叉堆。

数据结构

1
2
3
4
5
6
7
8
9
10
11
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
//可重入锁
private final transient ReentrantLock lock = new ReentrantLock();
//存储元素的优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
//等待队列头部元素的指定线程
private Thread leader = null;
//条件控制,表示是否可以从队列中取数据
private final Condition available = lock.newCondition();
}

由DelayQueue类的内部数据结构可知,其内部存储的元素要实现 Delayed接口。在DelayQueue类中,还维持了一个 PriorityQueue类 的对象,用于实现DelayQueue队列的相应操作。

Delayed接口的数据结构可以看出,它继承了 Comparable接口,这为优先级队列提供了一种排序机制。Delayed接口内部还有一个 getDelay() 方法,用于返回剩余的延迟时间:零值或负值表示这个延迟时间已经过去了。DelayQueue队列就是根据这个条件来控制延时获取元素的。

Delayed接口

Read More

PriorityBlockingQueue源码学习

前言

PropertyBlockingQueue 是基于数组实现的线程安全的无界队列,它是基于优先级,而不是FIFO队列。默认情况下元素采取自然顺序升序排列,也可以自定义类实现compareTo()方法来指定元素排序规则,需要注意的是不能保证同优先级元素的顺序。虽说是无界的,但有可能会导致OutOfMemoryError异常。

存储结构

image

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

//队列默认大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//队列最大容量,超过该容量抛OutOfMemoryError异常
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//优先级队列表示为平衡二叉堆。queue[0]为元素最小值。PriorityBlockingQueue 底层还是通过数组来实现的
private transient Object[] queue;
//队列元素个数 不序列化
private transient int size;
//比较器。在为null时使用元素的自然排序
private transient Comparator<? super E> comparator;
//用于队列操作的锁
private final ReentrantLock lock;
//队列非空条件
private final Condition notEmpty;
//队列扩容的 "锁" 不序列化
private transient volatile int allocationSpinLock;
//队列扩容的 "锁" 不序列化
private transient volatile int allocationSpinLock;
}

PriorityBlockingQueue 与ArrayBlockingQueue,LinkedBlockingQueue阻塞队列不同,它的内部只维持了一个notEmpty条件,没有notFull 条件,也就是说PriorityBlockingQueue 没有队满的概念。当队列满的时候,进行扩容,当达到最大容量时,会抛出OutOfMemoryError异常。与ArrayBlockingQueue一致的是都是维持了一个锁来控制出入队行为,且底层也是通过数组实现。而LinkedBlockingQueue是通过两个锁来进行控制。

构造函数

Read More

LinkedBlockingQueue源码学习

前言

在上篇文章 ArrayBlockingQueue源码学习 中,已对底层基于数组的ArrayBlockingQueue阻塞队列进行了解析。本文将讲解阻塞队列的另外一个重要成员:LinkedBlockingQueueLinkedBlockingQueue的底层是基于链表的阻塞队列。

它既可以充当有界队列,也可以充当无界队列。在手动设置了队列的大小时,它即为有界队列;在没有设置队列大小时,默认大小为Integer.MAX_VALUE,可以冲当成无界队列。

固定大小线程池底层所使用的阻塞队列就是LinkedBlockingQueue队列。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

源码解析

Read More

ArrayBlockingQueue源码学习

前言

ArrayBlockingQueue是一个
基于数组且有界的阻塞队列 。此队列按 FIFO(先进先出)原则对元素(元素不允许为null)进行排序。此队列一经创建,其容量不能再改变。

源码解析

内部数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The queued items */
//内部数据结构
final Object[] items;

/** items index for next take, poll, peek or remove */
//下次从数组中取元素的索引
int takeIndex;

/** items index for next put, offer, or add */
//下次往数组中添加元素的索引
int putIndex;

/** Number of elements in the queue */
//队列中的元素数
int count;

/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

}

从ArrayBlockingQueue内部数据结构中,可以看出:该并发集合其底层是使用了 ReentrantLock 和Condition来完成并发控制的。内部是基于数组。

构造函数

Read More

CopyOnWriteArrayList源码学习

前言

CopyOnWriteArrayList是一个线程安全的,读操作无锁,但是写操作有锁 的ArrayList。是 读写分离 思想的体现。从名字中,我们可以推断出, 这个是在 Write 时进行 Copy 数组元素的 ArrayList。是一种 空间换时间 的策略,适合读多写少的场景。

源码解析

内部数据结构
1
2
3
4
5
6
7
8
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {

final transient ReentrantLock lock = new ReentrantLock();
/** The array, accessed only via getArray/setArray. */
//底层数据结构。并被volatile修饰
private transient volatile Object[] array;
}

这两个成员变量是该类的核心,对集合所有的修改操作都需要使用lock加锁,array则是整个集合的数据储存部分,关键在于该array被声明为volatile,保证了内存可见性。

构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//默认创建一个元素数为0的数组,ArrayList则默认为10
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}

public CopyOnWriteArrayList(Collection<? extends E> c) {
Object[] elements;
if (c.getClass() == CopyOnWriteArrayList.class)
elements = ((CopyOnWriteArrayList<?>)c).getArray();
else {
elements = c.toArray();
// c.toArray might (incorrectly) not return Object[] (see 6260652)
if (elements.getClass() != Object[].class)
elements = Arrays.copyOf(elements, elements.length, Object[].class);
}
setArray(elements);
}

public CopyOnWriteArrayList(E[] toCopyIn) {
setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}

CopyOnWriteArrayList类提供了三个构造方法,值得一提的是无参构造方法,在无参构造方法中,会默认创建一个元素数为0的数组,而ArrayList则默认为10。其他两个构造函数会将传进来的集合通过Arrays.copyOf()方法转换成一个Object类型的数组,并用array成员变量存储起来。

add源码

Read More

ConcurrentLinkedQueue源码学习

前言

ConcurerntLinkedQueue
一个 基于链接节点的无界线程安全的FIFO队列。和其他大部分并发集合类似,此队列不允许使用null元素。

ConcurerntLinkedQueue数据结构

源码解析

内部数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {

//内部节点类
private static class Node<E> {
volatile E item; //表示元素
volatile Node<E> next; //表示下一个结点
}

private transient volatile Node<E> head; //头结点
private transient volatile Node<E> tail; //尾节点

//遍历器类
private class Itr implements Iterator<E> {
/**
* Next node to return item for.
*/
private Node<E> nextNode;

/**
* nextItem holds on to item fields because once we claim
* that an element exists in hasNext(), we must return it in
* the following next() call even if it was in the process of
* being removed when hasNext() was called.
*/
private E nextItem;

/**
* Node of the last returned item, to support remove.
*/
private Node<E> lastRet;
}
}

从源码可知,ConcurrentLinkedQueue 继承了AbstractQueue抽象类并实现了QueueSerializable 接口。其内部数据结构有:

Read More

ConcurrentHashMap源码学习

前言

HashMap是我们开发中使用最频繁的集合,但是它是非线程安全的,在涉及到多线程并发的情况时,put操作有可能会引起死循环。

解决方案有HashTableCollections.synchronizedMap(hashMap),它们是线程安全的,它们的实现原理是在所有涉及到多线程操作的都加上了synchronized关键字来锁住整个table,这就意味着所有的线程都在竞争一把锁,在多线程的环境下,它是安全的,但是无疑是效率低下的。相对于粗暴的锁住整个table的做法,ConcurrentHashMap则使用了一种 锁分段技术,实现并发的更新操作。底层采用 Node数组+链表+红黑树 的存储结构。

image

源码解析

Read More

Mysql根据指定数据确定分页

场景

某业务,只知道每页要展示数据大小和指定数据,然后确定指定数据在第几页。

测试表user
id name age update_time create_time
1 hou 32 2017-11-01 12:00:00 2017-11-01 12:00:00
2 hou1 22 2017-11-01 12:00:00 2017-11-11 12:00:00
3 hou2 29 2017-09-01 12:00:00 2017-11-01 12:00:00
4 hou3 24 2017-05-01 12:00:00 2017-11-02 12:00:00
5 hou4 25 2017-12-01 12:00:00 2017-08-01 12:00:00
6 hou5 26 2017-11-01 04:02:00 2017-03-01 12:00:00
7 hou6 12 2017-11-01 12:23:00 2017-10-01 12:00:00
8 hou7 28 2017-10-01 12:00:00 2017-11-01 12:00:00
需求

现在只知道每页要显示3条数据,即pageSize = 3。确认 id = 8 的数据在第几页。

Read More

java并发学习之StampedLock学习

前言

    在 ReentrantReadWriteLock源码学习 中,学习了读写锁的源码实现。但是在 读多写少 的情况下,使用 ReentrantReadWriteLock 可能会使写入线程遭遇饥饿(Starvation)问题,也就是写入线程迟迟无法竞争到锁而一直处于等待状态。StampedLock锁是对ReentrantReadWriteLock锁的一种改进,即StampedLock是一种改进的读写锁

     StampedLock锁有三种模式:写,读,乐观读。StampedLock锁的状态是由版本和模式两个部分组成,锁获取方法返回一个数字作为票据stamp,它用相应的锁状态表示并控制访问。

使用

Read More

ReentrantReadWriteLock源码学习

前言

在学习 ReentrantLock锁 的时候,我们知道,ReentrantLock实现了标准的互斥重入锁,任一时刻只有一个线程能获得锁。而现实编程中,读操作是比较普遍的,写操作则相对很少发生。如果使用互斥锁,那么即使都是读操作,也只有一个线程能获得锁,其他的读都得阻塞。这样显然不利于提供系统的并发量。本文即将进行讲解的 读写锁ReentrantReadWriteLock 就是为了解决这种读多写少情况。在读-写锁的实现加锁策略中,允许多个读操作同时进行,但每次只允许一个写操作。

使用

Read More

Fork me on GitHub