概念

是用来构建锁或者其他同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个volatile的int类型变量state表示持有锁的状态。

抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock…

锁,面向锁的使用者

定义了程序员和锁交互的使用层API,隐藏了实现细节,调用即可。

同步器,面向锁的实现者

也就是通过同步器,我们可以实现各种不同的锁。

为什么需要AQS

加锁会导致阻塞,有阻塞就需要排队,实现排队必然需要某种形式的队列来进行管理。

抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)

既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的节点(Node),通过CAS,自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的控制效果。

CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

img

image-20211226112141932

AQS

  • 同步状态State成员变量
  • CLH变体的虚拟双向队列

State

  • 0就是没人,自由状态可以办理。
  • 大于等于1,有人占用窗口,需要排队

image-20211226113406835

总结

  1. 锁会存在阻塞,有阻塞就需要排队,实现排队必然需要队列
  2. AQS:state变量+CLH变种的双向队列

框架

首先,我们通过下面的架构图来整体了解一下AQS框架:

img

  • 上图中有颜色的为Method,无颜色的为Attribution。
  • 总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。
  • 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

数据结构

先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;

/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;

/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;

/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;

/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;

/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

解释一下几个方法和属性值的含义:

方法和属性值 含义
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
predecessor 返回前驱节点,没有的话抛出空指针异常
nextWaiter 指向下一个处于CONDITION状态的节点(由于本篇文章不讲述Condition Queue队列,这个指针不多介绍)
next 后继指针

线程两种锁的模式:

模式 含义
SHARED 表示线程以共享的模式等待锁
EXCLUSIVE 表示线程正在以独占的方式等待锁

waitStatus有下面几个枚举值:

枚举 含义
0 当一个Node被初始化的时候的默认值
CANCELLED 为1,表示线程获取锁的请求已经取消了
CONDITION 为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE 为-3,当前线程处在SHARED情况下,该字段才会使用
SIGNAL 为-1,表示线程已经准备好了,就等资源释放了

image-20211226114029036

同步状态State

在了解数据结构后,接下来了解一下AQS的同步状态——State。AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。

1
2
3
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private volatile int state;

下面提供了几个访问这个字段的方法:

方法名 描述
protected final int getState() 获取State的值
protected final void setState(int newState) 设置State的值
protected final boolean compareAndSetState(int expect, int update) 使用CAS方式更新State

这几个方法都是Final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。

img

对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是AQS架构图中的第一层:API层。

AQS重要方法与ReentrantLock的关联

从架构图中可以得知,AQS提供了大量用于自定义同步器实现的Protected方法。自定义同步器实现的相关方法也只是为了通过修改State字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法(ReentrantLock需要实现的方法如下,并不是全部):

方法名 描述
protected boolean isHeldExclusively() 该线程是否正在独占资源。只有用到Condition才需要去实现它。
protected boolean tryAcquire(int arg) 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。
protected boolean tryRelease(int arg) 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。
protected int tryAcquireShared(int arg) 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int arg) 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。

一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease。

以非公平锁为例,这里主要阐述一下非公平锁与AQS之间方法的关联之处,具体每一处核心方法的作用会在文章后面详细进行阐述。

img

为了帮助大家理解ReentrantLock和AQS之间方法的交互过程,以非公平锁为例,我们将加锁和解锁的交互流程单独拎出来强调一下,以便于对后续内容的理解。

img

加锁:

  • 通过ReentrantLock的加锁方法Lock进行加锁操作。
  • 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
  • AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
  • tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。

解锁:

  • 通过ReentrantLock的解锁方法Unlock进行解锁。
  • Unlock会调用内部类Sync的Release方法,该方法继承于AQS。
  • Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
  • 释放成功后,所有处理由AQS框架完成,与自定义同步器无关。

通过上面的描述,大概可以总结出ReentrantLock加锁解锁时API层核心方法的映射关系。

img

通过ReentrantLock理解AQS

ReentrantLock中公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析。

在非公平锁中,有一段这样的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

看一下这个Acquire是怎么写的:

1
2
3
4
5
6
// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

Acquire中调用了tryAcquire()方法

1
2
3
4
5
// java.util.concurrent.locks.AbstractQueuedSynchronizer

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

可以看出,这里只是AQS的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例)。如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。

image-20211226144342912

非公平锁和公平锁的tryAcquire()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁少了一个判断!hasQueuedPredecessors()

hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:

公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中。

非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。

image-20211226152202628

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

/**
* @author kylin
*/

