PriorityBlockingQueue 基础认知

定义与特点

PriorityBlockingQueue 是 Java 并发包中一个支持优先级排序的无界阻塞队列,实现了 BlockingQueue 接口,其内部元素默认会按照自然顺序(元素实现 Comparable 接口)或者通过提供的 Comparator 进行排序 ,这使得队列头部总是优先级最高的元素。PriorityBlockingQueue 的特性主要体现在以下几个方面:

  • 线程安全:使用可重入锁(ReentrantLock)来保证多线程环境下的并发安全性,允许多个线程同时进行插入和删除操作,而不会出现数据不一致的问题。

  • 优先级排序:队列中的元素根据其优先级进行排序,这一特性使得在处理任务或消息时,可以确保首先处理优先级最高的项。元素的优先级可以通过实现 Comparable 接口来定义自然顺序,也可以在创建 PriorityBlockingQueue 时传入一个 Comparator 来定制排序规则。

  • 无界队列:它是无界的,理论上可以存储无限个元素(仅受限于系统内存),这意味着在向队列中添加元素时,不会因为队列已满而抛出异常,也不需要像有界队列那样进行复杂的容量管理。

  • 阻塞操作:支持阻塞的插入和移除操作。当队列为空时,尝试获取元素的操作(如 take 方法)会被阻塞,直到队列中有元素可用;虽然是无界队列,但在某些情况下(如系统资源耗尽),插入操作也可能会因为无法分配内存而出现阻塞或者抛出 OutOfMemoryError 异常 。

与其他队列对比

在 Java 的集合框架中,存在多种类型的队列,与 PriorityBlockingQueue 经常被对比的有 ArrayBlockingQueue、LinkedBlockingQueue 和 PriorityQueue,它们各自有着不同的特点和适用场景。

  • 与 ArrayBlockingQueue 对比:ArrayBlockingQueue 是一个由数组支持的有界阻塞队列,在创建时需要指定容量,一旦创建,容量不可改变。它按照 FIFO(先进先出)的顺序对元素进行排序,不支持优先级排序。而 PriorityBlockingQueue 是无界的,且支持根据元素优先级排序,适用于需要处理大量元素且对元素优先级有要求的场景。

  • 与 LinkedBlockingQueue 对比:LinkedBlockingQueue 是一个基于链表的可选有界阻塞队列,默认情况下是无界的(最大容量为 Integer.MAX_VALUE)。同样按照 FIFO 顺序处理元素,不支持优先级排序。PriorityBlockingQueue 与之不同的是,它基于堆数据结构实现优先级排序,能满足对元素优先级有要求的并发场景。

  • 与 PriorityQueue 对比:PriorityQueue 是一个非线程安全的优先级队列,它按照元素的自然顺序或自定义的比较器进行排序。PriorityBlockingQueue 则是线程安全的,适合在多线程环境中使用,通过使用锁机制来保证线程安全,使得多个线程可以安全地对队列进行操作。

应用场景

PriorityBlockingQueue 在许多实际应用场景中都发挥着重要作用,以下是一些常见的应用场景:

  • 任务调度:在多任务处理系统中,不同的任务可能具有不同的优先级,PriorityBlockingQueue 可以根据任务的优先级来调度执行。例如,在我们系统中创建的数据导出任务,可以设置任务的优先级,优先级高的优先导出处理。

  • 资源分配:在资源有限的情况下,需要根据一定的优先级规则来分配资源。比如在网络通信中,对于不同优先级的用户请求(如 VIP 用户与普通用户),可以使用 PriorityBlockingQueue 来管理请求队列,优先为高优先级的用户分配网络带宽、服务器资源等,从而保证高优先级用户获得更好的服务质量。

  • 事件处理系统:在处理不同类型的事件时,有些事件可能更为紧急或重要,需要优先处理。例如,在一个实时监控系统中,对于设备故障事件、安全警报事件等重要事件,给予较高的优先级,使其能够在队列中优先被处理,确保及时响应和解决问题,而一般性的信息事件则按照较低优先级处理。

  • 工作流引擎:在工作流管理系统中,任务通常根据其依赖性和优先级被安排执行顺序。PriorityBlockingQueue 可以用于存储和管理工作流中的任务,确保具有较高优先级和前置条件已满足的任务优先执行,从而保证工作流的高效运行。

源码结构与核心属性

以下是jdk_1.8.0_161版本中的部分源码

类图结构

