Java并发 - Semaphore 详解

1. Semaphore 类结构

image-20240118154244755

Semaphore 的类结构与ReentrantLock 的类结构相同,类内部总共存在Sync、NonfairSync、FairSync三个静态内部类。Sync类继承AbstactQueuedSynchronizer。NotfairSync和FairSync继承Sync。

1.1. Semaphore 类方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class Semaphore implements java.io.Serializable {
// 构造方法 permits:许可数量
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 构造方法 fair:公平锁/非公平锁
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// 线程会从许可集中尝试获取一个许可
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 从信号量中获取许可,该方法不响应中断
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// 尝试获取一个许可,如果获取失败,线程不阻塞,直接返回false
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 在给定时间内尝试获取一个许可,如果获取超时并获取失败线程不阻塞,直接返回false
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 线程释放已经获得的许可
public void release() {
sync.releaseShared(1);
}
// 返回当前信号量中未被获取的许可数
public int availablePermits() {
return sync.getPermits();
}
// 获取并返回当前信号量中立即未被获取的所有许可
public int drainPermits() {
return sync.drainPermits();
}
// 返回当前信号量的公平类型,如果公平锁返回true/非公平锁返回false
public boolean isFair() {
return sync instanceof FairSync;
}
// 判断是否存在线程在等待队列中
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
// 获取所有等待队列中线程集合的长度
public final int getQueueLength() {
return sync.getQueueLength();
}
// 返回所有等待队列中线程集合
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
}

1.2. Sync 内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

// 内部类,继承自AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 构造器,接受一个参数 `permits`,并通过 `setState(permits)` 将内部状态设置为指定的许可数量。
Sync(int permits) {
setState(permits);
}
// 返回当前的许可数量,即当前同步器的状态。
final int getPermits() {
return getState();
}
// 共享模式下,尝试以非公平的方式获取共享许可。它通过不断循环,尝试减少可用许可的数量,直到成功获取所需的许可或者无法获取为止
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取许可数
int available = getState();
// 剩余的许可数
int remaining = available - acquires;
// 许可小于0或比较设置状态成功(许可数)
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 共享模式下,尝试释放共享许可。它通过不断循环,尝试增加可用许可的数量,直到成功释放指定数量的许可。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当前许可
int current = getState();
// 计算可用许可
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 比较设置资源数
if (compareAndSetState(current, next))
return true;
}
}

