前言
在 ReentrantReadWriteLock源码学习 中,学习了读写锁的源码实现。但是在 读多写少
的情况下,使用 ReentrantReadWriteLock 可能会使写入线程遭遇饥饿(Starvation)问题,也就是写入线程迟迟无法竞争到锁而一直处于等待状态。StampedLock锁是对ReentrantReadWriteLock锁的一种改进,即StampedLock是一种改进的读写锁
。
StampedLock锁有三种模式:写,读,乐观读。StampedLock锁的状态是由版本和模式两个部分组成
,锁获取方法返回一个数字作为票据stamp,它用相应的锁状态表示并控制访问。
使用
1 | public class Point { |
源码解析
内部数据结构
1 | public class StampedLock implements java.io.Serializable { |
由源码可知,StampedLock锁与其他的锁不一样。其他的锁,如:ReentrantLock,都是基于AQS实现的,StampedLock锁并没有实现AQS抽象类。StampedLock锁与AQS一样,内部也有一个state
字段,用来表示锁状态,但是声明为long
型,而非AQS中的int
型。在StampedLock锁中,当处于写模式时stats二进制的第8位为1,读模式时前7位为1-126(附加的readerOverflow用于当读者超过126时)。
构造函数
1 | public StampedLock() { |
在构造StampedLock对象时,会初始化state
的值为:1
2^8
即二进制:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001 0000 0000
读锁获取
readLock源码
1 | //悲观读锁,非独占锁,为获得锁一直处于阻塞状态,直到获得锁为止 |
该方法实现了悲观锁的获取。在排队等待队列为空
且没有写锁同时读锁数小于126
且 CAS修改状态成功
,则状态加1并返回,否则自旋获取读锁。
acquireRead源码
1 | private long acquireRead(boolean interruptible, long deadline) { |
在调用readLock方法时,会首先尝试直接CAS改变state(在whead==wtail和(s & ABITS) < RFULL的情况下
),成功的话直接返回stamp(next)。
竞争失败的情况下进入acquireRead的逻辑。acquireRead内部有两个代码比较多的for循环:
第一个for循环
功能:首先在队列为空且没有写锁的情况下,尝试循环获取读锁,直至有写锁时,把当前线程入队,并使当前线程睡眠。
第二个for循环
功能:在第一个for循环中,把当前线程入队时,有两种情况:(1)队列不为空且尾节点为RMODE模式,则把当前线程添加到尾节点的cowait链中(实际上构成一个读线程stack);(2)情况1不满足的情况下,则把当前线程作为尾节点。在情况2发生时,则会跳出第一个for循环,进入第二个for循环。该循环的功能主要是在当前线程组成的node的前驱节点位head时,继续尝试获取锁;在head不为null时,释放cowait链中的节点;在可能的情况下,使当前线程睡眠。
下面详细描述acquireRead的内部逻辑:
- acquireRead方法调用,然后进入第一个for循环。
- 首先取得whead和wtail两个值,假如这两个值不等说明等待队列不为空,那么获取读锁没希望了,会进入等待队列。
- 如果
whead == wtail
,会进入内部的for循环,在读锁未超过RFULL=126时,尽力尝试通过前7bit上递增state来获取锁;如果超过了,并且在没有写锁的情况下(m < WBIT),超出126的部分最终到了readerOverflow中,加入获取了锁就返回stamp。假如m >= WBIT,也就是说m(state前8位)值大于或等于128,那么说明当前锁已经被写者独占,那么我们尝试自旋+随机的方式来探测状态,并且在当前队列和进入循环前一样(说明还没有其他入队者)或者当前队列中已经有了入队者的情况下内层循环跳出,接着肯定会入队。 - 首先根据尾节点为null的情况探测是否初始化队列,使用一个WMODE模式的节点初始化whead和wtail。(因为只有当前锁被写锁独占,才能跳出上个if语句中的for循环,所以会创建一个WMODE模式的节点初始化队列)
- 然后假如当前节点为空则构建当前节点,模式为RMODE,前驱节点为p即尾节点。
- 接着假如当前队列为空即只有一个节点(whead=wtail)或者当前尾节点的模式不是RMODE,那么我们会尝试在尾节点后面添加该节点作为尾节点,然后跳出外层循环;假如当前队列不为空并且当前尾节点模式就是RMODE,那么我们会尝试下一步:添加该节点到尾节点的cowait链(实际上构成一个stack)中。
当前尾节点模式为RMODE模式时逻辑:
- 通过CAS方法将该节点node添加至尾节点的cowait链中,node成为cowait中的顶元素,cowait构成了一个LIFO队列。
- 成功后进入另一个循环,首先尝试unpark头元素(whead)的cowait中的第一个元素,这只是一种辅助作用,因为头元素whead所伴随的那个线程(假如存在)必定是已经获得锁了,假如是读锁那么也必定会释放cowait链。
- 假如当前节点node所在的根节点p的前驱就是whead或者p已经是whead或者p的前驱为null,那么我们会根据state再次积极的尝试获取锁(当m < WBIT)。
- 否则我们探测当前队列是否稳定:
whead == h && p.prev == pp
,在稳定的情况下,假如发现p成为过head或者p已经被取消(status>0),我们尝试node=null,并且跳出当前循环,回到一开的循环里面去尝试获取锁(这样做的原因是被其他线程闯入夺取了锁,或者p已经被取消)。 - 接着我们判断是否为限时版本,以及限时版本所需时间。
- 然后park当前线程以及可能出现的中断情况下取消当前节点的cancelWaiter操作。
我们再来分析跳出循环的另一种情况:队列中无节点或者尾节点模式为WMODE:这样我们的节点必须直接拼接到尾节点后面。下面为第二个for循环逻辑:
- p作为当前节点的前驱节点,假如正好是whead的话,那么会尝试自旋+随机的方式在积极得探测state,从而能够取得锁。并且在获得锁,重置whead和node.prev=null之后释放当前cowait链中的节点。最后返回stamp。
- 否则只需h不为null时尝试释放当前头节点的cowait链,作为一种协作的积极行动。
- 然后在whead==h即队列稳定时,首先会CAS操作当前节点前驱的status,从0变为WAITING从而指示后面有等待的节点。假如发现p的状态已经为取消了,则重新选择node的前驱。
- 前面的这些都处理完成之后,使用类似的park以及cancelWaiter操作。区别在于这里的p.status<0必须保证(因为等待状态WAITING是-1)。
tryIncReaderOverflow源码
1 | private long tryIncReaderOverflow(long s) { |
将state超过 RFULL=126的值放到readerOverflow字段中,state的前七位记录到126之后就会稳定在这个值,偶尔会到127,但是超出126的部分最终到了readerOverflow,加入获取了锁就返回stamp。
cancelWaiter源码
1 | /** |
该方法的功能就是在等待队列中取消当前节点。内部逻辑为:
- 首先设置node的状态为CANCELLED,可以向其他线程传递这个节点是删除了的信息。
- 然后再聚合节点gruop上清理所有状态为CANCELLED的节点(即删除节点)
- 接下来假如当期node节点本身就是聚合节点,那么首先唤醒cowait链中的所有节点(读者),寻找到node后面的第一个非CANCELLED节点,直接拼接到pred上(从而删除当前节点),然后再检查前驱节点状态,假如为CANCELLED则也需要重置前驱节点。
- 最后,在队列中不为空,并且头结点的状态为0即队列中的节点还未设置WAITING信号&当前没有持有写入锁模式&(当前没有锁或者只有乐观锁 | 队列中第一个等待者为读模式),那么就从队列头唤醒一次。
release源码
1 | //释放当前节点, 唤醒继任者节点线程 |
读锁释放
unlockRead源码
1 | public void unlockRead(long stamp) { |
在释放锁时,会传递一个stamp值,进行锁验证,如果验证通过,则直接修改state
中代表读状态的值。在读状态数小于最大记录数时,直接修改state
的值,并唤醒头结点中的线程;在大于时,修改readerOverflow
值。
tryDecReaderOverflow源码
1 | private long tryDecReaderOverflow(long s) { |
写锁获取
1 | //获取写锁,获取失败会一直阻塞,直到获得锁成功 |
在获取写锁时,如果在完全没有锁的情况下(既没有读锁,也没有写锁),会直接尝试CAS获取写锁。获取成功,直接返回;获取失败,则会调用acquireWrite方法。
acquireWrite源码
1 | private long acquireWrite(boolean interruptible, long deadline) { |
acquireWrite方法逻辑实现中,也有两个for循环:
第一个for循环
功能:在无任何锁的情况下,会直接通过CAS操作获取写锁;否则,在持有写锁,并且队列为空的情况下,自旋一段时间获取写锁后,如果还未成功获取到写锁,则将当前线程入队列(在入队列之前,若队列为空,先初始化队列),入队成功后,跳出第一个循环。
第二个for循环 (与acquireRead方法的第二个for循环逻辑差不多)
- 在当前线程入队成功后,如果其前驱结点为头结点,且无任何锁的情况下,则根据头结点自旋次数,循环获取写锁;自旋次数完后,若还未获取锁,则跳出内部循环。
- 如头结点不为空,则释放头结点中cowait链上的节点
- 然后若在whead==h即队列稳定时,首先会CAS操作当前节点前驱的status,从0变为WAITING从而指示后面有等待的节点。假如发现p的状态已经为取消了,则重新选择node的前驱。
- 前面的这些都处理完成之后,使用类似的park以及cancelWaiter操作。区别在于这里的p.status<0必须保证(因为等待状态WAITING是-1)。
释放写锁
1 | public void unlockWrite(long stamp) { |
在释放写锁时,如果stamp校验失败,则抛出异常;否则,释放锁,其实就是将state的第8位置为0
,所以使用state
加上
1 | 2^9 |
即:
0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0010 0000 0000
即可将state的第8位置为0,达到释放锁的效果。因为state
为long型,会发生溢出,64位全部为0,所以如果这种情况就置为ORIGIN。
乐观读锁获取
tryOptimisticRead源码
1 | public long tryOptimisticRead() { |
在获取乐观锁的时候,如果没有写锁,则返回固定值256
,否则返回0。返回0表示获取锁失败
。
validate源码
1 | public boolean validate(long stamp) { |
在成功获取乐观锁并读取所需数据后,需要调用validate方法校验在读取期间是否发生了其他写锁(因为与SBITS进行&操作时,只会检查8-64位,所以(stamp & SBITS) == (state & SBITS)
时,说明期间没有发生新的写锁)。
unlock源码
1 | public void unlock(long stamp) { |
该方法释放的锁,既可以是读锁,也可以是写锁。首先,在stamp校验不通过时,直接抛异常;通过时
- 若
state
为初始态,即没有锁时,抛异常; - 为写锁时,当
a != m
时,抛异常;否则释放写锁,并唤醒后续节点。 - 若为读锁,并
m < RFULL
,释放读锁并唤醒后续节点;否则减少overflow
tryConvertToWriteLock源码
1 | /** |
该方法将读锁转为写锁。
tryConvertToReadLock源码
1 | /** |
该方法将写锁转为读锁。
- 在没有任何锁的时候,如果
m < RFULL
,则直接CAS获取读锁,否则增加readerOverflow
。 - 在持有写锁时,则释放写锁持有读锁(
state = s + (WBIT + RUNIT)
) - 在持有读锁时,直接返回
stamp
。