AQS抽象的队列同步器
概念
是用来构建锁或者其他同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个volatile的int类型变量state表示持有锁的状态。
抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock…
锁,面向锁的使用者
定义了程序员和锁交互的使用层API,隐藏了实现细节,调用即可。
同步器,面向锁的实现者
也就是通过同步器,我们可以实现各种不同的锁。
为什么需要AQS
加锁会导致阻塞,有阻塞就需要排队,实现排队必然需要某种形式的队列来进行管理。
抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)
既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的节点(Node),通过CAS,自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的控制效果。
CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
AQS
- 同步状态State成员变量
- CLH变体的虚拟双向队列
State
- 0就是没人,自由状态可以办理。
- 大于等于1,有人占用窗口,需要排队
总结
- 锁会存在阻塞,有阻塞就需要排队,实现排队必然需要队列
- AQS:state变量+CLH变种的双向队列
框架
首先,我们通过下面的架构图来整体了解一下AQS框架:
- 上图中有颜色的为Method,无颜色的为Attribution。
- 总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。
- 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。
数据结构
先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。
1 | static final class Node { |
解释一下几个方法和属性值的含义:
方法和属性值 | 含义 |
---|---|
waitStatus | 当前节点在队列中的状态 |
thread | 表示处于该节点的线程 |
prev | 前驱指针 |
predecessor | 返回前驱节点,没有的话抛出空指针异常 |
nextWaiter | 指向下一个处于CONDITION状态的节点(由于本篇文章不讲述Condition Queue队列,这个指针不多介绍) |
next | 后继指针 |
线程两种锁的模式:
模式 | 含义 |
---|---|
SHARED | 表示线程以共享的模式等待锁 |
EXCLUSIVE | 表示线程正在以独占的方式等待锁 |
waitStatus有下面几个枚举值:
枚举 | 含义 |
---|---|
0 | 当一个Node被初始化的时候的默认值 |
CANCELLED | 为1,表示线程获取锁的请求已经取消了 |
CONDITION | 为-2,表示节点在等待队列中,节点线程等待唤醒 |
PROPAGATE | 为-3,当前线程处在SHARED情况下,该字段才会使用 |
SIGNAL | 为-1,表示线程已经准备好了,就等资源释放了 |
同步状态State
在了解数据结构后,接下来了解一下AQS的同步状态——State。AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer |
下面提供了几个访问这个字段的方法:
方法名 | 描述 |
---|---|
protected final int getState() | 获取State的值 |
protected final void setState(int newState) | 设置State的值 |
protected final boolean compareAndSetState(int expect, int update) | 使用CAS方式更新State |
这几个方法都是Final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。
对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是AQS架构图中的第一层:API层。
AQS重要方法与ReentrantLock的关联
从架构图中可以得知,AQS提供了大量用于自定义同步器实现的Protected方法。自定义同步器实现的相关方法也只是为了通过修改State字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法(ReentrantLock需要实现的方法如下,并不是全部):
方法名 | 描述 |
---|---|
protected boolean isHeldExclusively() | 该线程是否正在独占资源。只有用到Condition才需要去实现它。 |
protected boolean tryAcquire(int arg) | 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 |
protected boolean tryRelease(int arg) | 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 |
protected int tryAcquireShared(int arg) | 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
protected boolean tryReleaseShared(int arg) | 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。 |
一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease。
以非公平锁为例,这里主要阐述一下非公平锁与AQS之间方法的关联之处,具体每一处核心方法的作用会在文章后面详细进行阐述。
为了帮助大家理解ReentrantLock和AQS之间方法的交互过程,以非公平锁为例,我们将加锁和解锁的交互流程单独拎出来强调一下,以便于对后续内容的理解。
加锁:
- 通过ReentrantLock的加锁方法Lock进行加锁操作。
- 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
- AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
- tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。
解锁:
- 通过ReentrantLock的解锁方法Unlock进行解锁。
- Unlock会调用内部类Sync的Release方法,该方法继承于AQS。
- Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
- 释放成功后,所有处理由AQS框架完成,与自定义同步器无关。
通过上面的描述,大概可以总结出ReentrantLock加锁解锁时API层核心方法的映射关系。
通过ReentrantLock理解AQS
ReentrantLock中公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析。
在非公平锁中,有一段这样的代码:
1 | /** |
看一下这个Acquire是怎么写的:
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer |
Acquire中调用了tryAcquire()
方法
1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer |
可以看出,这里只是AQS的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例)。如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。
非公平锁和公平锁的tryAcquire()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁少了一个判断!hasQueuedPredecessors()
hasQueuedPredecessors()
中判断了是否需要排队,导致公平锁和非公平锁的差异如下:
公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中。
非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。
1 |
|
假设有三个人去银行办理业务,第一个的A前面没有人,首先办理业务。调用lock()
方法
1 | final void lock() { |
首先会使用compareAndSetState(0, 1)
方法进行判断
1 | protected final boolean compareAndSetState(int expect, int update) { |
由于目前A是第一个办理业务的,也就是第一个获取锁人。此时state值还是为默认值0,通过CAS操作将其成功更新为1
。返回true
。进入到If判断中
调用setExclusiveOwnerThread(Thread.currentThread());
1 |
|
将当前线程设置为独占访问权限的线程,也就是当前线程持有了这把锁。
到此lock方法就结束了,A线程暂停2秒钟。B线程开始运行调用lock方法。
此时state
已经被设置为了1
1 | final void lock() { |
所以不满足判断条件。则运行 acquire(1)
1 | public final void acquire(int arg) { |
首先调用!tryAcquire(1)
1 | protected final boolean tryAcquire(int acquires) { |
非公平锁调用nonfairTryAcquire(1)
1 | final boolean nonfairTryAcquire(int acquires) { |
- 第一个if条件
c==0
,此时为1不满足 - 第二个if条件
current == getExclusiveOwnerThread()
当前访问独占权限的线程是A,不是B。也不满足 - 所以直接返回false
第一个if条件c == 0什么时候满足呢?
1 | if (c == 0) { |
其实就是当B线程运行tryAcquire()
方法时,A线程刚好完成,释放了锁,将state
设置为了0
。满足条件,CAS将state设置为1
,设置当前访问独占权限的线程为B。返回true
。!nonfairTryAcquire(1)
为false不满足条件acquire(1)
方法结束。lock()
成功!
第二个if条件current == getExclusiveOwnerThread()什么时候满足呢?
1 | else if (current == getExclusiveOwnerThread()) { |
当此时还是A线程,也就是前访问独占权限的线程尝试获取锁时满足条件。
此时
nextc = c + acquires
c=state=1
acquires=1
nextc=2
setState(nextc);
设置state为2
将state设置为2
,返回true
。这也就是为什么ReentrantLock
为可重入锁
。!nonfairTryAcquire(1)
为false不满足条件acquire(1)
方法结束。lock()
成功!
我们接着当前的正常逻辑,回到acquire(1)
方法
tryAcquire(1)
返回false
,取否变成true
所以接着运行acquireQueued(addWaiter(Node.EXCLUSIVE), 1))
首先查看addWaiter(Node.EXCLUSIVE)
加入等待队列中
static final Node EXCLUSIVE = null;
1 | private Node addWaiter(Node mode) { |
1 | Node(Thread thread, Node mode) { // Used by addWaiter |
主要的流程如下:
- 通过当前的线程和锁模式新建一个节点。
- Pred指针指向尾节点Tail。
- 将New中Node的Prev指针指向Pred。
- 通过compareAndSetTail方法,完成尾节点的设置。这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值。
当前模式为独占模式值mode
值为null
如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改),就需要看一下Enq的方法。
由于当前等待队列中没有线程进行排队,tail
为null
所以pred=null
不满足if条件,运行enq(node)
将节点入队
1 | private Node enq(final Node node) { |
Node t = tail
tail为null
,满足if条件,调用compareAndSetHead(new Node())
CAS设置队列头节点。
1 | private final boolean compareAndSetHead(Node update) { |
如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。
此时头节点head为null
,创建一个新节点Node设置为头节点,成功返回true
将tail = head;
头节点赋值给尾节点。第一个节点头节点,是创建的哨兵节点(虚拟头节点)。
由于for (;;)
所以接着循环此时t==null
不成立,node.prev = t;
将B的前驱指向尾节点(此时尾节点)。
compareAndSetTail(t, node)
接着通过CAS操作将尾节点设置为了node
也就是B。接着将t.next=node
将其后继结点指向node即B。
然后结束循环,回到addWaiter()
,运行return node
也就是将B节点返回出去。此时回到acquire(1)
中,执行acquireQueued(B,1))
1 | final boolean acquireQueued(final Node node, int arg) { |
进入循环final Node p = node.predecessor();
1 | /** |
返回当前节点的前节点,也就是B的前节点,即head
头节点。
1 | if (p == head && tryAcquire(arg)) { |
此时p==head
满足,再次tryAcquire(arg)
尝试获取锁,获取成功的话
则setHead(node);
将B设置为新的头节点。而以前的头节点p.next
将其后节点指针设置为null(便于GC)返回interrupted
false。结束。
1 | private void setHead(Node node) { |
如果不满足p == head && tryAcquire(arg)
则进入下一个判断
1 | if (shouldParkAfterFailedAcquire(p, node) && |
shouldParkAfterFailedAcquire(p, node)
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
int ws = pred.waitStatus;
此时pred代表头节点,ws值为初始值0
1 | else { |
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
此时将前节点pred也就是头节点的waitStatus
值CAS操作成了Node.SIGNAL
也就是-1
接着返回false
。回到acquireQueued()
方法,并没有结束循环,再次循环运行
接着再度运行shouldParkAfterFailedAcquire(p, node)
,此时
头节点pred
的waitStatus== Node.SIGNAL
进入return true
就能运行parkAndCheckInterrupt())
1 | /** |
调用改方法,运行LockSupport.park(this);
暂停该线程。返回Thread.interrupted();
,线程暂停成功则返回true
。线程B暂停,等待唤醒…….(方法并没有结束,只是暂停了,唤醒之后继续运行)
此时A线程终于unlock()
1 | public void unlock() { |
1 | public final boolean release(int arg) { |
进入tryRelease(1)
1 | protected final boolean tryRelease(int releases) { |
此时getState()
state的值为1
int c = getState() - releases;
等于1-1为0
if (Thread.currentThread() != getExclusiveOwnerThread())
如果当前解锁线程不是锁的持有者,则抛出IllegalMonitorStateException
异常。
如果c==0
则进判断
- 设置
free=true
setExclusiveOwnerThread(null);
独占线程(锁拥有者)为nullsetState(c);
变为0- 返回free,也就是true
如果c!=0
在nonfairTryAcquire()
中嵌入式锁中给该值进行过,加值操作。此时只释放了一次,假设进行了两次lock
,则设置新的state值,也就是1
,返回false
。结束。锁还没有释放,需要该线程再度unlock()
锁的嵌入式情况,此时lockMethod2()的unlock
解锁了一次,之后state变成了1,还需要lockMethod2()的unlock,才能真正释放锁。
回到主流程,A线程unlock释放锁,tryRelease
成功返回true,state变成了0。
1 | if (tryRelease(arg)) { |
此时h != null && h.waitStatus != 0
- 头节点h不等于null,为哨兵节点。
- waitStatus在前面B节点等待的时候使用
compareAndSetWaitStatus()
变成了-1
进入判断调用unparkSuccessor(h);
1 | /** |
int ws = node.waitStatus;
头节点的waitStatus为-1
小于0,调用
compareAndSetWaitStatus(node, ws, 0);
将其又变成了0
接着Node s = node.next
头节点下一个节点也就是B,if (s == null || s.waitStatus > 0)
- B节点不等于null
- B节点的waitStatus等于0,不满足大于0
if (s != null)
满足后,调用LockSupport.unpark(s.thread);
此时A的unlock()
也就执行完成了。B线程被唤醒。
此时B线程被唤醒继续执行代码将interrupted = true;
设置为true,接着继续遍历循环,final Node p = node.predecessor();
B的前驱节点p为head,执行tryAcquire(arg)
此时由于没用了A线程持有锁,B线程获取到了锁,满足条件,调用setHead(node)
方法,将B节点的属性赋值给头节点(B此时也就是new Node())
1 | private void setHead(Node node) { |
其实也就是搞了一个新的哨兵节点(watiStatus=0,thread=0,next则没用改变,如果还有C的话,还是指向C的),将原先指向头节点的前指针为null,
接着p.next = null;
原先的头节点再也没用任何指针指向,会被GC掉。
failed=false
返回interrupted也就是ture
接着if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
满足判断
调用selfInterrupt();
1 | static void selfInterrupt() { |
其实B线程唤醒后,应该是从这里再继续执行。
1 | public static void park(Object blocker) { |
setBlocker(t, null);
设置blocket为null
所以Interruptible b = blocker;
b为null不满足条件,没有中断线程B,只是interrupt0();
设置 interrupt flag
为false。到此结束。
但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:
- 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False),并记录下来,如果发现该线程被中断过,就再中断一次。
- 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。
这里的处理方式主要是运用线程池中基本运作单元Worder中的runWorker,通过Thread.interrupted()进行额外的判断处理,感兴趣的可以看下ThreadPoolExecutor源码。
JUC中的应用场景
除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:
同步工具 | 同步工具与AQS的关联 |
---|---|
ReentrantLock | 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。 |
Semaphore | 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。 |
CountDownLatch | 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。 |
ReentrantReadWriteLock | 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。 |
ThreadPoolExecutor | Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。 |
自定义同步工具
了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具。