// 用于减少许可的数量。它通过不断循环,尝试减少当前许可的数量。
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 用于一次性获取所有的许可。它通过不断循环,将当前许可的数量设置为零,然后返回原先的许可数量。
final int drainPermits() {
for (;;) {
// 获取当前许可数
int current = getState();
// 许可为0或者设置许可数为0
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
方法 作用
Sync(int permits) 构造器,接受一个参数 permits,并通过 setState(permits) 将内部状态设置为指定的许可数量。
getPermits() 返回当前的许可数量,即当前同步器的状态。
nonfairTryAcquireShared(int acquires) 尝试以非公平的方式获取共享许可。它通过不断循环,尝试减少可用许可的数量,直到成功获取所需的许可或者无法获取为止
tryReleaseShared(int releases) 尝试释放共享许可。它通过不断循环,尝试增加可用许可的数量,直到成功释放指定数量的许可。
reducePermits(int reductions) 用于减少许可的数量。它通过不断循环,尝试减少当前许可的数量。
drainPermits() 用于一次性获取所有的许可。它通过不断循环,将当前许可的数量设置为零,然后返回原先的许可数量。

1.3. NonfairSync 内部类

1
2
3
4
5
6
7
8
9
10
11
12
// 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}
// 这里通过非公平的方式尝试获取共享许可,即调用了父类中定义的 nonfairTryAcquireShared 方法。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

1.4. FairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}
// 以下方法在acquire -> acquireSharedInterruptibly时被调用
//公平方式获取共享许可的逻辑
protected int tryAcquireShared(int acquires) {
for (;;) {
// 检查是否有排队的前任线程,如果当前前面还有排队则退出
if (hasQueuedPredecessors())
return -1;
// 获取当前许可
int available = getState();
// 计算当前剩余许可
int remaining = available - acquires;
// 判断许可是否为0和对比设置许可
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

2. Semaphore 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class SemaphoreExample {
private static final int MAX_AVAILABLE_PERMITS = 3;
// 信号量
private static final Semaphore semaphore = new Semaphore(MAX_AVAILABLE_PERMITS);

public static void main(String[] args) {
// 启动访问共享资源
for (int i = 0; i < 6; i++) {
new Thread(new Task("Thread - " + i)).start();
}
}

static class Task implements Runnable{
private final String name;

public Task(String name){
this.name = name;
}

@Override
public void run() {
try {
semaphore.acquire();
System.out.println(name + " acquired the permit");
// 模拟对共享资源的访问
Thread.sleep(1000);
}catch (InterruptedException e){
e.printStackTrace();
}finally {
System.out.println(name + " released the permit");
semaphore.release();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Connected to the target VM, address: '127.0.0.1:62443', transport: 'socket'
Thread - 1 acquired the permit
Thread - 2 acquired the permit
Thread - 0 acquired the permit
Thread - 1 released the permit
Thread - 3 acquired the permit
Thread - 2 released the permit
Thread - 0 released the permit
Thread - 4 acquired the permit
Thread - 5 acquired the permit
Thread - 3 released the permit
Thread - 4 released the permit
Thread - 5 released the permit
Disconnected from the target VM, address: '127.0.0.1:62443', transport: 'socket'

3. Semaphore核心函数解析

以下给出Semaphore核心函数acquire和release,为函数的大致调用,可能与具体调用有些差别。

3.1. acquire方法执行顺序(大致调用图)

graph TB;
A(Semaphore : acquire) --> B(AQS : acquireSharedInterruptibly)
B--> C(Semaphore : NonfairSync : tryAcquireShared)
C --> D(Semaphore : Sync :nonfairTryAcquireShared)
D--> F(tryAcquireShared方法返回小于0 执行AQS : doAcquireSharedInterruptibly) 
F --> G(AQS : tryAcquireShared)
G --> H(AQS : setHeadAndPropagate )
H --> I(AQS : doReleaseShared)

3.2. release方法执行顺序(大致调用图)

graph TB;
A(Semaphore : release) --> B(AQS : releaseShared)
B--> C(Semaphore : Sync : tryReleaseShared)
C --> D(AQS : doReleaseShared)
D--> F(AQS : unparkSuccessor)

4. ReentrantLock和Semaphare的对比

4.1. 共同

同步机制:都是基于AQS实现的多线程同步工具,旨在协调和控制多线程对共享资源的访问。

可用于资源:两者都可以用于控制对临界区或者资源的访问,防止多个线程同时访问或修改共享资源。

4.2. 区别

Lock类型:

  • ReentrantLock 是一种独占锁(Exclusive Lock),一个线程获取了锁之后,其他线程必须等待该线程释放锁。
  • Semaphore 是一种信号量,它可以是独占锁也可以是共享锁。通过设置许可数,Semaphore 控制对共享资源的访问。

获取方式:

  • ReentrantLock 提供了显式的 lock()unlock() 方法,线程需要手动获取和释放锁。
  • Semaphore 使用 acquire()release() 方法来获取和释放许可。线程需要通过 acquire() 获取许可,通过 release() 释放许可。

许可数量:

  • ReentrantLock 只允许一个线程同时持有锁。
  • Semaphore 允许多个线程同时访问临界区,但许可的数量可以被限制,控制同时访问的线程数量。

可中断性:

  • ReentrantLock 提供了可中断的锁获取方式,可以使用 lockInterruptibly() 方法。
  • Semaphoreacquire() 方法也支持可中断。

条件等待:

  • ReentrantLock 提供了 Condition 条件对象,可以通过 newCondition() 方法创建,使线程能够以一种有序的方式等待。
  • Semaphore 不直接支持条件等待。

Java并发 - Semaphore 详解
http://example.com/2025/12/01/Java并发-Semaphore-详解/
作者
TuBoShu
发布于
2025年12月1日
许可协议