基于AQS实现的并发工具:Semaphore、CountDownLatch、CyclicBarrier与ReentrantReadWriteLock
AQS 基础回顾
AQS,即 AbstractQueuedSynchronizer,是 Java 并发包中构建锁和同步器的基础框架,位于java.util.concurrent.locks
包下。它通过一个int
类型的变量state
来表示同步状态,例如在独占锁中,state
为 0 表示锁未被占用,为 1 表示锁已被占用 ,而在可重入锁中,state
的值会随着线程重入次数而增加。
AQS 内部维护了一个基于 FIFO(先进先出)原则的双向队列,当线程请求共享资源而获取失败时,该线程会被封装成一个Node
节点加入到队列中。队列中的每个Node
节点都包含了线程的引用、等待状态以及前驱和后继节点的引用。例如,当一个线程调用ReentrantLock
的lock
方法获取锁失败时,它就会被加入到这个等待队列中。
AQS 支持独占和共享两种资源访问模式。在独占模式下,如ReentrantLock
,同一时刻只有一个线程能获取到锁;而在共享模式下,如Semaphore
和CountDownLatch
,多个线程可以同时获取资源 。开发者通过继承 AQS 并重写其特定的抽象方法,如tryAcquire
、tryRelease
、tryAcquireShared
、tryReleaseShared
等,来实现自定义的同步逻辑,然后利用 AQS 提供的模板方法,如acquire
、release
、acquireShared
、releaseShared
等,来完成同步状态的获取与释放操作。
Semaphore:信号量
1. 什么是 Semaphore
Semaphore 是基于 AQS 实现的信号量,它的主要职责是控制同时访问特定资源的线程数量 。可以将其想象成一个停车场的入口闸机,闸机内预设了一定数量的停车票(许可),每辆车进入停车场时需要获取一张停车票(调用acquire
方法获取许可),离开时归还停车票(调用release
方法释放许可)。当停车票发放完后,后续车辆只能在入口排队等待,直到有车辆归还停车票。在 Java 中,Semaphore
位于java.util.concurrent
包下。
2. 工作原理深入剖析
Semaphore 的核心方法是acquire()
和release()
。当一个线程调用acquire()
方法时,它试图从 Semaphore 中获取一个许可。如果此时 Semaphore 中还有可用许可(即 AQS 的state
变量大于 0),则该线程获取许可成功,state
变量减 1,线程可以继续执行;如果state
变量为 0,意味着没有可用许可,该线程会被封装成一个Node
节点加入到 AQS 的等待队列中,进入阻塞状态,直到有其他线程释放许可。
当线程调用release()
方法时,会释放一个许可,即state
变量加 1。如果此时等待队列中有线程在等待许可,那么处于等待队列头部的线程会被唤醒,尝试获取许可。例如,假设有 5 个线程同时调用acquire()
方法,而 Semaphore 初始许可数量为 3,那么前 3 个线程获取许可成功,state
变为 0,后 2 个线程进入等待队列。当其中一个已获取许可的线程调用release()
方法后,state
变为 1,等待队列中的一个线程被唤醒并获取许可。
3. 应用场景举例
在实际开发中,Semaphore 常用于限流场景。比如在高并发的 Web 应用中,数据库连接资源是有限的,为了防止过多线程同时请求数据库连接导致数据库压力过大,我们可以使用 Semaphore 来限制同时获取数据库连接的线程数量。假设数据库连接池最大连接数为 10,我们可以创建一个初始许可数量为 10 的 Semaphore。每个线程在获取数据库连接前,先调用semaphore.acquire()
获取许可,使用完连接后调用semaphore.release()
释放许可。这样就能确保在任意时刻,最多只有 10 个线程能够获取到数据库连接 。
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class DatabaseAccess {
private static final String URL = "jdbc:mysql://localhost:3306/yourdb";
private static final String USER = "root";
private static final String PASSWORD = "password";
private static final Semaphore semaphore = new Semaphore(10);
public static Connection getConnection() throws InterruptedException, SQLException {
semaphore.acquire();
try {
return DriverManager.getConnection(URL, USER, PASSWORD);
} catch (SQLException e) {
semaphore.release();
throw e;
}
}
public static void releaseConnection(Connection connection) {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
for (int i = 0; i < 20; i++) {
executorService.submit(() -> {
try {
Connection connection = getConnection();
// 执行数据库操作
System.out.println(Thread.currentThread().getName() + " 获得连接,执行操作");
releaseConnection(connection);
} catch (InterruptedException | SQLException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
在上述代码中,Semaphore
被用来限制同时获取数据库连接的线程数量为 10,有效保护了数据库连接资源,避免了因过多线程并发访问数据库而导致的性能问题。
CountDownLatch:闭锁
1. CountDownLatch 初相识
CountDownLatch 是 Java 并发包java.util.concurrent
中的一个同步辅助类,它的主要作用是允许一个或多个线程等待,直到其他线程完成一组操作。可以把它想象成一个倒计时器,在初始化时设置一个计数值,每有一个线程完成任务,就将计数值减 1,当计数值减为 0 时,等待的线程就会被释放,继续执行后续操作 。例如,在一场多人比赛中,只有当所有选手都到达终点后,裁判才会宣布比赛结束,这里就可以用 CountDownLatch 来实现这种同步机制。
2. 核心原理揭秘
CountDownLatch 基于 AQS 实现,它内部维护了一个继承自 AQS 的Sync
类 。在 CountDownLatch 创建时,传入的计数值会被赋值给 AQS 的state
变量,作为计数器的初始值。当一个线程调用await()
方法时,它会尝试获取共享锁,实际上是检查state
是否为 0 。如果state
不为 0,说明还有线程未完成任务,该线程会被封装成一个Node
节点加入到 AQS 的等待队列中,进入阻塞状态;如果state
为 0,则获取共享锁成功,线程可以继续执行。
当线程调用countDown()
方法时,会释放共享锁,即通过compareAndSetState
方法以 CAS 操作将state
的值减 1 。如果减 1 后state
变为 0,说明所有线程都已完成任务,此时会唤醒 AQS 等待队列中所有处于阻塞状态的线程。例如,假设有 5 个线程,创建 CountDownLatch 时计数值为 3,当 3 个线程依次调用countDown()
方法后,state
变为 0,在await()
方法中等待的线程就会被唤醒。
3. 代码示例与场景应用
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(new Worker(latch, i)).start();
}
System.out.println("主线程等待所有子线程完成任务...");
latch.await();
System.out.println("所有子线程任务已完成,主线程继续执行");
}
}
class Worker implements Runnable {
private final CountDownLatch latch;
private final int id;
public Worker(CountDownLatch latch, int id) {
this.latch = latch;
this.id = id;
}
@Override
public void run() {
try {
System.out.println("线程 " + id + " 开始执行任务");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("线程 " + id + " 任务执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
}
在上述代码中,主线程创建了一个计数值为 3 的 CountDownLatch,并启动了 3 个子线程。每个子线程在执行完任务后调用countDown()
方法,主线程调用await()
方法等待所有子线程完成任务。当所有子线程都调用了countDown()
方法后,计数值变为 0,主线程被唤醒,继续执行后续代码 。在实际应用中,CountDownLatch 常用于多个任务并行执行,主线程需要等待所有任务完成后再进行汇总或后续处理的场景,比如在电商系统中,获取商品详情时,需要从多个不同的服务获取商品的基本信息、价格信息、库存信息等,只有当所有信息都获取完成后,才能组装完整的商品详情返回给用户,此时就可以使用 CountDownLatch 来实现这种同步等待的逻辑。
CyclicBarrier:循环栅栏
1. CyclicBarrier 是什么
CyclicBarrier 是 Java 并发包java.util.concurrent
中的一个同步工具类,可翻译为循环栅栏 。它的作用是让一组线程在到达某个屏障点时相互等待,直到所有线程都到达该屏障点后,这些线程才会继续执行后续操作 。并且,当所有线程都通过屏障后,CyclicBarrier 可以被重置并再次使用,这也是它被称为 “循环” 栅栏的原因。例如,在一场接力比赛中,每个接力点就相当于一个屏障点,每个运动员(线程)需要跑到接力点等待其他运动员都到达后,才能开始下一棒的比赛 。
2. 工作机制解析
CyclicBarrier 内部主要通过ReentrantLock
和Condition
来实现线程的阻塞和唤醒。它有一个计数器count
,初始值为构造函数中传入的parties
,表示需要到达屏障点的线程数量 。当一个线程调用await()
方法时,它会首先获取ReentrantLock
锁,然后将count
减 1 。如果count
不为 0,说明还有其他线程未到达屏障点,该线程会调用Condition
的await()
方法进入等待状态,并释放锁;如果count
为 0,说明所有线程都已到达屏障点,此时会执行以下操作:
执行构造函数中传入的barrierCommand
(如果有的话),这个barrierCommand
会在所有线程到达屏障点后,由最后一个到达的线程执行。
通过Condition
的signalAll()
方法唤醒所有在等待的线程。
将count
重置为parties
,并创建一个新的Generation
对象,表示进入了新的一轮循环 。
当线程从await()
方法返回时,说明它已经通过了屏障点,可以继续执行后续代码 。如果在等待过程中发生了以下情况,线程会抛出相应的异常:
如果当前线程被中断,会抛出InterruptedException
,并且会调用breakBarrier()
方法将CyclicBarrier
的状态设置为 “损坏”,即broken
为true
,同时唤醒所有等待的线程 。
如果等待超时(使用带超时参数的await()
方法时),会抛出TimeoutException
,同样会调用breakBarrier()
方法将CyclicBarrier
的状态设置为 “损坏” 。
如果在等待过程中发现CyclicBarrier
已经处于 “损坏” 状态(即broken
为true
),会抛出BrokenBarrierException
。
3. 与 CountDownLatch 的区别
功能不同:CountDownLatch 主要用于一个或多个线程等待其他线程完成一组操作,它的计数器是递减的,且只能使用一次;而 CyclicBarrier 是让一组线程相互等待,直到所有线程都到达屏障点,它的计数器可以重置,能够循环使用 。
使用场景不同:CountDownLatch 常用于主线程等待子线程完成任务后再进行汇总或后续处理的场景,如多个任务并行执行,主线程等待所有任务完成后进行结果合并;CyclicBarrier 则更适用于需要多线程协同工作,在某个阶段等待所有线程都准备好后再一起继续执行的场景,如多个线程共同处理一个复杂任务,每个线程完成自己负责的部分后,等待其他线程也完成,然后一起进行下一步处理 。
实现原理不同:CountDownLatch 基于 AQS 的共享模式实现,通过state
变量来计数,当state
减为 0 时唤醒等待的线程;CyclicBarrier 则是借助ReentrantLock
和Condition
来实现线程的阻塞和唤醒,通过内部的计数器count
来控制线程的等待和继续执行 。
ReentrantReadWriteLock:读写锁的高效之道
1. 读写锁简介
在多线程编程中,读操作和写操作对共享资源的访问具有不同的特性。读操作不会修改共享资源,因此多个线程同时进行读操作不会产生数据一致性问题;而写操作会修改共享资源,为了保证数据的一致性,写操作必须是排他的,即同一时刻只能有一个线程进行写操作 。ReentrantReadWriteLock 正是基于这样的背景应运而生,它位于java.util.concurrent.locks
包下 。
ReentrantReadWriteLock 提供了一种读写分离的锁机制,读锁(ReadLock
)可以被多个线程同时获取,实现了读操作的共享;写锁(WriteLock
)则是排他的,同一时刻只有一个线程能够获取写锁 。这种机制在实际开发中非常有用,特别是在那些读操作远远多于写操作的场景中,能够显著提高系统的并发性能 。例如,在一个电商系统中,商品详情页面的展示是一个读操作频繁的场景,而商品库存的更新则是写操作,使用 ReentrantReadWriteLock 可以让大量用户同时读取商品详情,而只有在更新库存时才会进行写锁的独占操作 。
2. 原理深度解析
ReentrantReadWriteLock 基于 AQS 实现,它巧妙地利用了 AQS 的state
变量来同时管理读锁和写锁的状态。state
是一个 32 位的整数,ReentrantReadWriteLock 将其高 16 位用于表示读锁的计数,低 16 位用于表示写锁的计数 。
具体来说,定义了以下几个关键常量:
SHARED_SHIFT = 16
:表示读锁计数的位移量,通过将state
右移 16 位,可以得到读锁的计数值。
SHARED_UNIT = (1 << SHARED_SHIFT)
:读锁计数的单位,即每次增加一个读锁时,state
需要增加的值。
MAX_COUNT = (1 << SHARED_SHIFT) - 1
:读锁和写锁计数的最大值,防止计数溢出。
EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1
:用于提取写锁计数的掩码,通过state
与该掩码进行与操作,可以得到写锁的计数值。
在获取写锁时,线程会调用writeLock().lock()
方法 。该方法会首先尝试获取写锁,即调用tryAcquire
方法 。如果当前没有线程持有读锁和写锁(state
为 0),或者当前线程已经持有写锁(可重入情况),则获取写锁成功,并将state
的低 16 位增加相应的重入次数 。如果有其他线程持有读锁或写锁,则获取写锁失败,线程会被加入到 AQS 的等待队列中 。例如:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
在获取读锁时,线程调用readLock().lock()
方法,进而调用tryAcquireShared
方法 。如果当前没有线程持有写锁,或者当前线程就是持有写锁的线程(锁降级情况),并且读锁计数未达到最大值,则获取读锁成功,将state
的高 16 位增加相应的计数 。如果有其他线程持有写锁,则获取读锁失败,线程加入等待队列 。例如:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
在释放写锁时,调用writeLock().unlock()
方法,通过tryRelease
方法将state
的低 16 位减少相应的重入次数 。如果重入次数减为 0,则释放写锁,唤醒等待队列中的线程 。释放读锁时,调用readLock().unlock()
方法,通过tryReleaseShared
方法将state
的高 16 位减少相应的计数 。如果读锁计数减为 0,并且没有线程持有写锁,则唤醒等待队列中的线程 。
3. 使用场景与示例
在实际开发中,ReentrantReadWriteLock 常用于读多写少的场景 。比如在一个缓存系统中,缓存数据的读取操作非常频繁,而缓存的更新操作相对较少 。我们可以使用 ReentrantReadWriteLock 来保护缓存数据,代码示例如下:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Cache {
private final Map<String, Object> cache = new HashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public Object get(String key) {
lock.readLock().lock();
try {
return cache.get(key);
} finally {
lock.readLock().unlock();
}
}
public void put(String key, Object value) {
lock.writeLock().lock();
try {
cache.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
}
在上述代码中,get
方法用于读取缓存数据,使用读锁,允许多个线程同时读取;put
方法用于更新缓存数据,使用写锁,保证同一时刻只有一个线程能够更新缓存,从而避免了数据不一致的问题 。在一个高并发的 Web 应用中,数据库连接池的配置信息也可以使用 ReentrantReadWriteLock 来管理,读操作时多个线程可以同时获取配置信息,写操作时则保证只有一个线程能够修改配置,确保了配置信息的一致性和系统的稳定性 。
总结与展望
Semaphore、CountDownLatch、CyclicBarrier 和 ReentrantReadWriteLock 作为基于 AQS 构建的重要同步工具,各自具备独特的功能和应用场景。Semaphore 通过控制许可数量来管理资源的并发访问,常用于限流和资源池的实现;CountDownLatch 允许线程等待其他线程完成一组操作,在任务汇总和线程启动协调等场景中发挥关键作用;CyclicBarrier 实现线程在屏障点的相互等待,可循环使用的特性使其适用于多线程的阶段性同步任务;ReentrantReadWriteLock 提供读写分离的锁机制,极大地提升了读多写少场景下的并发性能。
在实际的 Java 开发中,尤其是在构建高并发、高性能的系统时,深入理解并灵活运用这些同步工具至关重要。比如在分布式系统中,通过 Semaphore 控制对共享资源的访问频率,利用 CountDownLatch 协调不同服务之间的初始化顺序,借助 CyclicBarrier 实现多个节点在某个阶段的同步协作,运用 ReentrantReadWriteLock 优化缓存数据的读写操作等 。随着技术的不断发展,并发编程的场景和需求也日益复杂多样,未来我们可以进一步探索这些工具在不同场景下的优化组合使用,以及结合其他新技术(如响应式编程、异步 I/O 等)来提升系统的整体性能和稳定性,为构建更加高效、可靠的软件系统奠定坚实的基础。