Java多线程学习(五)——AQS深入分析

从 CLH 同步队列说起

CLH 同步队列节点定义

由上篇博文的介绍中,已经了解到了 AQS 的全称是队列同步器。在AQS的内部实现中,依赖其内部持有的队列来完成同步状态的管理。在 JDK 1.8 的源码中对队列节点 Node 的定义如下:(此处为了突出重点和增加可读性,未完全展示源码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}

以上代码展示了 JDK1.8 版本中对于队列同步器队列节点 Node 的定义,由以上代码我们可以看出:

  1. Node 定义的前四行,为同步器的工作方式提供相关支持,工作方式有两种:SHARED 共享模式 和 EXCLUSIVE 独占模式。这两种方法区别如下:
    • 独占模式:同一个锁只能被一个线程获取成功,其他线程必须等待
    • 共享模式:允许多个线程获取同一个锁成功
  2. 随后,Node 节点定义了四个常量:CANCELLEDSIGNALCONDITIONPROPAGATE。这四个常量为 waitStatus 的可选值,其具体含义分别如下:
    • CANCELLED:取消状态,设置为该状态的节点不会继续参与获取锁的竞争,且为该状态的节点不会转变为其他状态。该状态可能由于超时或者中断造成。
    • SIGNAL:后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
    • CONDITION:节点在 等待队列 中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列。
    • PROPAGATE:表示下一次共享式同步状态获取将会无条件地传播下去。
  3. Node 节点中还持有对一个 prev 前驱节点和一个 next 后继节点的引用。这两个节点是形成双向队列结构的基础。
  4. Node 节点中持有的 thread 引用,为入队(构造节点)时的当前线程。
  5. Node 节点中定义的 nextWaiter,为等待队列中的后继节点

同步队列与等待队列

  • 同步队列:由 AQS 维护。当前线程获取同步状态失败时,AQS会讲当前线程及其状态信息构造成一个Node并加入同步队列。
  • 等待队列:由 Condition 维护。等待队列主要是维护一个等待singal信号的队列。
    同步队列与等待队列图解如下:
    同步队列基本结构:
    sync_queue
    等待队列基本结构:
    wait_queue
    有了这两个概念后,我们再看 AQS 中对于 Node 节点的定义,发现节点定义中,持有有三个 Node 节点的饮用,分别为 prevnext 以及 nextWaiter。其中,这些节点引用主要分为两类:
  • prev 、next 节点:同步队列中的前驱、后继节点
  • nextWaiter 节点:等待队列中的后继节点
    Node 节点的定义中持有这两类节点引用,故可以判定 AQS 维护的等待队列和 Condition 维护的同步队列其实是用的同一 Node 类型

同步队列工作原理

上文中已经通过图解的形式给出了同步队列的基本结构。下面我们在查看过同步器节点定义后,接着继续查看 AQS 源码中的定义。可以看到如上文中结构图所示,AQS中持有两个节点: head 和 tail,分别为同步队列的头节点和尾节点。定义部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;

同步队列的入队与出队

入队

上篇文章中已经介绍到,当一个线程成功获取同步状态时,其他线程因为无法获得该同步状态,将会被构造成节点并加入到同步队列中。同步器中提供了一个基于 CAS 的设置尾节点的方法:compareAndSet(Node expect, Node update)来设置尾节点,这样可以保证线程安全。下面我们来看一下 AQS 入队操作时调用的 addWaiter(Node mode)方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

根据源码中对addWaiter方法的定义,我们可以看到。该方法首先尝试通过 compareAndSetTail 方法一次尝试设置尾节点,如果成功则直接返回,若不成功则调用enq方法进行尾节点的设置。enq方法的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

由enq方法的定义中,可以看到,其通过死循环的方式来保证尾节点正确被添加,成功添加后,从该方法后返回。

出队

同步队列遵循 FIFO,具体出队过程如下:

  1. 首节点线程释放同步状态,唤醒其后继节点
  2. 后继节点尝试获取同步状态,并在获取成功时将自己设置为首节点

独占式同步状态的获取与释放

独占式同步状态获取

独占式同步状态的获取,是从 AQS 的模板方法 acquire(int arg)开始的,我们首先看该方法的定义:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

由方法定义中我们可以看到,acquire(int arg)方法的工作流程可以简单叙述如下:

  1. 调用同步器实现的 tryAcquire(int arg) 方法,尝试安全的获取同步状态,若获取成功,方法结束;若获取不成功,转向步骤2;
  2. 获取同步状态不成功时,构造状态为独占的同步节点,并将其加入到同步队列中,同时转向步骤3;
  3. 使该节点进入自旋状态(不断尝试获取同步状态),并在获取到同步状态之前阻塞该线程。
    步骤 3 中使节点进入自旋状态是通过 acquireQueued(final Node node, int arg) 方法来实现的,该方法的源码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    /**
    * Acquires in exclusive uninterruptible mode for thread already in
    * queue. Used by condition wait methods as well as acquire.
    *
    * @param node the node
    * @param arg the acquire argument
    * @return {@code true} if interrupted while waiting
    */
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

由源码可知,该函数的功能执行流程如下:

  1. 当前节点进入自旋状态;
  2. 若当前节点的前驱节点为头结点,尝试获取锁,获取锁成功则返回当前中断状态,并推出该函数。

独占式同步状态释放

线程获取同步状态并执行完全部对应逻辑后,需要对其持有的同步状态进行释放,这一操作调用了同步器的 release(int arg) 这一模板方法。该方法定义如下:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

由上述源码,可以总结出该方法的工作流程如下:

  1. 调用同步器实现的 tryRelease(int arg) 方法尝试释放同步状态,若失败,返回 false,方法结束;若成功,转向步骤2。
  2. 唤醒当前节点的后继结点线程,该内容通过 unparkSuccessor 方法实现。

共享式同步状态的获取与释放

共享式同步状态获取

共享式同步状态的获取与释放这一过程中,允许同一时刻有多个线程获取到同步状态。与独占式不同,共享式同步状态的获取是从 AQS 的模板方法 acquireShared(int arg) 进入。acquireShared(int arg) 方法的源码如下:

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

可以看到,在 acquireShared(int arg) 方法的实现中,调用同步器自行实现的 tryAcquireShared(arg) 方法进行共享式的获取锁。若该方法返回大于等于 0 的整数,则获取同步状态成功。共享式获取同步状态的过程可以总结如下:

  1. 调用 tryAcquireShared(arg) 方法进行共享式的获取锁,若返回大于等于0的整数,获取成功,结束;若返回小于 0 的整数,获取锁失败,进入步骤2。
  2. 构造共享式同步节点并调用 addWaiter(Node node) 方法将其添加进同步队列,同时节点进入自旋过程,不断尝试获取锁。

共享式同步状态释放

共享式同步方法释放,是通过调用 releaseShared(int arg) 这一模板方法实现的。该方法主要的功能也是有两个:释放同步状态、唤醒后续处于等待状态的节点。由于共享式同步状态获取时,可能有多个线程同时获得到了同步状态,所以在释放时,需要保证线程安全的释放同步状态。

总结

本篇中,在简单使用 AQS 实现自定义同步组件的基础上,了解了 AQS 内部的实现源码(主要是获取同步状态、释放同步状态的源码工作流程)。当前对于源码也仅是在简单的流程层面进行了大概的分析,
由于初学且行文时间较赶,如果错误还烦请大家多多指教。有相关讨论可以联系 Email:JaydenRansom@outlook.com