public class AQSDemo {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
new Thread(() -> {
lock.lock();
try {
System.out.println("------A thread come in");
//暂停几秒钟线程
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"A").start();

new Thread(() -> {
lock.lock();
try {
System.out.println("------B thread come in");
//暂停几秒钟线程
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"B").start();

new Thread(() -> {
lock.lock();
try {
System.out.println("------C thread come in");
//暂停几秒钟线程
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"C").start();
}
}

假设有三个人去银行办理业务,第一个的A前面没有人,首先办理业务。调用lock()方法

image-20211226152336696

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

首先会使用compareAndSetState(0, 1)方法进行判断

image-20211226152507193

1
2
3
4
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

由于目前A是第一个办理业务的,也就是第一个获取锁人。此时state值还是为默认值0,通过CAS操作将其成功更新为1。返回true。进入到If判断中

调用setExclusiveOwnerThread(Thread.currentThread());

image-20211226152912624

1
2
3
4
5
6
7
8
9
10
11

/**
* Sets the thread that currently owns exclusive access.
* A {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* @param thread the owner thread
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}

将当前线程设置为独占访问权限的线程,也就是当前线程持有了这把锁。

到此lock方法就结束了,A线程暂停2秒钟。B线程开始运行调用lock方法。

此时state已经被设置为了1

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

所以不满足判断条件。则运行 acquire(1)

image-20211226155842888

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

首先调用!tryAcquire(1)

image-20211226160105191

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

非公平锁调用nonfairTryAcquire(1)

image-20211226160421387

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
  • 第一个if条件c==0,此时为1不满足
  • 第二个if条件current == getExclusiveOwnerThread()当前访问独占权限的线程是A,不是B。也不满足
  • 所以直接返回false

第一个if条件c == 0什么时候满足呢?

1
2
3
4
5
6
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}

其实就是当B线程运行tryAcquire()方法时,A线程刚好完成,释放了锁,将state设置为了0。满足条件,CAS将state设置为1,设置当前访问独占权限的线程为B。返回true!nonfairTryAcquire(1)为false不满足条件acquire(1)方法结束。lock()成功!

第二个if条件current == getExclusiveOwnerThread()什么时候满足呢?

1
2
3
4
5
6
7
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}

当此时还是A线程,也就是前访问独占权限的线程尝试获取锁时满足条件。

此时

  • nextc = c + acquires
  • c=state=1
  • acquires=1
  • nextc=2
  • setState(nextc);设置state为2

将state设置为2,返回true。这也就是为什么ReentrantLock可重入锁!nonfairTryAcquire(1)为false不满足条件acquire(1)方法结束。lock()成功!

我们接着当前的正常逻辑,回到acquire(1)方法

image-20211226160736984

tryAcquire(1)返回false,取否变成true所以接着运行acquireQueued(addWaiter(Node.EXCLUSIVE), 1))

首先查看addWaiter(Node.EXCLUSIVE)加入等待队列中

image-20211226170238746

static final Node EXCLUSIVE = null;

image-20211226170313810

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

image-20211226171522045

1
2
3
4
Node(Thread thread, Node mode) {     // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

主要的流程如下:

  • 通过当前的线程和锁模式新建一个节点。
  • Pred指针指向尾节点Tail。
  • 将New中Node的Prev指针指向Pred。
  • 通过compareAndSetTail方法,完成尾节点的设置。这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值。

当前模式为独占模式值mode值为null

如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改),就需要看一下Enq的方法。

由于当前等待队列中没有线程进行排队,tailnull所以pred=null不满足if条件,运行enq(node)将节点入队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

Node t = tailtail为null,满足if条件,调用compareAndSetHead(new Node())CAS设置队列头节点。

image-20211226172258714

1
2
3
4
private final boolean compareAndSetHead(Node update) {
//当期望值为null,也就是只有头节点为null时才能设置成功
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。

此时头节点head为null,创建一个新节点Node设置为头节点,成功返回truetail = head;头节点赋值给尾节点。第一个节点头节点,是创建的哨兵节点(虚拟头节点)。

image-20211227215702607

由于for (;;)所以接着循环此时t==null不成立,node.prev = t;将B的前驱指向尾节点(此时尾节点)。

image-20211227220207554

compareAndSetTail(t, node)接着通过CAS操作将尾节点设置为了node也就是B。接着将t.next=node将其后继结点指向node即B。

image-20211227221619148

然后结束循环,回到addWaiter(),运行return node也就是将B节点返回出去。此时回到acquire(1)中,执行acquireQueued(B,1))

image-20211229213344336

image-20211229213503044

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

进入循环final Node p = node.predecessor();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

返回当前节点的前节点,也就是B的前节点,即head头节点。

1
2
3
4
5
6
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}

此时p==head满足,再次tryAcquire(arg)尝试获取锁,获取成功的话

setHead(node);将B设置为新的头节点。而以前的头节点p.next

将其后节点指针设置为null(便于GC)返回interruptedfalse。结束。

1
2
3
4
5
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

如果不满足p == head && tryAcquire(arg)则进入下一个判断

1
2
3
4
    if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}

shouldParkAfterFailedAcquire(p, node)

image-20211229220356776

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

int ws = pred.waitStatus;此时pred代表头节点,ws值为初始值0

1
2
3
4
5
6
7
8
else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);此时将前节点pred也就是头节点的waitStatus值CAS操作成了Node.SIGNAL也就是-1

image-20211229220755241

接着返回false。回到acquireQueued()方法,并没有结束循环,再次循环运行

image-20211229221036238

接着再度运行shouldParkAfterFailedAcquire(p, node),此时

image-20211229221210712

头节点predwaitStatus== Node.SIGNAL进入return true

就能运行parkAndCheckInterrupt())

image-20211229221346536

1
2
3
4
5
6
7
8
9
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

调用改方法,运行LockSupport.park(this);暂停该线程。返回Thread.interrupted();,线程暂停成功则返回true。线程B暂停,等待唤醒…….(方法并没有结束,只是暂停了,唤醒之后继续运行)

此时A线程终于unlock()

image-20211229222141983

1
2
3
public void unlock() {
sync.release(1);
}

image-20211229222313708

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

进入tryRelease(1)

image-20211229222400872

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

此时getState()state的值为1

int c = getState() - releases;等于1-1为0

if (Thread.currentThread() != getExclusiveOwnerThread())

如果当前解锁线程不是锁的持有者,则抛出IllegalMonitorStateException异常。

如果c==0则进判断

  • 设置free=true
  • setExclusiveOwnerThread(null);独占线程(锁拥有者)为null
  • setState(c);变为0
  • 返回free,也就是true

如果c!=0

image-20211229222934762

nonfairTryAcquire()中嵌入式锁中给该值进行过,加值操作。此时只释放了一次,假设进行了两次lock,则设置新的state值,也就是1,返回false。结束。锁还没有释放,需要该线程再度unlock()

image-20211229224017091

锁的嵌入式情况,此时lockMethod2()的unlock

image-20211229224137285

解锁了一次,之后state变成了1,还需要lockMethod2()的unlock,才能真正释放锁。

回到主流程,A线程unlock释放锁,tryRelease成功返回true,state变成了0。

1
2
3
4
5
6
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}

此时h != null && h.waitStatus != 0

  • 头节点h不等于null,为哨兵节点。
  • waitStatus在前面B节点等待的时候使用compareAndSetWaitStatus()变成了-1

进入判断调用unparkSuccessor(h);

image-20220101202314582

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

int ws = node.waitStatus;头节点的waitStatus为-1小于0,调用

compareAndSetWaitStatus(node, ws, 0);将其又变成了0

image-20211227221619148

接着Node s = node.next头节点下一个节点也就是B,if (s == null || s.waitStatus > 0)

  • B节点不等于null
  • B节点的waitStatus等于0,不满足大于0

if (s != null)满足后,调用LockSupport.unpark(s.thread);此时A的unlock()也就执行完成了。B线程被唤醒。

image-20220101203436838

此时B线程被唤醒继续执行代码将interrupted = true;设置为true,接着继续遍历循环,final Node p = node.predecessor();B的前驱节点p为head,执行tryAcquire(arg)此时由于没用了A线程持有锁,B线程获取到了锁,满足条件,调用setHead(node)方法,将B节点的属性赋值给头节点(B此时也就是new Node())

1
2
3
4
5
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

其实也就是搞了一个新的哨兵节点(watiStatus=0,thread=0,next则没用改变,如果还有C的话,还是指向C的),将原先指向头节点的前指针为null,

接着p.next = null;原先的头节点再也没用任何指针指向,会被GC掉。

failed=false返回interrupted也就是ture

接着if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))满足判断

调用selfInterrupt();

image-20220101210117318

1
2
3
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

image-20220101210210755

其实B线程唤醒后,应该是从这里再继续执行。

image-20220101205936259

1
2
3
4
5
6
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}

setBlocker(t, null);设置blocket为null

所以Interruptible b = blocker; b为null不满足条件,没有中断线程B,只是interrupt0();设置 interrupt flag为false。到此结束。

但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:

  1. 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False),并记录下来,如果发现该线程被中断过,就再中断一次。
  2. 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。

这里的处理方式主要是运用线程池中基本运作单元Worder中的runWorker,通过Thread.interrupted()进行额外的判断处理,感兴趣的可以看下ThreadPoolExecutor源码。

JUC中的应用场景

除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:

同步工具 同步工具与AQS的关联
ReentrantLock 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。
Semaphore 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。
CountDownLatch 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。
ReentrantReadWriteLock 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。
ThreadPoolExecutor Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。

自定义同步工具

了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具。