Java并发 - AQS详解

Java并发 - AQS详解

在Java并发编程中,我们经常听到AQS(AbstractQueuedSynchronizer)这个概念,它是Java中锁的核心之一。本文将深入介绍AQS,通过提出一系列问题,带你深入了解AQS的定义、实现、资源获取方式以及应用场景。

带着问题阅读

  1. 什么是AQS? 为什么它是锁核心?
  2. AQS是如何实现的?
  3. AQS定义了什么样的资源获取方式?

1. AbstractQueuedSynchronizer介绍

AbstractQueuedSynchronizer提供了一套可用于实现锁同步机制的框架,不夸张地说,AQSJUC同步框架的基石。AQS通过一个FIFO队列维护线程同步状态,实现类只需要继承该类,并重写指定方法即可实现一套线程同步机制。

AQS根据资源互斥级别提供了独占和共享两种资源访问模式;同时其定义Condition结构提供了wait/signal等待唤醒机制。在JUC中,诸如ReentrantLockCountDownLatch等都基于AQS实现。

AQS原理

image-20240115173319727

2. AQS数据结构

AQS有三个重要的字段,分别是: head 头节点、tail 尾节点、state 同步状态

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 头节点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 同步状态
private volatile int state;
// todo volatile 是保证内存的可进行
// ......
}

AQS内部使用一个volatile int state来表示同步状态,该状态用于标识同步资源的可用数量或者被占用的情况。例如:独占锁中,state通常表示锁的持有次数,可以为0表示未被占用,大于0表示已被占用。对于共享锁,state通常表示共享资源的数量。

AQS本身就是个双向链表,head和tail 就是保存的等待队列的头尾结点。

当某个线程获取资源失败时,会被构建成节点加入AQS中

1
2
3
4
5
6
7
8
9
10
11
12
static final class Node {
// 节点状态
       volatile int waitStatus;
  // 前驱节点
       volatile Node prev;
    // 后继节点
       volatile Node next;
  // 当前节点所代表的线程
       volatile Thread thread;
  // 等待队列使用时的后继节点指针
       Node nextWaiter;
}

以下是waitStatus节点状态的取值及各个状态的含义:

  • CANCELLED(1):表示节点因为超时或者被中断而被取消。当一个线程在等待队列中等待的时候,如果发生了超时或者线程被中断,节点的状态会被设置为 CANCELLED
  • SIGNAL(-1):表示后继节点会被阻塞,当前节点需要唤醒后继节点。在独占锁的等待队列中,前一个持有锁的线程释放锁时,会将后继节点的 waitStatus 设置为 SIGNAL,以通知后继节点可以尝试获取锁了。
  • CONDITION(-2):表示节点在等待队列中,等待条件变量。当一个节点通过 Condition 进入等待队列时,其 waitStatus 会被设置为 CONDITION
  • PROPAGATE(-3):用于共享模式。表示释放锁时,如果发现有后继节点需要被唤醒,会将后继节点的 waitStatus 设置为 PROPAGATE
  • 0:表示初始状态,或者表示节点在共享模式下,后继节点在释放锁时需要被唤醒。

有关共享模式,下文会介绍。

AQS中还有另外一个内部类ConditionObject用于实现等待队列/条件队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 实现Condition接口
public class ConditionObject implements Condition, java.io.Serializable {
// 条件队列的头节点
private transient Node firstWaiter;
// 条件队列的尾节点
private transient Node lastWaiter;

// 创建条件队列的节点
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾节点被取消,清理掉取消节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// transient修饰的变量在对象被序列化时会被忽略,不会被写入到序列化的数据流中。
// ......
}

这里主要分析 ConditionObject 中的一些关键部分:

  1. firstWaiterlastWaiter 这两个字段分别表示条件队列的头节点和尾节点。当线程调用 Condition.await() 进入等待状态时,会创建一个节点,并加入到条件队列中。firstWaiter 指向队列的头部,而 lastWaiter 指向队列的尾部。
  2. addConditionWaiter() 方法: 这个方法用于在条件队列中创建一个节点。在创建节点之前,会检查队列尾部是否存在已经取消的节点,如果有的话会清理掉这些取消节点。然后创建一个新的节点,并将其加入到队列的尾部。这个方法在 Condition.await() 中调用,用于创建等待节点。

3. AQS模版方法

