AQS 基础回顾

AQS,即 AbstractQueuedSynchronizer,是 Java 并发包中构建锁和同步器的基础框架,位于java.util.concurrent.locks包下。它通过一个int类型的变量state来表示同步状态,例如在独占锁中,state为 0 表示锁未被占用,为 1 表示锁已被占用 ,而在可重入锁中,state的值会随着线程重入次数而增加。

AQS 内部维护了一个基于 FIFO(先进先出)原则的双向队列,当线程请求共享资源而获取失败时,该线程会被封装成一个Node节点加入到队列中。队列中的每个Node节点都包含了线程的引用、等待状态以及前驱和后继节点的引用。例如,当一个线程调用ReentrantLocklock方法获取锁失败时,它就会被加入到这个等待队列中。

AQS 支持独占和共享两种资源访问模式。在独占模式下,如ReentrantLock,同一时刻只有一个线程能获取到锁;而在共享模式下,如SemaphoreCountDownLatch,多个线程可以同时获取资源 。开发者通过继承 AQS 并重写其特定的抽象方法,如tryAcquiretryReleasetryAcquireSharedtryReleaseShared等,来实现自定义的同步逻辑,然后利用 AQS 提供的模板方法,如acquirereleaseacquireSharedreleaseShared等,来完成同步状态的获取与释放操作。

Semaphore:信号量

1. 什么是 Semaphore

Semaphore 是基于 AQS 实现的信号量,它的主要职责是控制同时访问特定资源的线程数量 。可以将其想象成一个停车场的入口闸机,闸机内预设了一定数量的停车票(许可),每辆车进入停车场时需要获取一张停车票(调用acquire方法获取许可),离开时归还停车票(调用release方法释放许可)。当停车票发放完后,后续车辆只能在入口排队等待,直到有车辆归还停车票。在 Java 中,Semaphore位于java.util.concurrent包下。

2. 工作原理深入剖析

Semaphore 的核心方法是acquire()release()。当一个线程调用acquire()方法时,它试图从 Semaphore 中获取一个许可。如果此时 Semaphore 中还有可用许可(即 AQS 的state变量大于 0),则该线程获取许可成功,state变量减 1,线程可以继续执行;如果state变量为 0,意味着没有可用许可,该线程会被封装成一个Node节点加入到 AQS 的等待队列中,进入阻塞状态,直到有其他线程释放许可。

当线程调用release()方法时,会释放一个许可,即state变量加 1。如果此时等待队列中有线程在等待许可,那么处于等待队列头部的线程会被唤醒,尝试获取许可。例如,假设有 5 个线程同时调用acquire()方法,而 Semaphore 初始许可数量为 3,那么前 3 个线程获取许可成功,state变为 0,后 2 个线程进入等待队列。当其中一个已获取许可的线程调用release()方法后,state变为 1,等待队列中的一个线程被唤醒并获取许可。

3. 应用场景举例

在实际开发中,Semaphore 常用于限流场景。比如在高并发的 Web 应用中,数据库连接资源是有限的,为了防止过多线程同时请求数据库连接导致数据库压力过大,我们可以使用 Semaphore 来限制同时获取数据库连接的线程数量。假设数据库连接池最大连接数为 10,我们可以创建一个初始许可数量为 10 的 Semaphore。每个线程在获取数据库连接前,先调用semaphore.acquire()获取许可,使用完连接后调用semaphore.release()释放许可。这样就能确保在任意时刻,最多只有 10 个线程能够获取到数据库连接 。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class DatabaseAccess {

   private static final String URL = "jdbc:mysql://localhost:3306/yourdb";

   private static final String USER = "root";

   private static final String PASSWORD = "password";

   private static final Semaphore semaphore = new Semaphore(10);

   public static Connection getConnection() throws InterruptedException, SQLException {

       semaphore.acquire();

       try {
           return DriverManager.getConnection(URL, USER, PASSWORD);
       } catch (SQLException e) {
           semaphore.release();
           throw e;
       }
   }

   public static void releaseConnection(Connection connection) {
       if (connection != null) {
           try {
               connection.close();
           } catch (SQLException e) {
               e.printStackTrace();
           } finally {
               semaphore.release();
           }
       }
   }

   public static void main(String[] args) {

       ExecutorService executorService = Executors.newFixedThreadPool(20);

       for (int i = 0; i < 20; i++) {
           executorService.submit(() -> {
               try {
                   Connection connection = getConnection();
                   // 执行数据库操作
                   System.out.println(Thread.currentThread().getName() + " 获得连接,执行操作");
                   releaseConnection(connection);
               } catch (InterruptedException | SQLException e) {
                   e.printStackTrace();
               }
           });
       }
       executorService.shutdown();
   }
}