PriorityBlockingQueue 的类图结构清晰地展示了其继承体系和实现的接口,这对于深入理解它的功能和特性至关重要,其类图如下:

从类图中可以看出,PriorityBlockingQueue 继承自 AbstractQueue 类,这使得它拥有了 AbstractQueue 类中定义的一些基本队列操作的默认实现,如addofferremovepoll等方法的基本逻辑 。同时,它实现了 BlockingQueue 接口,这意味着它必须实现该接口中定义的所有方法,从而具备了阻塞队列的特性,如puttake等方法,这些方法在多线程环境下提供了线程安全的阻塞操作,确保在队列状态不满足操作条件时,线程能够正确地阻塞和唤醒。此外,PriorityBlockingQueue 还实现了 java.io.Serializable 接口,表明它支持序列化,能够将队列的状态保存到文件或者在网络中传输 。

核心属性

PriorityBlockingQueue 的功能实现离不开其内部的一系列核心属性,这些属性各自承担着重要的职责,共同支撑着队列的正常运行。

  • DEFAULT_INITIAL_CAPACITY(默认初始容量)

    /**
     * Default array capacity.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

这是 PriorityBlockingQueue 的默认初始容量,当创建队列时未指定初始容量,就会使用这个默认值。这个值是经过精心选择的,在大多数情况下能够在性能和内存占用之间取得较好的平衡,既不会因为初始容量过小而频繁扩容影响性能,也不会因为初始容量过大而浪费内存资源。

  • MAX_ARRAY_SIZE(数组最大容量)

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

定义了 PriorityBlockingQueue 内部用于存储元素的数组的最大容量。之所以是Integer.MAX_VALUE - 8,是为了避免在某些虚拟机实现中,由于数组头信息等额外开销导致内存分配失败,通过减去 8 来预留一定的空间,以确保在各种环境下都能安全地进行数组的扩容操作。

  • queue(存储元素的数组)

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;

这是 PriorityBlockingQueue 的核心数据结构,用于实际存储队列中的元素。它被声明为transient,表示在对象进行序列化时,该属性不会被自动保存,这是因为在反序列化时,会重新构建队列的状态,而不是简单地恢复数组的内容,以保证队列的一致性和正确性。

  • size(元素个数)

    /**
     * The number of elements in the priority queue.
     */
    private transient int size;

记录了当前队列中实际存储的元素个数。这个属性在队列的各种操作中起着关键作用,例如在判断队列是否为空、进行元素添加和删除操作时,都需要依赖size的值来确定队列的状态和执行相应的逻辑。

  • comparator(比较器)

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

用于定义队列中元素的比较规则,决定元素的优先级顺序。如果comparatornull,则元素会使用其自身实现的Comparable接口的compareTo方法来进行自然排序;如果comparator不为null,则会使用传入的比较器的compare方法来进行排序,这使得用户可以根据具体需求自定义元素的优先级比较逻辑 。

  • lock(可重入锁)

    /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;

是一个可重入锁,用于保证 PriorityBlockingQueue 在多线程环境下的操作的线程安全性。所有对队列的修改操作(如添加、删除元素)和部分读取操作(如take方法)都需要先获取这个锁,确保同一时间只有一个线程能够对队列进行操作,避免了数据竞争和不一致的问题。

  • notEmpty(条件变量)

    /**
     * Condition for blocking when empty
     */
    private final Condition notEmpty;

lock配合使用的条件变量,主要用于实现take方法的阻塞逻辑。当队列中没有元素时,调用take方法的线程会在notEmpty条件变量上等待,直到有其他线程向队列中添加元素并通过notEmpty.signal()notEmpty.signalAll()方法唤醒等待的线程。

  • allocationSpinLock(自旋锁)

    /**
     * Spinlock for allocation, acquired via CAS.
     */
    private transient volatile int allocationSpinLock;

是一个自旋锁,主要用于控制队列的扩容操作。在进行扩容时,通过 CAS(Compare - And - Swap)操作来尝试获取这个自旋锁,如果获取成功,则进行扩容操作;如果获取失败,说明有其他线程正在进行扩容,当前线程会通过Thread.yield()方法让出 CPU 执行权,稍后再尝试获取锁进行扩容,以此来避免多个线程同时进行扩容操作导致的不一致问题 。

核心方法源码解析

插入方法(offer、put)

offer 方法