1
2
3
4
5
tryAcquire(int);        // 尝试获取独占锁,可获取返回true,否则false
tryRelease(int); // 尝试释放独占锁,可释放返回true,否则false
tryAcquireShared(int); // 尝试以共享方式获取锁,失败返回负数,只能获取一次返回0,否则返回个数
tryReleaseShared(int); // 尝试释放共享锁,可获取返回true,否则false
isHeldExclusively(); // 判断线程是否独占资源

如实现类只需实现独占锁/共享锁功能,可只实现tryAcquire/tryReleasetryAcquireShared/tryReleaseShared。虽然实现tryAcquire/tryRelease可自行设定逻辑,但建议使用state方法对state变量进行操作以实现同步类。

以下是实现一个独占锁示例(与上述原理图对应)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class CustomExclusiveLock extends AbstractQueuedSynchronizer {
// 实现AQS中的抽象方法

@Override
protected boolean isHeldExclusively() {
// 判断当前线程是否持有独占锁
return getExclusiveOwnerThread() == Thread.currentThread();
}

@Override
protected boolean tryAcquire(int arg) {
// 尝试获取独占锁,返回 true 表示获取成功
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
if (getState() == 0){
throw new IllegalArgumentException("Lock not held");
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// 提供外部调用的加锁方法
public void lock() {
acquire(1);
}

// 提供外部调用的解锁方法
public void unlock() {
release(1);
}

public static void main(String[] args) {
CustomExclusiveLock lock = new CustomExclusiveLock();

// 线程1获取锁
new Thread(() -> {
lock.lock();
try {
System.out.println("Thread 1 acquired the lock");
Thread.sleep(2000); // 模拟一些业务处理
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread 1 released the lock");
}
}).start();

// 线程2尝试获取锁(但会被阻塞)
new Thread(() -> {
lock.lock();
try {
System.out.println("Thread 2 acquired the lock");
} finally {
lock.unlock();
System.out.println("Thread 2 released the lock");
}
}).start();
}
}
1
2
3
4
5
6
Connected to the target VM, address: '127.0.0.1:49699', transport: 'socket'
Thread 1 acquired the lock
Thread 1 released the lock
Thread 2 acquired the lock
Thread 2 released the lock
Disconnected from the target VM, address: '127.0.0.1:49699', transport: 'socket'

4. 独占式&共享式

AQS中可以分为独占、共享模式,其中这两种模式下还可以支持响应中断、纳秒级别超时

独占模式可以理解为同一时间只有一个线程能够获取同步状态

共享模式可以理解为可以有多个线程能够获取同步状态,方法中常用shared标识

方法中常用acquire标识获取同步状态,release标识释放同步状态

4.1 独占式

独占式实际就是时刻上只允许一个线程独占该资源,多线程竞争情况下也只能有一个线程获取同步状态成功

下面是独占式关键源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

// ...

/**
* 尝试获取独占锁
*
* @param arg 获取锁所需的参数
* @return 是否成功获取锁
*/
protected abstract boolean tryAcquire(int arg);

/**
* 尝试释放独占锁
*
* @param arg 释放锁所需的参数
* @return 是否成功释放锁
*/
protected abstract boolean tryRelease(int arg);

/**
* 获取独占锁,如果获取失败则进入等待队列
*
* @param arg 获取锁所需的参数
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// ...

/**
* 尝试释放独占锁,并唤醒后继节点
*
* @param arg 释放锁所需的参数
* @return 是否成功释放锁
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// ...
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// addWaiter(Node.EXCLUSIVE) 构建独占式节点,并用CAS+失败重试的方式加入AQS的末尾
private Node addWaiter(Node mode) {
       //构建节点
       Node node = new Node(Thread.currentThread(), mode);
       //获取当前的尾节点,获取当前等待队列的尾节点,用于后续将新节点添加到尾部。
       Node pred = tail;
       if (pred != null) {
// 将新节点的前驱设置为当前尾节点,建立节点间的双向链表关系
           node.prev = pred;
//compareAndSetTail 通过 CAS(Compare And Swap)操作尝试将新节点设置为新的尾节点。如果成成功,表示新节点已成功添加到队列尾部。如果失败说明在这个过程中有其他线程修改了尾节点,需要重新进入 enq 流程。
           if (compareAndSetTail(pred, node)) {
               pred.next = node;
               return node;
          }
      }
       //尾节点为空或则CAS失败执行enq
       enq(node);
       return node;
}

// enq方法主要以自旋(中途不会进入等待模式)去CAS设置尾节点,如果AQS中没有节点则头尾节点为同一节点
private Node enq(final Node node) {
       //失败重试
       for (;;) {
// 获取当前尾节点(Node t = tail;)
           Node t = tail;
           //没有尾节点 则CAS设置头节点(头尾节点为一个节点),否则CAS设置尾节点
           if (t == null) { // Must initialize
               if (compareAndSetHead(new Node()))
                   tail = head;
          } else {
               node.prev = t;
// 通过 CAS 操作尝试将新节点设置为新的尾节点。如果成功,表示新节点已成功添加到队列尾部;如果失败,说明在这个过程中有其他线程修改了尾节点,需要重新进行循环。
               if (compareAndSetTail(t, node)) {
                   t.next = node;
                   return t;
              }
          }
      }
  }

在这段代码中:

  • tryAcquiretryRelease 是需要具体的子类去实现的,用于定制独占锁的获取和释放逻辑。
  • acquire 方法是获取独占锁的核心方法,它首先尝试通过 tryAcquire 获取锁,如果失败则将线程加入等待队列,最终调用 acquireQueued 进入等待状态。
  • release 方法用于释放锁,它先调用 tryRelease 尝试释放锁,成功后再唤醒后继节点。

4.2 共享式

享式就是允许多个线程同时获取一定的资源,比如信号量、读锁就是用共享式实现的,其实共享式与独占式流程类似,只是尝试获取同步状态的实现不同,我们用个获取同步状态的方法来说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 用于尝试获取共享资源。如果返回值为负数,表示获取失败;如果为零或正数,表示获取成功,返回的值表示成功获取的资源数量。
protected int tryAcquireShared(int arg) {
// 子类实现获取共享资源的逻辑
// 成功获取资源返回正数或零,失败返回负数
throw new UnsupportedOperationException();
}
// 用于释放共享资源。如果返回值为 true,表示释放成功;如果为 false,表示释放失败。
protected boolean tryReleaseShared(int arg) {
// 子类实现释放共享资源的逻辑
// 成功释放返回true,失败返回false
throw new UnsupportedOperationException();
}
// 用于获取共享资源,如果获取失败则将当前线程加入等待队列。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 获取失败,加入等待队列
doAcquireShared(arg);
}
// 用于释放共享资源,并唤醒等待队列中的线程。如果释放成功,则返回 true;如果释放失败,则返回 false。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 释放成功,唤醒等待队列中的线程
doReleaseShared();
return true;
}
return false;
}
// 当 acquireShared 方法中的 tryAcquireShared 返回负数时,会调用此方法将线程加入等待队列。
private void doAcquireShared(int arg) {
//添加共享式节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点为头节点 并且 获取同步状态成功 设置头节点
if (p == head && tryAcquireShared(arg) >= 0) {
setHeadAndPropagate(node, arg);
p.next = null; // help GC
failed = false;
return;
}
//获取失败进入会等待的自旋
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 当 releaseShared 方法中的 tryReleaseShared 返回 true 时,会调用此方法唤醒等待队列中的线程。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果头节点的状态为 SIGNAL,表示有等待线程,唤醒一个
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
// ......

}

AQS如何保证原子性、可见性:

​ AQS使用头尾节点来实现双向队列,提供同步状态和获取/释放同步状态的模板方法来实现阻塞(同步)队列,并且这些字段使用volatile修饰,保证可见性与读取的场景配合,不需要保证原子性,在写的场景下常用CAS保证原子性。AQS充当阻塞队列,Condition充当它的等待队列来实现等待/通知模式,AQS的内部类ConditionObject在await时会加入Condition末尾并释放同步状态进入等待队列,在被唤醒后自旋(失败会进入等待)获取同步状态;在single时会CAS的将condition头节点并加入AQS尾部再去唤醒(因为一个AQS可能对应多个Condition因此要CAS保证原子性)

AQS获取资源的方式:

AQS分为独占式和共享式,使用独占式时只允许一个线程获取同步状态,使用共享式时则允许多个线程获取同步状态;其中还提供响应中断、等待超时的类似方法

graph LR;
subgraph 主流程
A([开始]) --> B(acquire) -->|tryAcquire|C{更改state}-->|成功|D([结束])
C --> |失败|E(addWaiter) 
G(CAS获取资源失败
不停的自旋) --> |出现异常fail|H(cancelAcquire
取消节点) H --> D end subgraph addWaiter子流程 E -->F{前驱节点是
否为头节点} -->|Y tryAcquire|AA{获取同步状态} F --> |N| Y(pack 进入等待) Y --> |被前驱唤醒|F AA --> |失败|Y end

Java并发 - AQS详解
http://example.com/2025/12/01/Java并发-AQS详解/
作者
TuBoShu
发布于
2025年12月1日
许可协议