AQS抽象的队列同步器
概念
是用来构建锁或者其他同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个volatile的int类型变量state表示持有锁的状态。
抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock…
锁,面向锁的使用者
定义了程序员和锁交互的使用层API,隐藏了实现细节,调用即可。
同步器,面向锁的实现者
也就是通过同步器,我们可以实现各种不同的锁。
为什么需要AQS
加锁会导致阻塞,有阻塞就需要排队,实现排队必然需要某种形式的队列来进行管理。
抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)
既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的节点(Node),通过CAS,自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的控制效果。
CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。


AQS
- 同步状态State成员变量
- CLH变体的虚拟双向队列
State
- 0就是没人,自由状态可以办理。
- 大于等于1,有人占用窗口,需要排队

总结
- 锁会存在阻塞,有阻塞就需要排队,实现排队必然需要队列
- AQS:state变量+CLH变种的双向队列
框架
首先,我们通过下面的架构图来整体了解一下AQS框架:

- 上图中有颜色的为Method,无颜色的为Attribution。
- 总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。
- 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。
数据结构
先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。

| 1 | static final class Node { | 
解释一下几个方法和属性值的含义:
| 方法和属性值 | 含义 | 
|---|---|
| waitStatus | 当前节点在队列中的状态 | 
| thread | 表示处于该节点的线程 | 
| prev | 前驱指针 | 
| predecessor | 返回前驱节点,没有的话抛出空指针异常 | 
| nextWaiter | 指向下一个处于CONDITION状态的节点(由于本篇文章不讲述Condition Queue队列,这个指针不多介绍) | 
| next | 后继指针 | 
线程两种锁的模式:
| 模式 | 含义 | 
|---|---|
| SHARED | 表示线程以共享的模式等待锁 | 
| EXCLUSIVE | 表示线程正在以独占的方式等待锁 | 
waitStatus有下面几个枚举值:
| 枚举 | 含义 | 
|---|---|
| 0 | 当一个Node被初始化的时候的默认值 | 
| CANCELLED | 为1,表示线程获取锁的请求已经取消了 | 
| CONDITION | 为-2,表示节点在等待队列中,节点线程等待唤醒 | 
| PROPAGATE | 为-3,当前线程处在SHARED情况下,该字段才会使用 | 
| SIGNAL | 为-1,表示线程已经准备好了,就等资源释放了 | 

同步状态State
在了解数据结构后,接下来了解一下AQS的同步状态——State。AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。
| 1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer | 
下面提供了几个访问这个字段的方法:
| 方法名 | 描述 | 
|---|---|
| protected final int getState() | 获取State的值 | 
| protected final void setState(int newState) | 设置State的值 | 
| protected final boolean compareAndSetState(int expect, int update) | 使用CAS方式更新State | 
这几个方法都是Final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。

对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是AQS架构图中的第一层:API层。
AQS重要方法与ReentrantLock的关联
从架构图中可以得知,AQS提供了大量用于自定义同步器实现的Protected方法。自定义同步器实现的相关方法也只是为了通过修改State字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法(ReentrantLock需要实现的方法如下,并不是全部):
| 方法名 | 描述 | 
|---|---|
| protected boolean isHeldExclusively() | 该线程是否正在独占资源。只有用到Condition才需要去实现它。 | 
| protected boolean tryAcquire(int arg) | 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 | 
| protected boolean tryRelease(int arg) | 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 | 
| protected int tryAcquireShared(int arg) | 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 | 
| protected boolean tryReleaseShared(int arg) | 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。 | 
一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease。
以非公平锁为例,这里主要阐述一下非公平锁与AQS之间方法的关联之处,具体每一处核心方法的作用会在文章后面详细进行阐述。

为了帮助大家理解ReentrantLock和AQS之间方法的交互过程,以非公平锁为例,我们将加锁和解锁的交互流程单独拎出来强调一下,以便于对后续内容的理解。

加锁:
- 通过ReentrantLock的加锁方法Lock进行加锁操作。
- 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
- AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
- tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。
解锁:
- 通过ReentrantLock的解锁方法Unlock进行解锁。
- Unlock会调用内部类Sync的Release方法,该方法继承于AQS。
- Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
- 释放成功后,所有处理由AQS框架完成,与自定义同步器无关。
通过上面的描述,大概可以总结出ReentrantLock加锁解锁时API层核心方法的映射关系。

通过ReentrantLock理解AQS
ReentrantLock中公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析。
在非公平锁中,有一段这样的代码:
| 1 | /** | 
看一下这个Acquire是怎么写的:
| 1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer | 
Acquire中调用了tryAcquire()方法
| 1 | // java.util.concurrent.locks.AbstractQueuedSynchronizer | 
可以看出,这里只是AQS的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例)。如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。

非公平锁和公平锁的tryAcquire()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁少了一个判断!hasQueuedPredecessors()
hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:
公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中。
非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。

| 1 | 
 | 
假设有三个人去银行办理业务,第一个的A前面没有人,首先办理业务。调用lock()方法

| 1 | final void lock() { | 
首先会使用compareAndSetState(0, 1)方法进行判断

| 1 | protected final boolean compareAndSetState(int expect, int update) { | 
由于目前A是第一个办理业务的,也就是第一个获取锁人。此时state值还是为默认值0,通过CAS操作将其成功更新为1。返回true。进入到If判断中
调用setExclusiveOwnerThread(Thread.currentThread());

| 1 | 
 | 
将当前线程设置为独占访问权限的线程,也就是当前线程持有了这把锁。
到此lock方法就结束了,A线程暂停2秒钟。B线程开始运行调用lock方法。
此时state已经被设置为了1
| 1 | final void lock() { | 
所以不满足判断条件。则运行 acquire(1)

| 1 | public final void acquire(int arg) { | 
首先调用!tryAcquire(1)

| 1 | protected final boolean tryAcquire(int acquires) { | 
非公平锁调用nonfairTryAcquire(1)

| 1 | final boolean nonfairTryAcquire(int acquires) { | 
- 第一个if条件c==0,此时为1不满足
- 第二个if条件current == getExclusiveOwnerThread()当前访问独占权限的线程是A,不是B。也不满足
- 所以直接返回false
第一个if条件c == 0什么时候满足呢?
| 1 | if (c == 0) { | 
其实就是当B线程运行tryAcquire()方法时,A线程刚好完成,释放了锁,将state设置为了0。满足条件,CAS将state设置为1,设置当前访问独占权限的线程为B。返回true。!nonfairTryAcquire(1)为false不满足条件acquire(1)方法结束。lock()成功!
第二个if条件current == getExclusiveOwnerThread()什么时候满足呢?
| 1 | else if (current == getExclusiveOwnerThread()) { | 
当此时还是A线程,也就是前访问独占权限的线程尝试获取锁时满足条件。
此时
- nextc = c + acquires
- c=state=1
- acquires=1
- nextc=2
- setState(nextc);设置state为2
将state设置为2,返回true。这也就是为什么ReentrantLock为可重入锁。!nonfairTryAcquire(1)为false不满足条件acquire(1)方法结束。lock()成功!
我们接着当前的正常逻辑,回到acquire(1)方法

tryAcquire(1)返回false,取否变成true所以接着运行acquireQueued(addWaiter(Node.EXCLUSIVE), 1))
首先查看addWaiter(Node.EXCLUSIVE)加入等待队列中

static final Node EXCLUSIVE = null;

| 1 | private Node addWaiter(Node mode) { | 

| 1 | Node(Thread thread, Node mode) { // Used by addWaiter | 
主要的流程如下:
- 通过当前的线程和锁模式新建一个节点。
- Pred指针指向尾节点Tail。
- 将New中Node的Prev指针指向Pred。
- 通过compareAndSetTail方法,完成尾节点的设置。这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值。
当前模式为独占模式值mode值为null
如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改),就需要看一下Enq的方法。
由于当前等待队列中没有线程进行排队,tail为null所以pred=null不满足if条件,运行enq(node)将节点入队
| 1 | private Node enq(final Node node) { | 
Node t = tailtail为null,满足if条件,调用compareAndSetHead(new Node())CAS设置队列头节点。

| 1 | private final boolean compareAndSetHead(Node update) { | 
如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。
此时头节点head为null,创建一个新节点Node设置为头节点,成功返回true将tail = head;头节点赋值给尾节点。第一个节点头节点,是创建的哨兵节点(虚拟头节点)。

由于for (;;)所以接着循环此时t==null不成立,node.prev = t;将B的前驱指向尾节点(此时尾节点)。

compareAndSetTail(t, node)接着通过CAS操作将尾节点设置为了node也就是B。接着将t.next=node将其后继结点指向node即B。

然后结束循环,回到addWaiter(),运行return node也就是将B节点返回出去。此时回到acquire(1)中,执行acquireQueued(B,1))


| 1 | final boolean acquireQueued(final Node node, int arg) { | 
进入循环final Node p = node.predecessor();
| 1 | /** | 
返回当前节点的前节点,也就是B的前节点,即head头节点。
| 1 | if (p == head && tryAcquire(arg)) { | 
此时p==head满足,再次tryAcquire(arg)尝试获取锁,获取成功的话
则setHead(node);将B设置为新的头节点。而以前的头节点p.next
将其后节点指针设置为null(便于GC)返回interruptedfalse。结束。
| 1 | private void setHead(Node node) { | 
如果不满足p == head && tryAcquire(arg)则进入下一个判断
| 1 | if (shouldParkAfterFailedAcquire(p, node) && | 
shouldParkAfterFailedAcquire(p, node)

| 1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { | 
int ws = pred.waitStatus;此时pred代表头节点,ws值为初始值0
| 1 | else { | 
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);此时将前节点pred也就是头节点的waitStatus值CAS操作成了Node.SIGNAL也就是-1

接着返回false。回到acquireQueued()方法,并没有结束循环,再次循环运行

接着再度运行shouldParkAfterFailedAcquire(p, node),此时

头节点pred的waitStatus== Node.SIGNAL进入return true
就能运行parkAndCheckInterrupt())

| 1 | /** | 
调用改方法,运行LockSupport.park(this);暂停该线程。返回Thread.interrupted();,线程暂停成功则返回true。线程B暂停,等待唤醒…….(方法并没有结束,只是暂停了,唤醒之后继续运行)
此时A线程终于unlock()

| 1 | public void unlock() { | 

| 1 | public final boolean release(int arg) { | 
进入tryRelease(1)

| 1 | protected final boolean tryRelease(int releases) { | 
此时getState()state的值为1
int c = getState() - releases;等于1-1为0
if (Thread.currentThread() != getExclusiveOwnerThread())
如果当前解锁线程不是锁的持有者,则抛出IllegalMonitorStateException异常。
如果c==0则进判断
- 设置free=true
- setExclusiveOwnerThread(null);独占线程(锁拥有者)为null
- setState(c);变为0
- 返回free,也就是true
如果c!=0

在nonfairTryAcquire()中嵌入式锁中给该值进行过,加值操作。此时只释放了一次,假设进行了两次lock,则设置新的state值,也就是1,返回false。结束。锁还没有释放,需要该线程再度unlock()

锁的嵌入式情况,此时lockMethod2()的unlock

解锁了一次,之后state变成了1,还需要lockMethod2()的unlock,才能真正释放锁。
回到主流程,A线程unlock释放锁,tryRelease成功返回true,state变成了0。
| 1 | if (tryRelease(arg)) { | 
此时h != null && h.waitStatus != 0
- 头节点h不等于null,为哨兵节点。
- waitStatus在前面B节点等待的时候使用compareAndSetWaitStatus()变成了-1
进入判断调用unparkSuccessor(h);

| 1 | /** | 
int ws = node.waitStatus;头节点的waitStatus为-1小于0,调用
compareAndSetWaitStatus(node, ws, 0);将其又变成了0

接着Node s = node.next头节点下一个节点也就是B,if (s == null || s.waitStatus > 0)
- B节点不等于null
- B节点的waitStatus等于0,不满足大于0
if (s != null)满足后,调用LockSupport.unpark(s.thread);此时A的unlock()也就执行完成了。B线程被唤醒。

此时B线程被唤醒继续执行代码将interrupted = true;设置为true,接着继续遍历循环,final Node p = node.predecessor();B的前驱节点p为head,执行tryAcquire(arg)此时由于没用了A线程持有锁,B线程获取到了锁,满足条件,调用setHead(node)方法,将B节点的属性赋值给头节点(B此时也就是new Node())
| 1 | private void setHead(Node node) { | 
其实也就是搞了一个新的哨兵节点(watiStatus=0,thread=0,next则没用改变,如果还有C的话,还是指向C的),将原先指向头节点的前指针为null,
接着p.next = null;原先的头节点再也没用任何指针指向,会被GC掉。
failed=false返回interrupted也就是ture
接着if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))满足判断
调用selfInterrupt();

| 1 | static void selfInterrupt() { | 

其实B线程唤醒后,应该是从这里再继续执行。

| 1 | public static void park(Object blocker) { | 
setBlocker(t, null);设置blocket为null
所以Interruptible b = blocker; b为null不满足条件,没有中断线程B,只是interrupt0();设置 interrupt flag为false。到此结束。
但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:
- 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False),并记录下来,如果发现该线程被中断过,就再中断一次。
- 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。
这里的处理方式主要是运用线程池中基本运作单元Worder中的runWorker,通过Thread.interrupted()进行额外的判断处理,感兴趣的可以看下ThreadPoolExecutor源码。
JUC中的应用场景
除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:
| 同步工具 | 同步工具与AQS的关联 | 
|---|---|
| ReentrantLock | 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。 | 
| Semaphore | 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。 | 
| CountDownLatch | 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。 | 
| ReentrantReadWriteLock | 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。 | 
| ThreadPoolExecutor | Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。 | 
自定义同步工具
了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具。