offer方法是 PriorityBlockingQueue 中用于插入元素的核心方法之一,其实现逻辑精妙地融合了线程安全、容量管理和堆结构维护等多方面的考量,具体源码如下:

public boolean offer(E e) {
    // 禁止插入 null 元素(优先级队列不允许 null,因排序时无法比较 null)
    if (e == null)
        throw new NullPointerException();
    // 获取独占锁,保证线程安全(所有修改操作需加锁)。
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 若当前元素数量 n 达到数组容量 cap,调用 tryGrow 扩容
    // tryGrow 会释放锁以允许其他线程操作,扩容后重新加锁检查(通过 while 循环)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        // siftUpComparable 将新元素插入数组末尾(索引 n),并通过 上浮操作(sift-up)调整堆结构,确保最小元素位于堆顶(小顶堆)。
        // 若未指定 comparator,元素必须实现 Comparable 接口,调用 siftUpComparable。
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
        // 若指定 comparator,调用 siftUpUsingComparator。
            siftUpUsingComparator(n, e, array, cmp);
        // 更新队列中元素数量
        size = n + 1;
        // 唤醒等待在 notEmpty 条件上的线程(如调用 take() 的线程)
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

方法首先对插入元素e进行非空检查,若enull,则直接抛出NullPointerException,以确保队列中不会出现空元素,这是保证队列正常工作和后续操作安全的重要前提。

接着,获取可重入锁lock,这一步至关重要,它保证了在多线程环境下,对队列的操作是线程安全的,同一时间只有一个线程能够进入临界区,执行插入操作,避免了数据竞争和不一致的问题。

然后进入一个while循环,该循环用于检查队列当前元素个数n是否大于或等于队列底层数组的容量cap。如果是,则调用tryGrow方法进行扩容操作。tryGrow 是 PriorityBlockingQueue 实现无界特性的核心方法,用于在队列满时动态扩容。这个方法的设计精妙之处在于释放全局锁后进行扩容操作,从而允许其他线程在扩容期间继续执行出队操作,极大提升了并发性能。

tryGrow方法的实现如下:

private void tryGrow(Object[] array, int oldCap) {
    // 释放独占锁 lock,允许其他线程继续执行出队(poll/take)操作,避免扩容时阻塞整个队列。
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // allocationSpinLock 一个 int 类型的标志位(0 = 未锁定,1 = 已锁定),使用 Unsafe 的 CAS 操作实现轻量级锁。
    // 多个线程可能同时检测到队列需要扩容,通过 CAS 确保只有一个线程能执行实际扩容,其他线程会在后续 Thread.yield() 让步
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // 当原容量 < 64 时,新容量为 oldCap + oldCap + 2(近似翻倍)
            // 当原容量 >= 64 时,新容量为 oldCap + oldCap/2(增加 50%)
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            // 若新容量超过 MAX_ARRAY_SIZE,尝试设置为最小需求容量 oldCap + 1
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
            // 若 oldCap + 1 仍溢出或超过限制,抛出 OutOfMemoryError
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // 创建新数组
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // 释放cas锁
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // 获取全局锁,确保数据复制的原子性。
    lock.lock();
    // 进行二次检查
    // newArray != null:确认当前线程成功创建了新数组
    // queue == array:确认在释放锁期间,队列底层数组未被其他线程修改。
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

tryGrow方法中,首先释放当前持有的锁lock,这是为了在扩容期间,其他线程能够继续进行出队等操作,提高系统的并发性能。然后通过 CAS(Compare - And - Swap)操作,使用allocationSpinLock自旋锁来尝试获取扩容权限。只有当allocationSpinLock为 0,并且通过 CAS 操作成功将其值从 0 改为 1 时,当前线程才获得扩容权限。接着根据当前数组容量oldCap计算新的容量newCap,如果oldCap小于 64,则新容量为oldCap + oldCap + 2;否则新容量为oldCap的 1.5 倍 。同时,会检查新容量是否超过了最大数组容量MAX_ARRAY_SIZE,若超过则进行相应的处理,避免内存溢出。如果计算得到的新容量newCap大于oldCap,并且当前队列的数组引用queue仍然指向原来的数组array,则创建一个新的数组newArray。最后,将allocationSpinLock重置为 0,表示扩容操作结束。如果由于其他线程竞争,导致当前线程未能成功获取扩容权限(newArraynull),则当前线程调用Thread.yield()方法让出 CPU 执行权,稍后再次尝试获取锁并进行扩容。当成功创建新数组后,重新获取锁lock,将队列的数组引用queue指向新数组newArray,并将原数组array中的元素复制到新数组中。

回到offer方法,在确保队列有足够容量后,根据是否设置了比较器comparator来决定如何将元素插入堆中。如果comparatornull,则调用siftUpComparable方法,利用元素自身实现的Comparable接口进行比较和插入操作;如果comparator不为null,则调用siftUpUsingComparator方法,使用自定义的比较器进行比较和插入操作。这两个方法的核心逻辑都是将新元素e插入到堆中合适的位置,通过不断比较新元素与父节点的大小,将新元素上浮,直到满足堆的性质。以siftUpComparable方法为例,其实现如下:

// 当通过 offer/put 向队列插入元素时,新元素会被暂存到堆的末尾(索引 k),
// 需要通过上浮操作将其移动到合适位置,以维护小顶堆的性质(父节点 ≤ 子节点)。
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

siftUpComparable方法中,首先将新元素x转换为Comparable类型的key。然后通过一个while循环,从插入位置k开始,不断向上比较新元素key与父节点e的大小。计算父节点的索引parent(k - 1) >>> 1,如果新元素key大于或等于父节点e,则说明新元素已经找到了合适的位置,退出循环;否则,将父节点e下移到当前位置k,并将k更新为父节点的索引parent,继续向上比较。直到新元素key大于或等于某个父节点,或者到达根节点时,将新元素key插入到当前位置k,完成插入操作。

siftUpComparable 实现了高效的堆结构维护,确保每次插入元素后都能快速恢复优先级顺序,这是其支持优先级排序的核心机制。

最后,更新队列的元素个数size,并通过notEmpty.signal()方法唤醒一个因调用take方法而被阻塞的线程(如果有),表示队列中已经有新元素可供取出。在所有操作完成后,释放锁lock,确保其他线程能够获取锁并对队列进行操作。

put 方法

put方法的实现非常简洁,它直接调用了offer方法,源码如下:

public void put(E e) {
    offer(e); // never need to block
}

这是因为 PriorityBlockingQueue 是无界队列,理论上插入操作永远不会因为队列满而阻塞,所以put方法和offer方法在功能上是一致的,都用于将元素插入队列。put方法的存在主要是为了满足BlockingQueue接口的定义,使得在使用BlockingQueue的统一接口时,开发者可以使用put方法进行插入操作,而不必关心队列是否有界,提供了更一致的编程体验 。

移除方法(poll、take)

poll 方法

poll方法用于获取并移除队列中优先级最高的元素,其实现逻辑涉及到堆结构的调整和元素的移除操作,具体源码如下:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

poll方法首先获取可重入锁lock,保证多线程环境下操作的线程安全性。然后调用dequeue方法来实际执行获取并移除元素的操作,在操作完成后,释放锁lock,允许其他线程对队列进行操作。

dequeue方法的实现如下:

/**
 * Mechanics for poll().  Call only while holding lock.
 */
private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        // 取数组最后一个元素
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        // 将数组最后一个元素放在堆顶,然后进行下沉。
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

dequeue方法中,首先获取队列中当前元素个数减 1 的索引n。如果n小于 0,说明队列为空,直接返回null。否则,获取队列底层数组array,将数组第一个元素(即优先级最高的元素)赋值给result,作为返回值。接着,将数组最后一个元素x移动到数组第一个位置,然后将数组最后一个位置置为null,这样就完成了元素的移除操作。之后,根据是否设置了比较器comparator来决定如何调整堆结构。如果comparatornull,则调用siftDownComparable方法,利用元素自身实现的Comparable接口进行比较和堆调整操作;如果comparator不为null,则调用siftDownUsingComparator方法,使用自定义的比较器进行比较和堆调整操作。这两个方法的核心逻辑都是将新的堆顶元素x下沉到合适的位置,通过不断比较新元素与子节点的大小,将新元素下沉,直到满足堆的性质。以siftDownComparable方法为例,其实现如下:

private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

siftDownComparable方法中,首先将新元素x转换为Comparable类型的key,并计算出堆的一半大小half。然后通过一个while循环,从堆顶位置k开始,不断向下比较新元素key与子节点c的大小。计算左子节点的索引child(k << 1) + 1,如果右子节点存在(right < n)且右子节点的值小于左子节点的值,则将c更新为右子节点。如果新元素key小于或等于子节点c,则说明新元素已经找到了合适的位置,退出循环;否则,将子节点c上移到当前位置k,并将k更新为子节点的索引child,继续向下比较。直到新元素key小于或等于某个子节点,或者到达叶子节点时,将新元素key插入到当前位置k,完成堆结构的调整。最后,更新队列的元素个数size,并返回移除的元素result

take 方法

take方法同样用于获取并移除队列中优先级最高的元素,但与poll方法不同的是,当队列为空时,take方法会阻塞当前线程,直到队列中有元素可用,其源码如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

take方法首先通过lock.lockInterruptibly()获取可重入锁,并且支持响应中断。这意味着在获取锁的过程中,如果当前线程被中断,会抛出InterruptedException异常。接着进入一个while循环,调用dequeue方法尝试获取并移除元素。如果dequeue方法返回null,说明队列为空,此时调用notEmpty.await()方法,使当前线程在notEmpty条件变量上等待。当其他线程向队列中添加元素后,会通过notEmpty.signal()notEmpty.signalAll()方法唤醒等待的线程。被唤醒的线程会重新尝试获取锁,并再次调用dequeue方法,直到成功获取到元素。在获取到元素或者捕获到中断异常后,最终释放锁lock,并返回获取到的元素result。这种阻塞机制确保了在多线程环境下,当队列中没有元素时,获取元素的线程能够正确地等待,避免了无效的轮询,提高了系统的资源利用率和性能。

其他方法(peek、size 等)

1. peek 方法

peek方法用于获取队列头部元素(即优先级最高的元素),但不移除该元素,其实现相对简单,源码如下:

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (size == 0) ? null : (E) queue[0];
    } finally {
        lock.unlock();
    }
}

