文章目录
- BlockingQueue
- BlockingDeque
- TransferQueue
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- DelayQueue
- SynchronousQueue
BlockingQueue
BlockingQueue 是位于 java.util.concurrent 包中的一个接口,它扩展了 Queue 接口,提供了额外的阻塞特性。BlockingQueue 允许在多线程环境中安全地执行生产者-消费者模式,确保在并发访问时线程安全。
在JDK中有如下实现
主要特性
- 阻塞操作:
插入元素:当队列已满时,生产者线程会被阻塞,直到有空间可用。
获取元素:当队列为空时,消费者线程会被阻塞,直到有元素可用。 - 线程安全:所有的操作都是线程安全的,保证了多个线程同时访问队列时的数据一致性。
Java 提供了几种 BlockingQueue 的实现,包括:
- ArrayBlockingQueue:有界阻塞队列,使用数组实现。在创建时需要指定容量。适合有限的任务处理。
- LinkedBlockingQueue:也可以是有界或无界的,使用链表实现。默认情况下,无界队列适合处理大量任务。
- PriorityBlockingQueue:支持优先级的无界阻塞队列。需要提供一个比较器来确定元素的优先级。
- DelayQueue:支持延迟处理的阻塞队列。适合定时任务的调度。
- SynchronousQueue:不持有任何元素的阻塞队列,插入操作必须等待另一个线程进行提取操作。用于实现直接传递任务的场景。
常用方法
void put(E e)
:添加元素,如果队列满,线程阻塞直到有空间可用。boolean offer(E e, long timeout, TimeUnit unit)
:在指定时间内尝试添加元素,超时后返回 false。E take()
:提取元素,如果队列为空,线程阻塞直到有元素可用。E poll(long timeout, TimeUnit unit)
:在指定时间内尝试提取元素,超时后返回 null。int remainingCapacity()
:返回队列的剩余容量。boolean isEmpty()
:检查队列是否为空。
BlockingDeque
继承自BlockingQueue,BlockingQueue 的双端队列版本,支持在队列的两端进行插入和提取操作,并且这些操作都是阻塞的。BlockingDeque 的常用实现是LinkedBlockingDeque
,它是基于链表的无界双端阻塞队列,JDK 中没有基于数组的实现版本
主要特性
- 可以从队列的两端(头部和尾部)插入和提取元素。
- 阻塞特性:当队列满时,插入操作会阻塞;当队列为空时,提取操作会阻塞。
- 线程安全:所有操作都是线程安全的,适合在多线程环境中使用。
常用方法
- void putFirst(E e):在队列头部插入元素,若队列满则阻塞。
- void putLast(E e):在队列尾部插入元素,若队列满则阻塞。
- E takeFirst():提取并移除队列头部的元素,若队列为空则阻塞。
- E takeLast():提取并移除队列尾部的元素,若队列为空则阻塞。
- int remainingCapacity():返回队列的剩余容量。
- boolean isEmpty():检查队列是否为空。
TransferQueue
TransferQueue 扩展了 BlockingQueue 接口,支持直接转移元素。它主要用于实现生产者-消费者模式中更高效的任务传递。
主要特性
- 直接转移:TransferQueue 提供了 transfer 方法,允许生产者在没有消费者准备好接收元素时阻塞并等待。
- 非阻塞转移:tryTransfer 方法允许生产者尝试转移元素,如果没有消费者准备好,则返回 false。
- 阻塞特性:与 BlockingQueue 类似,当队列满时,插入操作会阻塞;当队列为空时,提取操作会阻塞。
LinkedTransferQueue 是 TransferQueue 的一个常用实现,采用链表结构,允许无界的队列操作,也没有数组实现版本
常用方法
- void transfer(E e):如果没有消费者准备好接收元素,当前线程会阻塞,直到有消费者调用 take 或 poll 方法。
- boolean tryTransfer(E e):尝试转移元素,如果没有消费者准备好,则返回 false。
标准操作: - void put(E e):将元素插入队列,如果队列已满则阻塞。
- E take():提取并移除队列头部的元素,如果队列为空则阻塞。
import java.util.concurrent.LinkedTransferQueue;public class TransferQueueExample {public static void main(String[] args) {LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();// 生产者线程Thread producer = new Thread(() -> {try {for (int i = 1; i <= 5; i++) {queue.transfer(i); // 直接转移元素System.out.println("Produced: " + i);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 消费者线程Thread consumer = new Thread(() -> {try {for (int i = 1; i <= 5; i++) {int value = queue.take(); // 提取元素System.out.println("Consumed: " + value);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});producer.start();consumer.start();}
}
ArrayBlockingQueue
ArrayBlockingQueue 使用数组来存储元素,具有固定的大小。数组的索引用于管理队列的头部和尾部。内部使用了 ReentrantLock 和 Condition 来实现阻塞特性。
- put 方法会在队列满时调用 Condition 的 await() 方法,阻塞当前线程。
- take 方法会在队列为空时调用 Condition 的 await() 方法,阻塞当前线程。
ArrayBlockingQueue 采用循环数组的方式来存储元素,这样可以有效地利用数组空间并减少内存复制。使用头尾两个指针来跟踪队列的头部和尾部位置,确保在插入和提取元素时的高效性。
特性如下:
- 容量限制:ArrayBlockingQueue 是有界的,必须在创建时指定容量。如果尝试在队列已满时添加元素,线程将被阻塞。
- 线程安全:所有操作都是线程安全的,但在高并发环境下,尽量避免长时间占用队列,避免造成不必要的阻塞。
- 阻塞特性:使用 put 和 take 方法时,线程会阻塞。要确保在合适的情况下使用这些方法,以避免死锁或性能问题。
- 中断处理:在使用阻塞方法时,应适当地处理 InterruptedException,确保线程可以安全中断。
使用示例:
import java.util.concurrent.ArrayBlockingQueue;public class ArrayBlockingQueueExample {public static void main(String[] args) {ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);// 生产者线程Thread producer = new Thread(() -> {try {for (int i = 1; i <= 10; i++) {queue.put(i);System.out.println("Produced: " + i);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 消费者线程Thread consumer = new Thread(() -> {try {for (int i = 1; i <= 10; i++) {int value = queue.take();System.out.println("Consumed: " + value);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});producer.start();consumer.start();}
}
LinkedBlockingQueue
使用链表实现,可以是有界的或无界的。默认情况下是无界的,除非在创建时指定容量。
PriorityBlockingQueue
支持优先级的无界阻塞队列。需要提供一个比较器来定义元素的优先级。元素的提取顺序是根据优先级而非插入顺序。
PriorityBlockingQueue 基于最小堆(Min-Heap)实现。最小堆是一种完全二叉树,根节点是最小值,任何节点的值都不大于其子节点的值。这种结构使得最小值可以在 O(1) 的时间复杂度内被访问。
自然顺序或自定义比较器:当创建 PriorityBlockingQueue 时,可以传入一个 Comparator,确定元素的优先级。如果没有提供,则元素必须实现 Comparable 接口。
插入和提取操作
- offer 和 put 方法:当插入新元素时,PriorityBlockingQueue 会将其插入到堆中,并维护堆的特性,确保最小值(或最大值)始终在根节点。
- take 和 poll 方法:take 方法会从队列中提取并移除根节点(优先级最高的元素),并将堆重新调整以恢复堆的特性。这一过程称为“下沉”。
import java.util.concurrent.PriorityBlockingQueue;class Task implements Comparable<Task> {private final int priority;public Task(int priority) {this.priority = priority;}public int getPriority() {return priority;}@Overridepublic int compareTo(Task other) {return Integer.compare(this.priority, other.priority);}@Overridepublic String toString() {return "Task with priority: " + priority;}
}public class PriorityBlockingQueueExample {public static void main(String[] args) {PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();// 添加任务queue.offer(new Task(3));queue.offer(new Task(1));queue.offer(new Task(2));// 提取任务while (!queue.isEmpty()) {Task task = queue.poll();System.out.println("Processing " + task);}}
}
DelayQueue
DelayQueue 是 Java 并发包中的一个阻塞队列,属于 java.util.concurrent 包。它实现了 BlockingQueue 接口,主要用于在一定的延迟后处理元素。DelayQueue 中的元素必须实现 Delayed 接口,这样队列才能根据元素的延迟时间进行排序。
主要特性
- 队列中的元素在到达指定的延迟时间之前无法被取出
- 元素必须实现 Delayed 接口
- 当队列为空时,尝试获取元素的线程会被阻塞,直到有元素可用
- 根据元素的延迟时间自动排序,最早到期的元素优先被处理
下面来看一个案例
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;class DelayedTask implements Delayed {private final String name;private final long delayTime;private final long startTime; // 任务预期开始执行的时间public DelayedTask(String name, long delayTime) {this.name = name;this.delayTime = delayTime;this.startTime = System.currentTimeMillis() + delayTime;}@Overridepublic long getDelay(TimeUnit unit) {long diff = startTime - System.currentTimeMillis();return unit.convert(diff, TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString() {return "Task: " + name + ", Delay: " + delayTime + "ms";}
}public class DelayQueueExample {public static void main(String[] args) {DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();// 添加任务到队列delayQueue.add(new DelayedTask("Task 1", 3000)); // 3秒后delayQueue.add(new DelayedTask("Task 2", 1000)); // 1秒后delayQueue.add(new DelayedTask("Task 3", 2000)); // 2秒后// 处理任务try {while (!delayQueue.isEmpty()) {DelayedTask task = delayQueue.take(); // 阻塞直到有元素System.out.println("Processing " + task);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
DelayedTask 需要实现 Delayed 接口,定义延迟任务的基本属性和方法。
- getDelay:返回到期时间,即剩余的延迟时间
- compareTo:用于比较两个延迟任务的到期时间,进行排序
SynchronousQueue
不持有任何元素,每个插入操作必须等待一个提取操作。无法指定容量,因为它不会存储元素
SynchronousQueue 使用了 ReentrantLock 和 Condition 来管理线程之间的同步。
无缓冲区:由于不存储元素,它实际上是一个直接交付的队列,插入和提取操作是通过手动的线程交互进行的。
import java.util.concurrent.SynchronousQueue;public class SynchronousQueueExample {public static void main(String[] args) {SynchronousQueue<Integer> queue = new SynchronousQueue<>();// 生产者线程Thread producer = new Thread(() -> {try {for (int i = 1; i <= 5; i++) {System.out.println("Producing: " + i);queue.put(i); // 直接交付}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 消费者线程Thread consumer = new Thread(() -> {try {for (int i = 1; i <= 5; i++) {int value = queue.take(); // 直接接收System.out.println("Consuming: " + value);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});producer.start();consumer.start();}
}