在上述代码中,Semaphore被用来限制同时获取数据库连接的线程数量为 10,有效保护了数据库连接资源,避免了因过多线程并发访问数据库而导致的性能问题。

CountDownLatch:闭锁

1. CountDownLatch 初相识

CountDownLatch 是 Java 并发包java.util.concurrent中的一个同步辅助类,它的主要作用是允许一个或多个线程等待,直到其他线程完成一组操作。可以把它想象成一个倒计时器,在初始化时设置一个计数值,每有一个线程完成任务,就将计数值减 1,当计数值减为 0 时,等待的线程就会被释放,继续执行后续操作 。例如,在一场多人比赛中,只有当所有选手都到达终点后,裁判才会宣布比赛结束,这里就可以用 CountDownLatch 来实现这种同步机制。

2. 核心原理揭秘

CountDownLatch 基于 AQS 实现,它内部维护了一个继承自 AQS 的Sync类 。在 CountDownLatch 创建时,传入的计数值会被赋值给 AQS 的state变量,作为计数器的初始值。当一个线程调用await()方法时,它会尝试获取共享锁,实际上是检查state是否为 0 。如果state不为 0,说明还有线程未完成任务,该线程会被封装成一个Node节点加入到 AQS 的等待队列中,进入阻塞状态;如果state为 0,则获取共享锁成功,线程可以继续执行。

当线程调用countDown()方法时,会释放共享锁,即通过compareAndSetState方法以 CAS 操作将state的值减 1 。如果减 1 后state变为 0,说明所有线程都已完成任务,此时会唤醒 AQS 等待队列中所有处于阻塞状态的线程。例如,假设有 5 个线程,创建 CountDownLatch 时计数值为 3,当 3 个线程依次调用countDown()方法后,state变为 0,在await()方法中等待的线程就会被唤醒。

3. 代码示例与场景应用

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {

   public static void main(String[] args) throws InterruptedException {

       int threadCount = 3;
       CountDownLatch latch = new CountDownLatch(threadCount);

       for (int i = 0; i < threadCount; i++) {
           new Thread(new Worker(latch, i)).start();
       }
       System.out.println("主线程等待所有子线程完成任务...");
       latch.await();
       System.out.println("所有子线程任务已完成,主线程继续执行");
   }
}

class Worker implements Runnable {

   private final CountDownLatch latch;
   private final int id;

   public Worker(CountDownLatch latch, int id) {
       this.latch = latch;
       this.id = id;
   }

   @Override
   public void run() {
       try {
           System.out.println("线程 " + id + " 开始执行任务");
           Thread.sleep((long) (Math.random() * 1000));
           System.out.println("线程 " + id + " 任务执行完毕");
       } catch (InterruptedException e) {
           e.printStackTrace();
       } finally {
           latch.countDown();
       }
   }
}

在上述代码中,主线程创建了一个计数值为 3 的 CountDownLatch,并启动了 3 个子线程。每个子线程在执行完任务后调用countDown()方法,主线程调用await()方法等待所有子线程完成任务。当所有子线程都调用了countDown()方法后,计数值变为 0,主线程被唤醒,继续执行后续代码 。在实际应用中,CountDownLatch 常用于多个任务并行执行,主线程需要等待所有任务完成后再进行汇总或后续处理的场景,比如在电商系统中,获取商品详情时,需要从多个不同的服务获取商品的基本信息、价格信息、库存信息等,只有当所有信息都获取完成后,才能组装完整的商品详情返回给用户,此时就可以使用 CountDownLatch 来实现这种同步等待的逻辑。

CyclicBarrier:循环栅栏

1. CyclicBarrier 是什么

CyclicBarrier 是 Java 并发包java.util.concurrent中的一个同步工具类,可翻译为循环栅栏 。它的作用是让一组线程在到达某个屏障点时相互等待,直到所有线程都到达该屏障点后,这些线程才会继续执行后续操作 。并且,当所有线程都通过屏障后,CyclicBarrier 可以被重置并再次使用,这也是它被称为 “循环” 栅栏的原因。例如,在一场接力比赛中,每个接力点就相当于一个屏障点,每个运动员(线程)需要跑到接力点等待其他运动员都到达后,才能开始下一棒的比赛 。

2. 工作机制解析

CyclicBarrier 内部主要通过ReentrantLockCondition来实现线程的阻塞和唤醒。它有一个计数器count,初始值为构造函数中传入的parties,表示需要到达屏障点的线程数量 。当一个线程调用await()方法时,它会首先获取ReentrantLock锁,然后将count减 1 。如果count不为 0,说明还有其他线程未到达屏障点,该线程会调用Conditionawait()方法进入等待状态,并释放锁;如果count为 0,说明所有线程都已到达屏障点,此时会执行以下操作:

执行构造函数中传入的barrierCommand(如果有的话),这个barrierCommand会在所有线程到达屏障点后,由最后一个到达的线程执行。

通过ConditionsignalAll()方法唤醒所有在等待的线程。

count重置为parties,并创建一个新的Generation对象,表示进入了新的一轮循环 。

当线程从await()方法返回时,说明它已经通过了屏障点,可以继续执行后续代码 。如果在等待过程中发生了以下情况,线程会抛出相应的异常:

如果当前线程被中断,会抛出InterruptedException,并且会调用breakBarrier()方法将CyclicBarrier的状态设置为 “损坏”,即brokentrue,同时唤醒所有等待的线程 。

如果等待超时(使用带超时参数的await()方法时),会抛出TimeoutException,同样会调用breakBarrier()方法将CyclicBarrier的状态设置为 “损坏” 。

如果在等待过程中发现CyclicBarrier已经处于 “损坏” 状态(即brokentrue),会抛出BrokenBarrierException

3. 与 CountDownLatch 的区别

功能不同:CountDownLatch 主要用于一个或多个线程等待其他线程完成一组操作,它的计数器是递减的,且只能使用一次;而 CyclicBarrier 是让一组线程相互等待,直到所有线程都到达屏障点,它的计数器可以重置,能够循环使用 。

使用场景不同:CountDownLatch 常用于主线程等待子线程完成任务后再进行汇总或后续处理的场景,如多个任务并行执行,主线程等待所有任务完成后进行结果合并;CyclicBarrier 则更适用于需要多线程协同工作,在某个阶段等待所有线程都准备好后再一起继续执行的场景,如多个线程共同处理一个复杂任务,每个线程完成自己负责的部分后,等待其他线程也完成,然后一起进行下一步处理 。

实现原理不同:CountDownLatch 基于 AQS 的共享模式实现,通过state变量来计数,当state减为 0 时唤醒等待的线程;CyclicBarrier 则是借助ReentrantLockCondition来实现线程的阻塞和唤醒,通过内部的计数器count来控制线程的等待和继续执行 。

ReentrantReadWriteLock:读写锁的高效之道

1. 读写锁简介

在多线程编程中,读操作和写操作对共享资源的访问具有不同的特性。读操作不会修改共享资源,因此多个线程同时进行读操作不会产生数据一致性问题;而写操作会修改共享资源,为了保证数据的一致性,写操作必须是排他的,即同一时刻只能有一个线程进行写操作 。ReentrantReadWriteLock 正是基于这样的背景应运而生,它位于java.util.concurrent.locks包下 。

ReentrantReadWriteLock 提供了一种读写分离的锁机制,读锁(ReadLock)可以被多个线程同时获取,实现了读操作的共享;写锁(WriteLock)则是排他的,同一时刻只有一个线程能够获取写锁 。这种机制在实际开发中非常有用,特别是在那些读操作远远多于写操作的场景中,能够显著提高系统的并发性能 。例如,在一个电商系统中,商品详情页面的展示是一个读操作频繁的场景,而商品库存的更新则是写操作,使用 ReentrantReadWriteLock 可以让大量用户同时读取商品详情,而只有在更新库存时才会进行写锁的独占操作 。

2. 原理深度解析

ReentrantReadWriteLock 基于 AQS 实现,它巧妙地利用了 AQS 的state变量来同时管理读锁和写锁的状态。state是一个 32 位的整数,ReentrantReadWriteLock 将其高 16 位用于表示读锁的计数,低 16 位用于表示写锁的计数 。

具体来说,定义了以下几个关键常量:

SHARED_SHIFT = 16:表示读锁计数的位移量,通过将state右移 16 位,可以得到读锁的计数值。

SHARED_UNIT = (1 << SHARED_SHIFT):读锁计数的单位,即每次增加一个读锁时,state需要增加的值。

MAX_COUNT = (1 << SHARED_SHIFT) - 1:读锁和写锁计数的最大值,防止计数溢出。

EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1:用于提取写锁计数的掩码,通过state与该掩码进行与操作,可以得到写锁的计数值。

在获取写锁时,线程会调用writeLock().lock()方法 。该方法会首先尝试获取写锁,即调用tryAcquire方法 。如果当前没有线程持有读锁和写锁(state为 0),或者当前线程已经持有写锁(可重入情况),则获取写锁成功,并将state的低 16 位增加相应的重入次数 。如果有其他线程持有读锁或写锁,则获取写锁失败,线程会被加入到 AQS 的等待队列中 。例如:

protected final boolean tryAcquire(int acquires) {

   Thread current = Thread.currentThread();
   int c = getState();
   int w = exclusiveCount(c);

   if (c != 0) {
       if (w == 0 || current != getExclusiveOwnerThread())
           return false;
       if (w + exclusiveCount(acquires) > MAX_COUNT)
           throw new Error("Maximum lock count exceeded");
       setState(c + acquires);
       return true;
   }

   if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))
       return false;
   setExclusiveOwnerThread(current);
   return true;
}

在获取读锁时,线程调用readLock().lock()方法,进而调用tryAcquireShared方法 。如果当前没有线程持有写锁,或者当前线程就是持有写锁的线程(锁降级情况),并且读锁计数未达到最大值,则获取读锁成功,将state的高 16 位增加相应的计数 。如果有其他线程持有写锁,则获取读锁失败,线程加入等待队列 。例如:

protected final int tryAcquireShared(int unused) {

   Thread current = Thread.currentThread();
   int c = getState();
   if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
       return -1;
   int r = sharedCount(c);

   if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
       if (r == 0) {
           firstReader = current;
           firstReaderHoldCount = 1;
       } else if (firstReader == current) {
           firstReaderHoldCount++;
       } else {
           HoldCounter rh = cachedHoldCounter;
           if (rh == null || rh.tid != getThreadId(current))
               cachedHoldCounter = rh = readHolds.get();
           else if (rh.count == 0)
               readHolds.set(rh);
           rh.count++;
       }
       return 1;
   }
   return fullTryAcquireShared(current);
}

在释放写锁时,调用writeLock().unlock()方法,通过tryRelease方法将state的低 16 位减少相应的重入次数 。如果重入次数减为 0,则释放写锁,唤醒等待队列中的线程 。释放读锁时,调用readLock().unlock()方法,通过tryReleaseShared方法将state的高 16 位减少相应的计数 。如果读锁计数减为 0,并且没有线程持有写锁,则唤醒等待队列中的线程 。

3. 使用场景与示例

在实际开发中,ReentrantReadWriteLock 常用于读多写少的场景 。比如在一个缓存系统中,缓存数据的读取操作非常频繁,而缓存的更新操作相对较少 。我们可以使用 ReentrantReadWriteLock 来保护缓存数据,代码示例如下:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Cache {

   private final Map<String, Object> cache = new HashMap<>();
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

   public Object get(String key) {
       lock.readLock().lock();
       try {
           return cache.get(key);
       } finally {
           lock.readLock().unlock();
       }
   }

   public void put(String key, Object value) {

       lock.writeLock().lock();

       try {
           cache.put(key, value);
       } finally {
           lock.writeLock().unlock();
       }
   }
}

在上述代码中,get方法用于读取缓存数据,使用读锁,允许多个线程同时读取;put方法用于更新缓存数据,使用写锁,保证同一时刻只有一个线程能够更新缓存,从而避免了数据不一致的问题 。在一个高并发的 Web 应用中,数据库连接池的配置信息也可以使用 ReentrantReadWriteLock 来管理,读操作时多个线程可以同时获取配置信息,写操作时则保证只有一个线程能够修改配置,确保了配置信息的一致性和系统的稳定性 。

总结与展望

Semaphore、CountDownLatch、CyclicBarrier 和 ReentrantReadWriteLock 作为基于 AQS 构建的重要同步工具,各自具备独特的功能和应用场景。Semaphore 通过控制许可数量来管理资源的并发访问,常用于限流和资源池的实现;CountDownLatch 允许线程等待其他线程完成一组操作,在任务汇总和线程启动协调等场景中发挥关键作用;CyclicBarrier 实现线程在屏障点的相互等待,可循环使用的特性使其适用于多线程的阶段性同步任务;ReentrantReadWriteLock 提供读写分离的锁机制,极大地提升了读多写少场景下的并发性能。

在实际的 Java 开发中,尤其是在构建高并发、高性能的系统时,深入理解并灵活运用这些同步工具至关重要。比如在分布式系统中,通过 Semaphore 控制对共享资源的访问频率,利用 CountDownLatch 协调不同服务之间的初始化顺序,借助 CyclicBarrier 实现多个节点在某个阶段的同步协作,运用 ReentrantReadWriteLock 优化缓存数据的读写操作等 。随着技术的不断发展,并发编程的场景和需求也日益复杂多样,未来我们可以进一步探索这些工具在不同场景下的优化组合使用,以及结合其他新技术(如响应式编程、异步 I/O 等)来提升系统的整体性能和稳定性,为构建更加高效、可靠的软件系统奠定坚实的基础。

文章作者: Z
本文链接:
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 微博客
并发编程
喜欢就支持一下吧