peek方法首先获取可重入锁lock,保证多线程环境下操作的线程安全性。然后判断队列的元素个数size是否为 0,如果为 0,说明队列为空,直接返回null;否则,返回队列底层数组queue的第一个元素,即优先级最高的元素。在操作完成后,释放锁lock,允许其他线程对队列进行操作。通过这种方式,peek方法能够在不改变队列状态的前提下,让调用者获取到队列中优先级最高的元素,方便了对队列元素的查看和处理。

2. size 方法

size方法用于返回队列中当前存储的元素个数,其实现直接返回内部记录元素个数的属性size,源码如下:

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

size方法中,首先获取可重入锁lock,确保在读取size属性时的线程安全性。然后直接返回size属性的值,该值在每次元素插入(offer方法中size = n + 1)和移除(dequeue方法中size = n)操作时都会被正确更新,准确地反映了队列中当前的元素数量。最后释放锁lock,使其他线程能够继续对队列进行操作。通过这种简单而直接的实现方式,size方法为开发者提供了一种高效获取队列元素个数的途径,方便在各种业务场景中对队列状态进行监控和处理 。

扩容机制剖析

扩容触发条件

PriorityBlockingQueue 的扩容机制设计精巧,旨在确保队列在高并发环境下能够高效地存储和管理元素。其扩容触发条件基于队列当前存储的元素个数与底层数组容量的比较。当通过offer等方法向队列中插入元素时,会执行如下代码:

while ((n = size) >= (cap = (array = queue).length))
    tryGrow(array, cap);

这段代码清晰地表明,一旦队列中的元素个数n大于或等于底层数组的容量cap,就会触发扩容操作,调用tryGrow方法来扩大数组容量,以容纳更多的元素。这种触发条件的设计,既避免了频繁扩容带来的性能开销,又确保了队列在元素数量增加时能够及时调整容量,保证队列的正常运行。

扩容具体实现(tryGrow 方法)

tryGrow方法是 PriorityBlockingQueue 扩容的核心实现,具体逻辑见上面offer方法解析。

优先级实现原理

自然排序(Comparable 接口)

在 PriorityBlockingQueue 中,当元素实现了 Comparable 接口时,队列会依据元素的自然顺序来进行优先级排序,构建出一个有序的队列结构。这一过程基于二叉堆数据结构,其中最常用的是小顶堆(默认情况),在小顶堆中,每个节点的值都小于或等于其左右子节点的值,堆顶元素即为优先级最高的元素。

以一个简单的任务类Task为例,假设我们有如下定义:

class Task implements Comparable<Task> {

   private final int priority;

   private final String name;

   public Task(int priority, String name) {
       this.priority = priority;
       this.name = name;
   }

   @Override
   public int compareTo(Task other) {
       return Integer.compare(this.priority, other.priority);
   }
}

