1. Semaphore 类结构
Semaphore 的类结构与ReentrantLock 的类结构相同,类内部总共存在Sync、NonfairSync、FairSync三个静态内部类。Sync类继承AbstactQueuedSynchronizer。NotfairSync和FairSync继承Sync。
1.1. Semaphore 类方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class Semaphore implements java .io.Serializable { public Semaphore (int permits ) { sync = new NonfairSync (permits ); } public Semaphore (int permits , boolean fair) { sync = fair ? new FairSync (permits ) : new NonfairSync (permits ); } public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public void acquireUninterruptibly () { sync.acquireShared(1 ); } public boolean tryAcquire () { return sync.nonfairTryAcquireShared(1 ) >= 0 ; } public boolean tryAcquire (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(timeout)); } public void release () { sync.releaseShared(1 ); } public int availablePermits () { return sync.getPermits(); } public int drainPermits () { return sync.drainPermits(); } public boolean isFair () { return sync instanceof FairSync; } public final boolean hasQueuedThreads () { return sync.hasQueuedThreads(); } public final int getQueueLength () { return sync.getQueueLength(); } protected Collection<Thread> getQueuedThreads () { return sync.getQueuedThreads(); } }
1.2. Sync 内部类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L ; Sync(int permits ) { setState(permits ); } final int getPermits () { return getState(); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error ("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } final void reducePermits (int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) throw new Error ("Permit count underflow" ); if (compareAndSetState(current, next)) return ; } } final int drainPermits () { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0 )) return current; } } }
方法
作用
Sync(int permits)
构造器,接受一个参数 permits,并通过 setState(permits) 将内部状态设置为指定的许可数量。
getPermits()
返回当前的许可数量,即当前同步器的状态。
nonfairTryAcquireShared(int acquires)
尝试以非公平的方式获取共享许可。它通过不断循环,尝试减少可用许可的数量,直到成功获取所需的许可或者无法获取为止
tryReleaseShared(int releases)
尝试释放共享许可。它通过不断循环,尝试增加可用许可的数量,直到成功释放指定数量的许可。
reducePermits(int reductions)
用于减少许可的数量。它通过不断循环,尝试减少当前许可的数量。
drainPermits()
用于一次性获取所有的许可。它通过不断循环,将当前许可的数量设置为零,然后返回原先的许可数量。
1.3. NonfairSync 内部类 1 2 3 4 5 6 7 8 9 10 11 12 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L ; NonfairSync(int permits ) { super (permits ); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } }
1.4. FairSync 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L ; FairSync(int permits ) { super (permits ); } protected int tryAcquireShared (int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1 ; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
2. Semaphore 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class SemaphoreExample { private static final int MAX_AVAILABLE_PERMITS = 3 ; private static final Semaphore semaphore = new Semaphore (MAX_AVAILABLE_PERMITS); public static void main (String[] args) { for (int i = 0 ; i < 6 ; i++) { new Thread (new Task ("Thread - " + i)).start(); } } static class Task implements Runnable { private final String name; public Task (String name) { this .name = name; } @Override public void run () { try { semaphore.acquire(); System.out.println(name + " acquired the permit" ); Thread.sleep(1000 ); }catch (InterruptedException e){ e.printStackTrace(); }finally { System.out.println(name + " released the permit" ); semaphore.release(); } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Connected to the target VM, address: '127.0.0.1:62443' , transport: 'socket' Thread - 1 acquired the permit Thread - 2 acquired the permit Thread - 0 acquired the permit Thread - 1 released the permit Thread - 3 acquired the permit Thread - 2 released the permit Thread - 0 released the permit Thread - 4 acquired the permit Thread - 5 acquired the permit Thread - 3 released the permit Thread - 4 released the permit Thread - 5 released the permit Disconnected from the target VM, address: '127.0.0.1:62443' , transport: 'socket'
3. Semaphore核心函数解析
以下给出Semaphore核心函数acquire和release,为函数的大致调用,可能与具体调用有些差别。
3.1. acquire方法执行顺序(大致调用图) graph TB;
A(Semaphore : acquire) --> B(AQS : acquireSharedInterruptibly)
B--> C(Semaphore : NonfairSync : tryAcquireShared)
C --> D(Semaphore : Sync :nonfairTryAcquireShared)
D--> F(tryAcquireShared方法返回小于0 执行AQS : doAcquireSharedInterruptibly)
F --> G(AQS : tryAcquireShared)
G --> H(AQS : setHeadAndPropagate )
H --> I(AQS : doReleaseShared)
3.2. release方法执行顺序(大致调用图) graph TB;
A(Semaphore : release) --> B(AQS : releaseShared)
B--> C(Semaphore : Sync : tryReleaseShared)
C --> D(AQS : doReleaseShared)
D--> F(AQS : unparkSuccessor)
4. ReentrantLock和Semaphare的对比 4.1. 共同 同步机制:都是基于AQS实现的多线程同步工具,旨在协调和控制多线程对共享资源的访问。
可用于资源:两者都可以用于控制对临界区或者资源的访问,防止多个线程同时访问或修改共享资源。
4.2. 区别 Lock类型:
ReentrantLock 是一种独占锁(Exclusive Lock),一个线程获取了锁之后,其他线程必须等待该线程释放锁。
Semaphore 是一种信号量,它可以是独占锁也可以是共享锁。通过设置许可数,Semaphore 控制对共享资源的访问。
获取方式:
ReentrantLock 提供了显式的 lock() 和 unlock() 方法,线程需要手动获取和释放锁。
Semaphore 使用 acquire() 和 release() 方法来获取和释放许可。线程需要通过 acquire() 获取许可,通过 release() 释放许可。
许可数量:
ReentrantLock 只允许一个线程同时持有锁。
Semaphore 允许多个线程同时访问临界区,但许可的数量可以被限制,控制同时访问的线程数量。
可中断性:
ReentrantLock 提供了可中断的锁获取方式,可以使用 lockInterruptibly() 方法。
Semaphore 的 acquire() 方法也支持可中断。
条件等待:
ReentrantLock 提供了 Condition 条件对象,可以通过 newCondition() 方法创建,使线程能够以一种有序的方式等待。
Semaphore 不直接支持条件等待。