Java并发 - AQS详解 在Java并发编程中,我们经常听到AQS(AbstractQueuedSynchronizer)这个概念,它是Java中锁的核心之一。本文将深入介绍AQS,通过提出一系列问题,带你深入了解AQS的定义、实现、资源获取方式以及应用场景。
带着问题阅读
什么是AQS? 为什么它是锁核心?
AQS是如何实现的?
AQS定义了什么样的资源获取方式?
1. AbstractQueuedSynchronizer介绍 AbstractQueuedSynchronizer提供了一套可用于实现锁同步机制的框架,不夸张地说,AQS是JUC同步框架的基石。AQS通过一个FIFO队列维护线程同步状态,实现类只需要继承该类,并重写指定方法即可实现一套线程同步机制。
AQS根据资源互斥级别提供了独占和共享 两种资源访问模式;同时其定义Condition结构提供了wait/signal等待唤醒机制。在JUC中,诸如ReentrantLock、CountDownLatch等都基于AQS实现。
AQS原理
2. AQS数据结构 AQS有三个重要的字段,分别是: head 头节点、tail 尾节点、state 同步状态
1 2 3 4 5 6 7 8 9 10 11 12 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { private transient volatile Node head; private transient volatile Node tail; private volatile int state; }
AQS内部使用一个volatile int state来表示同步状态,该状态用于标识同步资源的可用数量或者被占用的情况。例如:独占锁中,state通常表示锁的持有次数,可以为0表示未被占用,大于0表示已被占用。对于共享锁,state通常表示共享资源的数量。
AQS本身就是个双向链表,head和tail 就是保存的等待队列的头尾结点。
当某个线程获取资源失败时,会被构建成节点加入AQS中
1 2 3 4 5 6 7 8 9 10 11 12 static final class Node { volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; }
以下是waitStatus节点状态的取值及各个状态的含义:
CANCELLED(1):表示节点因为超时或者被中断而被取消。当一个线程在等待队列中等待的时候,如果发生了超时或者线程被中断,节点的状态会被设置为 CANCELLED。
SIGNAL(-1):表示后继节点会被阻塞,当前节点需要唤醒后继节点。在独占锁的等待队列中,前一个持有锁的线程释放锁时,会将后继节点的 waitStatus 设置为 SIGNAL,以通知后继节点可以尝试获取锁了。
CONDITION(-2):表示节点在等待队列中,等待条件变量。当一个节点通过 Condition 进入等待队列时,其 waitStatus 会被设置为 CONDITION。
PROPAGATE(-3):用于共享模式。表示释放锁时,如果发现有后继节点需要被唤醒,会将后继节点的 waitStatus 设置为 PROPAGATE。
0:表示初始状态,或者表示节点在共享模式下,后继节点在释放锁时需要被唤醒。
有关共享模式,下文会介绍。
AQS中还有另外一个内部类ConditionObject用于实现等待队列/条件队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class ConditionObject implements Condition , java.io.Serializable { private transient Node firstWaiter; private transient Node lastWaiter; private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node (Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } }
这里主要分析 ConditionObject 中的一些关键部分:
firstWaiter 和 lastWaiter: 这两个字段分别表示条件队列的头节点和尾节点。当线程调用 Condition.await() 进入等待状态时,会创建一个节点,并加入到条件队列中。firstWaiter 指向队列的头部,而 lastWaiter 指向队列的尾部。
addConditionWaiter() 方法: 这个方法用于在条件队列中创建一个节点。在创建节点之前,会检查队列尾部是否存在已经取消的节点,如果有的话会清理掉这些取消节点。然后创建一个新的节点,并将其加入到队列的尾部。这个方法在 Condition.await() 中调用,用于创建等待节点。
3. AQS模版方法 1 2 3 4 5 tryAcquire(int ); tryRelease(int ); tryAcquireShared(int ); tryReleaseShared(int ); isHeldExclusively();
如实现类只需实现独占锁/共享锁功能,可只实现tryAcquire/tryRelease或tryAcquireShared/tryReleaseShared。虽然实现tryAcquire/tryRelease可自行设定逻辑,但建议使用state方法对state变量进行操作以实现同步类。
以下是实现一个独占锁示例(与上述原理图对应)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class CustomExclusiveLock extends AbstractQueuedSynchronizer { @Override protected boolean isHeldExclusively () { return getExclusiveOwnerThread() == Thread.currentThread(); } @Override protected boolean tryAcquire (int arg) { if (compareAndSetState(0 ,1 )){ setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { if (getState() == 0 ){ throw new IllegalArgumentException ("Lock not held" ); } setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public void unlock () { release(1 ); } public static void main (String[] args) { CustomExclusiveLock lock = new CustomExclusiveLock (); new Thread (() -> { lock.lock(); try { System.out.println("Thread 1 acquired the lock" ); Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); System.out.println("Thread 1 released the lock" ); } }).start(); new Thread (() -> { lock.lock(); try { System.out.println("Thread 2 acquired the lock" ); } finally { lock.unlock(); System.out.println("Thread 2 released the lock" ); } }).start(); } }
1 2 3 4 5 6 Connected to the target VM, address: '127.0.0.1:49699', transport: 'socket' Thread 1 acquired the lock Thread 1 released the lock Thread 2 acquired the lock Thread 2 released the lock Disconnected from the target VM, address: '127.0.0.1:49699', transport: 'socket'
4. 独占式&共享式 AQS中可以分为独占、共享模式,其中这两种模式下还可以支持响应中断、纳秒级别超时
独占模式可以理解为同一时间只有一个线程能够获取同步状态
共享模式可以理解为可以有多个线程能够获取同步状态,方法中常用shared标识
方法中常用acquire标识获取同步状态,release标识释放同步状态
4.1 独占式 独占式实际就是时刻上只允许一个线程独占该资源,多线程竞争情况下也只能有一个线程获取同步状态成功
下面是独占式关键源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { protected abstract boolean tryAcquire (int arg) ; protected abstract boolean tryRelease (int arg) ; public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
在这段代码中:
tryAcquire 和 tryRelease 是需要具体的子类去实现的,用于定制独占锁的获取和释放逻辑。
acquire 方法是获取独占锁的核心方法,它首先尝试通过 tryAcquire 获取锁,如果失败则将线程加入等待队列,最终调用 acquireQueued 进入等待状态。
release 方法用于释放锁,它先调用 tryRelease 尝试释放锁,成功后再唤醒后继节点。
4.2 共享式 享式就是允许多个线程同时获取一定的资源,比如信号量、读锁就是用共享式实现的,其实共享式与独占式流程类似,只是尝试获取同步状态的实现不同,我们用个获取同步状态的方法来说明
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { protected int tryAcquireShared (int arg) { throw new UnsupportedOperationException (); } protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException (); } public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquireShared(arg) >= 0 ) { setHeadAndPropagate(node, arg); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } } }
AQS如何保证原子性、可见性:
AQS使用头尾节点来实现双向队列,提供同步状态和获取/释放同步状态的模板方法来实现阻塞(同步)队列,并且这些字段使用volatile修饰,保证可见性与读取的场景配合,不需要保证原子性,在写的场景下常用CAS保证原子性。AQS充当阻塞队列,Condition充当它的等待队列来实现等待/通知模式,AQS的内部类ConditionObject在await时会加入Condition末尾并释放同步状态进入等待队列,在被唤醒后自旋(失败会进入等待)获取同步状态;在single时会CAS的将condition头节点并加入AQS尾部再去唤醒(因为一个AQS可能对应多个Condition因此要CAS保证原子性)
AQS获取资源的方式:
AQS分为独占式和共享式,使用独占式时只允许一个线程获取同步状态,使用共享式时则允许多个线程获取同步状态;其中还提供响应中断、等待超时的类似方法
graph LR;
subgraph 主流程
A([开始]) --> B(acquire) -->|tryAcquire|C{更改state}-->|成功|D([结束])
C --> |失败|E(addWaiter)
G(CAS获取资源失败 不停的自旋) --> |出现异常fail|H(cancelAcquire 取消节点)
H --> D
end
subgraph addWaiter子流程
E -->F{前驱节点是 否为头节点} -->|Y tryAcquire|AA{获取同步状态}
F --> |N| Y(pack 进入等待)
Y --> |被前驱唤醒|F
AA --> |失败|Y
end