在这个Task类中,实现了Comparable接口,并在compareTo方法中定义了比较逻辑。通过Integer.compare(this.priority, other.priority),按照任务的优先级priority进行升序排序,即优先级数值越小,优先级越高。

当向 PriorityBlockingQueue 中插入Task对象时,会执行offer方法,

Comparator<? super E> cmp = comparator;
if (cmp == null)
    siftUpComparable(n, e, array);
else
    siftUpUsingComparator(n, e, array, cmp);

由于我们没有设置自定义的比较器comparatorcmp == null),所以会调用siftUpComparable方法来将新元素插入到堆中合适的位置。siftUpComparable方法的核心逻辑是从插入位置开始,将新元素与父节点进行比较,如果新元素小于父节点,则将父节点下移,新元素继续向上比较,直到找到合适的位置,满足堆的性质。这一过程确保了新插入的元素能够在堆中按照其自然顺序排列,从而保证整个队列的优先级顺序。

在移除元素时,poll方法会调用dequeue方法,其中涉及到堆结构的调整,

同样因为没有自定义比较器,会调用siftDownComparable方法。该方法从堆顶开始,将新的堆顶元素(即原堆的最后一个元素移到堆顶)与子节点进行比较,如果新元素大于子节点中的较小值,则将较小的子节点上移,新元素继续向下比较,直到满足堆的性质,这样可以保证每次移除堆顶元素(即优先级最高的元素)后,堆仍然保持有序,从而维持队列的优先级顺序。

自定义排序(Comparator 接口)

当元素本身没有实现 Comparable 接口,或者需要采用与自然顺序不同的排序规则时,PriorityBlockingQueue 允许在初始化时传入一个 Comparator 来实现自定义排序。Comparator 接口提供了一个compare方法,用于定义两个元素之间的比较逻辑,通过实现这个方法,可以根据具体需求定制元素的优先级顺序。

假设我们有一个商品类Product,如下所示:

class Product {

   private final String name;

   private final double price;

   public Product(String name, double price) {
       this.name = name;
       this.price = price;
   }
}

这个类没有实现 Comparable 接口,现在我们想要根据商品的价格从高到低进行优先级排序,可以创建一个自定义的比较器ProductPriceComparator

import java.util.Comparator;

class ProductPriceComparator implements Comparator<Product> {

   @Override
   public int compare(Product p1, Product p2) {
       return Double.compare(p2.price, p1.price);
   }
}

ProductPriceComparator中,compare方法通过Double.compare(p2.price, p1.price)实现了根据价格从高到低的排序逻辑,即价格越高的商品,优先级越高。

当使用这个自定义比较器来创建 PriorityBlockingQueue 时:

PriorityBlockingQueue<Product> productQueue = new PriorityBlockingQueue<>(11, new ProductPriceComparator());

向队列中插入元素时,offer方法同样会根据比较器来确定元素在堆中的位置。由于我们传入了自定义比较器,会执行siftUpUsingComparator方法:

private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = x;
}

该方法会按照自定义比较器的规则,将新元素x插入到堆中合适的位置。从插入位置k开始,与父节点e进行比较,如果新元素x按照比较器规则不小于父节点e,则找到合适位置退出循环;否则将父节点e下移,新元素继续向上比较,确保堆中元素按照自定义的优先级顺序排列。

在移除元素时,poll方法调用的dequeue方法会执行siftDownUsingComparator方法:

private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) {
    if (n > 0) {
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
            if (cmp.compare(x, (T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = x;
    }
}

从堆顶位置k开始,将新的堆顶元素x与子节点c进行比较,如果新元素x按照比较器规则不大于子节点中的较小值,则找到合适位置退出循环;否则将较小的子节点c上移,新元素继续向下比较,保证每次移除堆顶元素后,堆仍然按照自定义的优先级顺序重新调整,使得队列始终保持正确的优先级排序。

使用注意事项与常见问题

线程安全问题

虽然 PriorityBlockingQueue 本身是线程安全的,但其内部使用了可重入锁(ReentrantLock)来保证多线程环境下的操作安全,这在一定程度上会带来性能开销。在高并发场景下,频繁的锁竞争可能会导致线程上下文切换频繁,从而降低系统的整体性能。因此,在实际应用中,需要根据具体的并发访问模式和性能需求来评估使用 PriorityBlockingQueue 是否合适。如果对性能要求极高,且业务场景允许,可以考虑使用一些无锁数据结构或更细粒度的并发控制机制。另外,在使用 PriorityBlockingQueue 时,要确保所有对队列的操作都通过其提供的方法进行,避免在队列外部进行可能破坏线程安全的操作。例如,直接访问队列内部的数组(queue属性)是不安全的,因为这绕过了队列的锁机制,可能导致数据不一致。同时,在多线程环境下,对队列的迭代操作也需要谨慎处理,因为 PriorityBlockingQueue 的迭代器不保证线程安全,在迭代过程中如果其他线程对队列进行修改(如添加或删除元素),可能会导致迭代结果不准确或抛出ConcurrentModificationException异常。

内存占用问题

由于 PriorityBlockingQueue 是无界队列,理论上可以存储无限个元素(仅受限于系统内存),这在带来便利性的同时,也可能引发内存占用过高的问题。当系统持续向队列中添加元素,而元素的处理速度较慢时,队列中的元素会不断累积,最终可能耗尽系统内存,导致 OutOfMemoryError 异常。为了避免这种情况的发生,需要密切监控队列的大小和系统内存使用情况。可以通过定期调用size方法获取队列当前元素个数,并结合系统的内存监控工具(如 Java VisualVM、JConsole 等)来实时观察内存的使用状况。在业务逻辑中,可以根据实际情况设置一些阈值,当队列大小超过某个阈值时,采取相应的措施,如调整任务处理速度、增加处理线程数、丢弃低优先级任务等。此外,对于长时间不需要的数据,应及时从队列中移除,以释放内存空间。例如,在任务调度系统中,如果某些任务因为超时或其他原因不再需要执行,应将其从 PriorityBlockingQueue 中移除,避免其占用不必要的内存资源 。

优先级冲突问题

当多个元素具有相同的优先级时,PriorityBlockingQueue 并不能保证它们的出队顺序。这是因为在基于堆的数据结构中,对于相同优先级的元素,其在堆中的位置是根据插入顺序和堆调整算法确定的,而出队操作是基于堆顶元素(即优先级最高的元素)进行的,对于同优先级元素,并没有严格的顺序保证。在一些对元素顺序要求严格的业务场景中,这种不确定性可能会导致问题。例如,在一个订单处理系统中,如果多个订单具有相同的优先级,按照业务逻辑,应该按照订单的提交时间先后顺序进行处理,但由于 PriorityBlockingQueue 的这一特性,可能无法保证订单的处理顺序与提交顺序一致。为了解决这个问题,可以在元素的优先级判断逻辑中加入额外的排序依据,比如时间戳、唯一 ID 等,使得具有相同优先级的元素可以按照其他维度进行排序。例如,对于订单类Order,可以在实现Comparable接口或自定义Comparator时,不仅考虑订单的优先级,还考虑订单的提交时间:

class Order implements Comparable<Order> {

   private final int priority;

   private final long submitTime;

   // 其他订单属性和方法
   public Order(int priority, long submitTime) {
       this.priority = priority;
       this.submitTime = submitTime;
   }


   @Override
   public int compareTo(Order other) {
       int priorityComparison = Integer.compare(this.priority, other.priority);
       if (priorityComparison != 0) {
           return priorityComparison;
       }
       // 优先级相同时,根据提交时间再次排序 
       return Long.compare(this.submitTime, other.submitTime);
   }
}

通过这种方式,当订单优先级相同时,会按照提交时间的先后顺序进行排序,从而在 PriorityBlockingQueue 中保证了相同优先级元素的有序性,满足业务对元素顺序的严格要求 。

总结

通过对 PriorityBlockingQueue 源码的深入剖析,我们全面地认识了它的原理、特性和实现机制。它基于优先级的排序特性,借助二叉堆数据结构,无论是通过元素自身实现的 Comparable 接口进行自然排序,还是利用自定义的 Comparator 接口实现灵活排序,都能确保元素按照优先级有序排列,这在任务调度、资源分配等众多场景中发挥着关键作用。其无界队列的设计,理论上可以存储无限个元素,为数据的持续流入提供了便利,同时采用可重入锁(ReentrantLock)来保证多线程环境下操作的线程安全性,并利用条件变量(Condition)实现了高效的线程阻塞和唤醒机制,有效提升了多线程环境下的并发性能。

文章作者: Z
本文链接:
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 微博客
并发编程
喜欢就支持一下吧