Java并发 - 阻塞队列详解

Java并发 - 阻塞队列详解

1. 阻塞队列概述

1.1 什么是阻塞队列

阻塞队列(BlockingQueue)是Java并发包中的一个重要组件,它是一个支持两个附加操作的队列:

  • 阻塞插入:当队列满时,插入元素的线程会被阻塞,直到队列不满
  • 阻塞移除:当队列空时,获取元素的线程会被阻塞,直到队列不空

1.2 阻塞队列的核心价值

1.2.1 解决的问题

  • 生产者-消费者问题:自动协调生产者和消费者的速度差异
  • 线程安全:内置同步机制,无需额外的同步代码
  • 流量控制:通过队列容量限制,防止内存溢出
  • 解耦合:生产者和消费者通过队列解耦,提高系统灵活性

1.2.2 应用场景

  • 线程池任务队列:存储待执行的任务
  • 消息中间件:异步消息传递
  • 数据缓冲:平衡不同速度的数据处理组件
  • 事件驱动系统:事件的缓存和分发

1.3 BlockingQueue接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface BlockingQueue<E> extends Queue<E> {
// 阻塞插入
void put(E e) throws InterruptedException;

// 阻塞获取
E take() throws InterruptedException;

// 超时插入
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

// 超时获取
E poll(long timeout, TimeUnit unit) throws InterruptedException;

// 剩余容量
int remainingCapacity();

// 移除指定元素
boolean remove(Object o);

// 批量操作
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}

1.4 操作方法对比

操作类型 抛出异常 返回特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不适用 不适用

方法说明

  • 抛出异常:操作失败时抛出异常
  • 返回特殊值:操作失败时返回false或null
  • 阻塞:操作失败时阻塞线程直到成功
  • 超时:操作失败时阻塞指定时间,超时后返回失败

2. 阻塞队列实现类详解

2.1 ArrayBlockingQueue

2.1.1 基本特性

ArrayBlockingQueue 是基于数组实现的有界阻塞队列,具有以下特点:

  • 数据结构:底层使用数组存储元素
  • 容量限制:创建时必须指定容量,且容量不可变
  • FIFO顺序:先进先出的元素顺序
  • 公平性:支持公平和非公平的访问策略
  • 线程安全:使用ReentrantLock保证线程安全

2.1.2 构造方法

1
2
3
4
5
6
7
8
// 指定容量,默认非公平策略
ArrayBlockingQueue<E> queue = new ArrayBlockingQueue<>(capacity);

// 指定容量和公平策略
ArrayBlockingQueue<E> queue = new ArrayBlockingQueue<>(capacity, fair);

// 指定容量、公平策略和初始元素
ArrayBlockingQueue<E> queue = new ArrayBlockingQueue<>(capacity, fair, collection);

2.1.3 核心实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ArrayBlockingQueue<E> extends AbstractQueue<E> 
implements BlockingQueue<E>, java.io.Serializable {

// 存储元素的数组
final Object[] items;

// 取元素的索引
int takeIndex;

// 放元素的索引
int putIndex;

// 元素个数
int count;

// 主锁
final ReentrantLock lock;

// 等待取元素的条件
private final Condition notEmpty;

// 等待放元素的条件
private final Condition notFull;
}

2.1.4 适用场景

  • 固定容量需求:需要严格控制内存使用的场景
  • 生产消费平衡:生产者和消费者速度相对平衡
  • 线程池任务队列:固定大小的线程池
  • 缓冲区应用:网络IO缓冲、日志缓冲等

2.1.5 完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueExample {

public static void main(String[] args) throws InterruptedException {
// 创建容量为5的公平队列
BlockingQueue<Task> queue = new ArrayBlockingQueue<>(5, true);

// 启动生产者线程
Thread producer = new Thread(new Producer(queue), "Producer");

// 启动消费者线程
Thread consumer1 = new Thread(new Consumer(queue), "Consumer-1");
Thread consumer2 = new Thread(new Consumer(queue), "Consumer-2");

producer.start();
consumer1.start();
consumer2.start();

// 等待生产者完成
producer.join();

// 等待队列清空
Thread.sleep(2000);

// 中断消费者线程
consumer1.interrupt();
consumer2.interrupt();
}

static class Task {
private final int id;
private final String data;

public Task(int id, String data) {
this.id = id;
this.data = data;
}

@Override
public String toString() {
return "Task{id=" + id + ", data='" + data + "'}";
}
}

static class Producer implements Runnable {
private final BlockingQueue<Task> queue;

public Producer(BlockingQueue<Task> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
for (int i = 1; i <= 10; i++) {
Task task = new Task(i, "Data-" + i);

// 使用put方法,队列满时会阻塞
queue.put(task);
System.out.println(Thread.currentThread().getName() +
" produced: " + task);

// 模拟生产耗时
Thread.sleep(100);
}
System.out.println("Producer finished");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Producer interrupted");
}
}
}

static class Consumer implements Runnable {
private final BlockingQueue<Task> queue;

public Consumer(BlockingQueue<Task> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
// 使用poll方法,超时返回null
Task task = queue.poll(1, TimeUnit.SECONDS);

if (task != null) {
System.out.println(Thread.currentThread().getName() +
" consumed: " + task);

// 模拟处理耗时
Thread.sleep(200);
} else {
System.out.println(Thread.currentThread().getName() +
" timeout, no task available");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(Thread.currentThread().getName() + " interrupted");
}
}
}
}

2.1.6 性能特点

优势

  • 预分配数组,避免动态扩容开销
  • 使用索引操作,访问效率高
  • 支持公平策略,避免线程饥饿

劣势

  • 容量固定,无法动态调整
  • 数组预分配可能浪费内存
  • 单锁设计,高并发时可能成为瓶颈

2.2 LinkedBlockingQueue

2.2.1 基本特性

LinkedBlockingQueue 是基于链表实现的可选有界阻塞队列,具有以下特点:

  • 数据结构:底层使用单向链表存储元素
  • 容量限制:可选择有界或无界(默认Integer.MAX_VALUE)
  • FIFO顺序:先进先出的元素顺序
  • 双锁设计:读写操作使用不同的锁,提高并发性能
  • 动态扩容:根据需要动态创建节点

2.2.2 构造方法

1
2
3
4
5
6
7
8
// 无界队列,容量为Integer.MAX_VALUE
LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>();

// 有界队列,指定容量
LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>(capacity);

// 使用集合初始化
LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>(collection);

2.2.3 核心实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

// 链表节点
static class Node<E> {
E item;
Node<E> next;

Node(E x) { item = x; }
}

// 容量限制
private final int capacity;

// 当前元素数量
private final AtomicInteger count = new AtomicInteger();

// 头节点
transient Node<E> head;

// 尾节点
private transient Node<E> last;

// 取元素锁
private final ReentrantLock takeLock = new ReentrantLock();

// 非空条件
private final Condition notEmpty = takeLock.newCondition();

// 放元素锁
private final ReentrantLock putLock = new ReentrantLock();

// 非满条件
private final Condition notFull = putLock.newCondition();
}

2.2.4 双锁机制优势

读写分离

  • putLock:控制入队操作,保护尾节点
  • takeLock:控制出队操作,保护头节点
  • 并发优势:读写可以同时进行,提高吞吐量

2.2.5 适用场景

  • 生产消费不平衡:生产者和消费者速度差异较大
  • 高并发场景:需要高吞吐量的并发环境
  • 线程池默认选择:ThreadPoolExecutor的默认队列
  • 消息缓冲:异步消息处理系统

2.2.6 完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LinkedBlockingQueueExample {

private static final AtomicInteger taskIdGenerator = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {
// 创建有界队列,容量为100
BlockingQueue<Message> queue = new LinkedBlockingQueue<>(100);

// 创建多个生产者
Thread producer1 = new Thread(new MessageProducer(queue, "Producer-1"), "Producer-1");
Thread producer2 = new Thread(new MessageProducer(queue, "Producer-2"), "Producer-2");

// 创建多个消费者
Thread consumer1 = new Thread(new MessageConsumer(queue), "Consumer-1");
Thread consumer2 = new Thread(new MessageConsumer(queue), "Consumer-2");
Thread consumer3 = new Thread(new MessageConsumer(queue), "Consumer-3");

// 启动所有线程
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();

// 等待生产者完成
producer1.join();
producer2.join();

// 等待队列处理完成
Thread.sleep(3000);

// 停止消费者
consumer1.interrupt();
consumer2.interrupt();
consumer3.interrupt();

System.out.println("Final queue size: " + queue.size());
}

static class Message {
private final int id;
private final String content;
private final String producer;
private final long timestamp;

public Message(String content, String producer) {
this.id = taskIdGenerator.incrementAndGet();
this.content = content;
this.producer = producer;
this.timestamp = System.currentTimeMillis();
}

@Override
public String toString() {
return String.format("Message{id=%d, content='%s', producer='%s', timestamp=%d}",
id, content, producer, timestamp);
}

public int getId() { return id; }
public String getContent() { return content; }
public String getProducer() { return producer; }
public long getTimestamp() { return timestamp; }
}

static class MessageProducer implements Runnable {
private final BlockingQueue<Message> queue;
private final String producerName;

public MessageProducer(BlockingQueue<Message> queue, String producerName) {
this.queue = queue;
this.producerName = producerName;
}

@Override
public void run() {
try {
for (int i = 1; i <= 20; i++) {
Message message = new Message("Message-" + i, producerName);

// 使用offer方法,避免无限阻塞
boolean success = queue.offer(message, 2, TimeUnit.SECONDS);

if (success) {
System.out.println(producerName + " produced: " + message.getId());
} else {
System.out.println(producerName + " failed to produce: " + message.getId());
}

// 模拟生产间隔
Thread.sleep(50 + (int)(Math.random() * 100));
}
System.out.println(producerName + " finished producing");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(producerName + " interrupted");
}
}
}

static class MessageConsumer implements Runnable {
private final BlockingQueue<Message> queue;

public MessageConsumer(BlockingQueue<Message> queue) {
this.queue = queue;
}

@Override
public void run() {
String consumerName = Thread.currentThread().getName();

try {
while (!Thread.currentThread().isInterrupted()) {
// 使用poll方法,超时机制
Message message = queue.poll(1, TimeUnit.SECONDS);

if (message != null) {
// 模拟消息处理
processMessage(message, consumerName);
} else {
System.out.println(consumerName + " waiting for messages...");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(consumerName + " interrupted");
}
}

private void processMessage(Message message, String consumerName) throws InterruptedException {
System.out.println(String.format("%s processing message %d from %s",
consumerName, message.getId(), message.getProducer()));

// 模拟处理时间
Thread.sleep(100 + (int)(Math.random() * 200));

System.out.println(String.format("%s completed message %d",
consumerName, message.getId()));
}
}
}

2.2.7 性能特点

优势

  • 双锁设计,读写并发性能好
  • 动态扩容,内存使用灵活
  • 支持有界和无界模式
  • 适合高并发场景

劣势

  • 链表结构,内存开销相对较大
  • 无界模式可能导致内存溢出
  • 节点创建和GC开销

2.2.8 与ArrayBlockingQueue对比

特性 ArrayBlockingQueue LinkedBlockingQueue
数据结构 数组 链表
容量 固定有界 可选有界/无界
锁机制 单锁 双锁
内存使用 预分配 动态分配
并发性能 中等 较高
适用场景 固定容量 高并发

2.3 PriorityBlockingQueue

2.3.1 基本特性

PriorityBlockingQueue 是支持优先级排序的无界阻塞队列:

  • 数据结构:基于数组实现的二叉堆
  • 容量限制:无界队列,容量可动态扩展
  • 排序规则:支持自然排序或自定义Comparator
  • 线程安全:使用ReentrantLock保证线程安全
  • 阻塞特性:只有take操作会阻塞,put操作不会阻塞

2.3.2 适用场景

  • 任务调度系统:根据优先级执行任务
  • 事件处理:按重要性处理事件
  • 资源分配:优先分配给高优先级请求
  • 消息队列:VIP消息优先处理

2.3.3 完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PriorityBlockingQueueExample {

public static void main(String[] args) throws InterruptedException {
// 创建优先级队列
BlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();

// 启动生产者
Thread producer = new Thread(new TaskProducer(queue), "Producer");

// 启动消费者
Thread consumer = new Thread(new TaskConsumer(queue), "Consumer");

producer.start();
consumer.start();

producer.join();
Thread.sleep(2000);
consumer.interrupt();
}

static class PriorityTask implements Comparable<PriorityTask> {
private final int priority;
private final String taskName;
private final long createTime;

public PriorityTask(int priority, String taskName) {
this.priority = priority;
this.taskName = taskName;
this.createTime = System.currentTimeMillis();
}

@Override
public int compareTo(PriorityTask other) {
// 优先级高的排在前面(数字越小优先级越高)
int result = Integer.compare(this.priority, other.priority);
if (result == 0) {
// 优先级相同时,按创建时间排序
result = Long.compare(this.createTime, other.createTime);
}
return result;
}

@Override
public String toString() {
return String.format("Task{priority=%d, name='%s', createTime=%d}",
priority, taskName, createTime);
}

public int getPriority() { return priority; }
public String getTaskName() { return taskName; }
}

static class TaskProducer implements Runnable {
private final BlockingQueue<PriorityTask> queue;

public TaskProducer(BlockingQueue<PriorityTask> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
// 添加不同优先级的任务
queue.put(new PriorityTask(3, "Low Priority Task 1"));
queue.put(new PriorityTask(1, "High Priority Task 1"));
queue.put(new PriorityTask(2, "Medium Priority Task 1"));
queue.put(new PriorityTask(1, "High Priority Task 2"));
queue.put(new PriorityTask(3, "Low Priority Task 2"));
queue.put(new PriorityTask(2, "Medium Priority Task 2"));

System.out.println("All tasks produced");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

static class TaskConsumer implements Runnable {
private final BlockingQueue<PriorityTask> queue;

public TaskConsumer(BlockingQueue<PriorityTask> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
PriorityTask task = queue.poll(1, TimeUnit.SECONDS);
if (task != null) {
System.out.println("Processing: " + task);
Thread.sleep(500); // 模拟处理时间
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Consumer interrupted");
}
}
}
}

2.4 DelayQueue

2.4.1 基本特性

DelayQueue 是支持延迟获取元素的无界阻塞队列:

  • 延迟特性:元素只有在延迟期满后才能被取出
  • 无界队列:容量无限制
  • 排序规则:按照延迟时间排序
  • 元素要求:元素必须实现Delayed接口

2.4.2 适用场景

  • 定时任务调度:延迟执行任务
  • 缓存过期:缓存元素的过期处理
  • 订单超时:订单超时自动取消
  • 重试机制:延迟重试失败的操作

2.4.3 完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueExample {

public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> queue = new DelayQueue<>();

// 启动生产者
Thread producer = new Thread(new DelayedTaskProducer(queue), "Producer");

// 启动消费者
Thread consumer = new Thread(new DelayedTaskConsumer(queue), "Consumer");

producer.start();
consumer.start();

producer.join();
Thread.sleep(15000); // 等待所有延迟任务执行完成
consumer.interrupt();
}

static class DelayedTask implements Delayed {
private final String taskName;
private final long executeTime;

public DelayedTask(String taskName, long delayInSeconds) {
this.taskName = taskName;
this.executeTime = System.currentTimeMillis() + delayInSeconds * 1000;
}

@Override
public long getDelay(TimeUnit unit) {
long remaining = executeTime - System.currentTimeMillis();
return unit.convert(remaining, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed other) {
return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);
}

@Override
public String toString() {
return String.format("DelayedTask{name='%s', executeTime=%d, remaining=%d ms}",
taskName, executeTime, getDelay(TimeUnit.MILLISECONDS));
}

public String getTaskName() { return taskName; }
}

static class DelayedTaskProducer implements Runnable {
private final DelayQueue<DelayedTask> queue;

public DelayedTaskProducer(DelayQueue<DelayedTask> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
// 添加不同延迟时间的任务
queue.put(new DelayedTask("Task-5s", 5));
queue.put(new DelayedTask("Task-2s", 2));
queue.put(new DelayedTask("Task-8s", 8));
queue.put(new DelayedTask("Task-1s", 1));
queue.put(new DelayedTask("Task-3s", 3));

System.out.println("All delayed tasks added to queue");
} catch (Exception e) {
e.printStackTrace();
}
}
}

static class DelayedTaskConsumer implements Runnable {
private final DelayQueue<DelayedTask> queue;

public DelayedTaskConsumer(DelayQueue<DelayedTask> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
DelayedTask task = queue.take(); // 阻塞直到有可用元素
System.out.println("Executing: " + task.getTaskName() +
" at " + System.currentTimeMillis());

// 模拟任务执行
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Consumer interrupted");
}
}
}
}

2.5 SynchronousQueue

2.5.1 基本特性

SynchronousQueue 是一个特殊的阻塞队列:

  • 零容量:不存储任何元素
  • 直接传递:每个put操作必须等待对应的take操作
  • 同步机制:生产者和消费者直接交换数据
  • 公平性:支持公平和非公平模式

2.5.2 适用场景

  • CachedThreadPool:Executors.newCachedThreadPool()的默认队列
  • 直接传递:需要立即处理的任务
  • 线程间通信:线程间直接数据交换
  • 背压控制:自然的流量控制机制

2.5.3 完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueExample {

public static void main(String[] args) throws InterruptedException {
// 创建公平的SynchronousQueue
BlockingQueue<String> queue = new SynchronousQueue<>(true);

// 启动多个生产者
for (int i = 1; i <= 3; i++) {
Thread producer = new Thread(new DataProducer(queue, "Producer-" + i));
producer.start();
}

// 启动多个消费者
for (int i = 1; i <= 2; i++) {
Thread consumer = new Thread(new DataConsumer(queue, "Consumer-" + i));
consumer.start();
}

// 主线程等待
Thread.sleep(10000);
System.out.println("Main thread finished");
}

static class DataProducer implements Runnable {
private final BlockingQueue<String> queue;
private final String producerName;

public DataProducer(BlockingQueue<String> queue, String producerName) {
this.queue = queue;
this.producerName = producerName;
}

@Override
public void run() {
try {
for (int i = 1; i <= 5; i++) {
String data = producerName + "-Data-" + i;

System.out.println(producerName + " trying to put: " + data);

// 使用offer方法,避免无限阻塞
boolean success = queue.offer(data, 2, TimeUnit.SECONDS);

if (success) {
System.out.println(producerName + " successfully put: " + data);
} else {
System.out.println(producerName + " failed to put: " + data + " (timeout)");
}

Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(producerName + " interrupted");
}
}
}

static class DataConsumer implements Runnable {
private final BlockingQueue<String> queue;
private final String consumerName;

public DataConsumer(BlockingQueue<String> queue, String consumerName) {
this.queue = queue;
this.consumerName = consumerName;
}

@Override
public void run() {
try {
while (true) {
System.out.println(consumerName + " trying to take data...");

String data = queue.poll(3, TimeUnit.SECONDS);

if (data != null) {
System.out.println(consumerName + " received: " + data);

// 模拟处理时间
Thread.sleep(500);
} else {
System.out.println(consumerName + " timeout, no data available");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(consumerName + " interrupted");
}
}
}
}

2.6 LinkedTransferQueue

2.6.1 基本特性

LinkedTransferQueue 是基于链表的无界阻塞队列,实现了TransferQueue接口:

  • 数据结构:基于链表的无锁算法
  • 容量限制:无界队列
  • 传递模式:支持直接传递和异步传递
  • 性能特点:高并发性能,无锁实现
  • 特殊方法:transfer()方法可直接传递给等待的消费者

2.6.2 核心方法

方法 描述 阻塞行为
transfer(E e) 直接传递给消费者,如果没有消费者则阻塞 阻塞
tryTransfer(E e) 尝试直接传递,失败则返回false 非阻塞
tryTransfer(E e, long timeout, TimeUnit unit) 在指定时间内尝试传递 超时阻塞
hasWaitingConsumer() 检查是否有等待的消费者 非阻塞
getWaitingConsumerCount() 获取等待消费者数量 非阻塞

2.6.3 适用场景

  • 实时数据传递:需要立即处理的数据
  • 请求-响应模式:Web服务器处理请求
  • 事件驱动系统:事件的实时分发
  • 高性能消息传递:替代SynchronousQueue的高性能方案

2.6.4 完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.TimeUnit;

public class LinkedTransferQueueExample {

public static void main(String[] args) throws InterruptedException {
TransferQueue<Message> queue = new LinkedTransferQueue<>();

// 启动消费者(先启动,模拟等待状态)
Thread consumer1 = new Thread(new MessageConsumer(queue, "Consumer-1"));
Thread consumer2 = new Thread(new MessageConsumer(queue, "Consumer-2"));

consumer1.start();
consumer2.start();

// 等待消费者启动
Thread.sleep(1000);

// 启动生产者
Thread producer = new Thread(new MessageProducer(queue, "Producer"));
producer.start();

producer.join();
Thread.sleep(3000);

consumer1.interrupt();
consumer2.interrupt();
}

static class Message {
private final String content;
private final long timestamp;
private final int priority;

public Message(String content, int priority) {
this.content = content;
this.priority = priority;
this.timestamp = System.currentTimeMillis();
}

@Override
public String toString() {
return String.format("Message{content='%s', priority=%d, timestamp=%d}",
content, priority, timestamp);
}

public String getContent() { return content; }
public int getPriority() { return priority; }
}

static class MessageProducer implements Runnable {
private final TransferQueue<Message> queue;
private final String producerName;

public MessageProducer(TransferQueue<Message> queue, String producerName) {
this.queue = queue;
this.producerName = producerName;
}

@Override
public void run() {
try {
for (int i = 1; i <= 8; i++) {
Message message = new Message("Message-" + i, i % 3 + 1);

System.out.println(producerName + " checking waiting consumers: " +
queue.getWaitingConsumerCount());

if (queue.hasWaitingConsumer()) {
// 直接传递给等待的消费者
System.out.println(producerName + " transferring: " + message.getContent());
queue.transfer(message);
System.out.println(producerName + " transferred: " + message.getContent());
} else {
// 尝试传递,如果没有消费者则放入队列
boolean transferred = queue.tryTransfer(message, 500, TimeUnit.MILLISECONDS);
if (transferred) {
System.out.println(producerName + " try-transferred: " + message.getContent());
} else {
queue.put(message);
System.out.println(producerName + " queued: " + message.getContent());
}
}

Thread.sleep(800);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(producerName + " interrupted");
}
}
}

static class MessageConsumer implements Runnable {
private final TransferQueue<Message> queue;
private final String consumerName;

public MessageConsumer(TransferQueue<Message> queue, String consumerName) {
this.queue = queue;
this.consumerName = consumerName;
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
System.out.println(consumerName + " waiting for message...");

Message message = queue.poll(2, TimeUnit.SECONDS);

if (message != null) {
System.out.println(consumerName + " received: " + message);

// 模拟处理时间(根据优先级)
Thread.sleep(message.getPriority() * 300);

System.out.println(consumerName + " processed: " + message.getContent());
} else {
System.out.println(consumerName + " timeout, no message");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(consumerName + " interrupted");
}
}
}
}

2.7 LinkedBlockingDeque

2.7.1 基本特性

LinkedBlockingDeque 是基于链表的双端阻塞队列:

  • 数据结构:双向链表
  • 容量限制:可选择有界或无界
  • 双端操作:支持从头部和尾部插入/移除
  • 线程安全:使用ReentrantLock保证线程安全
  • 阻塞特性:支持双端的阻塞操作

2.7.2 核心方法

操作类型 头部操作 尾部操作 说明
插入 addFirst(), offerFirst(), putFirst() addLast(), offerLast(), putLast() 从两端插入
移除 removeFirst(), pollFirst(), takeFirst() removeLast(), pollLast(), takeLast() 从两端移除
检查 getFirst(), peekFirst() getLast(), peekLast() 检查两端元素

2.7.3 适用场景

  • 工作窃取算法:ForkJoinPool的任务队列
  • 双端缓存:LRU缓存实现
  • 撤销操作:支持撤销的操作队列
  • 双向数据流:需要双向处理的数据

2.7.4 完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;

public class LinkedBlockingDequeExample {

public static void main(String[] args) throws InterruptedException {
// 创建容量为10的双端队列
BlockingDeque<Task> deque = new LinkedBlockingDeque<>(10);

// 启动工作线程(从头部获取高优先级任务)
Thread worker1 = new Thread(new Worker(deque, "Worker-1", true));
Thread worker2 = new Thread(new Worker(deque, "Worker-2", false));

// 启动任务生产者
Thread producer = new Thread(new TaskProducer(deque));

worker1.start();
worker2.start();
producer.start();

producer.join();
Thread.sleep(5000);

worker1.interrupt();
worker2.interrupt();
}

static class Task {
private final String taskId;
private final int priority;
private final String description;

public Task(String taskId, int priority, String description) {
this.taskId = taskId;
this.priority = priority;
this.description = description;
}

@Override
public String toString() {
return String.format("Task{id='%s', priority=%d, desc='%s'}",
taskId, priority, description);
}

public int getPriority() { return priority; }
public String getTaskId() { return taskId; }
}

static class TaskProducer implements Runnable {
private final BlockingDeque<Task> deque;

public TaskProducer(BlockingDeque<Task> deque) {
this.deque = deque;
}

@Override
public void run() {
try {
// 添加不同优先级的任务
for (int i = 1; i <= 12; i++) {
int priority = (i % 3) + 1;
Task task = new Task("T" + i, priority, "Task " + i + " description");

if (priority == 1) {
// 高优先级任务放在头部
deque.putFirst(task);
System.out.println("Added high priority task to front: " + task.getTaskId());
} else {
// 普通任务放在尾部
deque.putLast(task);
System.out.println("Added normal task to rear: " + task.getTaskId());
}

Thread.sleep(300);
}

System.out.println("All tasks produced. Queue size: " + deque.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Producer interrupted");
}
}
}

static class Worker implements Runnable {
private final BlockingDeque<Task> deque;
private final String workerName;
private final boolean preferHighPriority;

public Worker(BlockingDeque<Task> deque, String workerName, boolean preferHighPriority) {
this.deque = deque;
this.workerName = workerName;
this.preferHighPriority = preferHighPriority;
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
Task task;

if (preferHighPriority) {
// 优先从头部获取(高优先级任务)
task = deque.pollFirst(1, TimeUnit.SECONDS);
if (task != null) {
System.out.println(workerName + " took from FRONT: " + task);
}
} else {
// 从尾部获取(LIFO方式,类似栈)
task = deque.pollLast(1, TimeUnit.SECONDS);
if (task != null) {
System.out.println(workerName + " took from REAR: " + task);
}
}

if (task != null) {
// 模拟任务处理时间
Thread.sleep(task.getPriority() * 200);
System.out.println(workerName + " completed: " + task.getTaskId());
} else {
System.out.println(workerName + " timeout, no task available");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(workerName + " interrupted");
}
}
}
}

3. 阻塞队列性能对比

3.1 性能特性对比

队列类型 数据结构 容量 锁机制 并发性能 内存使用 适用场景
ArrayBlockingQueue 数组 有界 单锁 中等 预分配 固定容量
LinkedBlockingQueue 链表 可选 双锁 动态 高并发
PriorityBlockingQueue 无界 单锁 中等 动态 优先级
DelayQueue 无界 单锁 中等 动态 延迟处理
SynchronousQueue 0 CAS 最小 直接传递
LinkedTransferQueue 链表 无界 无锁 最高 动态 高性能
LinkedBlockingDeque 双向链表 可选 单锁 中高 动态 双端操作

3.2 吞吐量测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class BlockingQueuePerformanceTest {

private static final int PRODUCER_COUNT = 4;
private static final int CONSUMER_COUNT = 4;
private static final int MESSAGES_PER_PRODUCER = 100000;
private static final int QUEUE_CAPACITY = 1000;

public static void main(String[] args) throws InterruptedException {
System.out.println("阻塞队列性能测试\n");

testQueue("ArrayBlockingQueue", new ArrayBlockingQueue<>(QUEUE_CAPACITY));
testQueue("LinkedBlockingQueue", new LinkedBlockingQueue<>(QUEUE_CAPACITY));
testQueue("SynchronousQueue", new SynchronousQueue<>());
testQueue("LinkedTransferQueue", new LinkedTransferQueue<>());
}

private static void testQueue(String queueName, BlockingQueue<Integer> queue)
throws InterruptedException {

System.out.println("测试队列: " + queueName);

AtomicLong producedCount = new AtomicLong(0);
AtomicLong consumedCount = new AtomicLong(0);

long startTime = System.currentTimeMillis();

// 启动生产者
Thread[] producers = new Thread[PRODUCER_COUNT];
for (int i = 0; i < PRODUCER_COUNT; i++) {
producers[i] = new Thread(new Producer(queue, producedCount));
producers[i].start();
}

// 启动消费者
Thread[] consumers = new Thread[CONSUMER_COUNT];
for (int i = 0; i < CONSUMER_COUNT; i++) {
consumers[i] = new Thread(new Consumer(queue, consumedCount));
consumers[i].start();
}

// 等待生产者完成
for (Thread producer : producers) {
producer.join();
}

// 等待消费完成
while (consumedCount.get() < PRODUCER_COUNT * MESSAGES_PER_PRODUCER) {
Thread.sleep(10);
}

// 停止消费者
for (Thread consumer : consumers) {
consumer.interrupt();
}

long endTime = System.currentTimeMillis();
long duration = endTime - startTime;

long totalMessages = PRODUCER_COUNT * MESSAGES_PER_PRODUCER;
double throughput = (double) totalMessages / duration * 1000;

System.out.printf(" 总消息数: %d\n", totalMessages);
System.out.printf(" 执行时间: %d ms\n", duration);
System.out.printf(" 吞吐量: %.2f messages/sec\n\n", throughput);
}

static class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private final AtomicLong counter;

public Producer(BlockingQueue<Integer> queue, AtomicLong counter) {
this.queue = queue;
this.counter = counter;
}

@Override
public void run() {
try {
for (int i = 0; i < MESSAGES_PER_PRODUCER; i++) {
queue.put(i);
counter.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

static class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
private final AtomicLong counter;

public Consumer(BlockingQueue<Integer> queue, AtomicLong counter) {
this.queue = queue;
this.counter = counter;
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
Integer message = queue.take();
counter.incrementAndGet();
// 模拟少量处理时间
if (message % 10000 == 0) {
Thread.sleep(1);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

4. 最佳实践

4.1 选择合适的阻塞队列

4.1.1 决策树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
需要阻塞队列?
├─ 是否需要优先级?
│ ├─ 是 → PriorityBlockingQueue
│ └─ 否 → 继续
├─ 是否需要延迟处理?
│ ├─ 是 → DelayQueue
│ └─ 否 → 继续
├─ 是否需要双端操作?
│ ├─ 是 → LinkedBlockingDeque
│ └─ 否 → 继续
├─ 是否需要直接传递?
│ ├─ 是 → SynchronousQueue 或 LinkedTransferQueue
│ └─ 否 → 继续
├─ 容量是否固定?
│ ├─ 是 → ArrayBlockingQueue
│ └─ 否 → LinkedBlockingQueue

4.1.2 场景推荐

高并发Web服务器

1
2
3
4
5
// 使用LinkedTransferQueue处理请求
TransferQueue<HttpRequest> requestQueue = new LinkedTransferQueue<>();

// 或者使用LinkedBlockingQueue
BlockingQueue<HttpRequest> requestQueue = new LinkedBlockingQueue<>(10000);

任务调度系统

1
2
3
4
5
// 使用PriorityBlockingQueue按优先级处理
BlockingQueue<ScheduledTask> taskQueue = new PriorityBlockingQueue<>();

// 使用DelayQueue处理延迟任务
DelayQueue<DelayedTask> delayedTasks = new DelayQueue<>();

线程池配置

1
2
3
4
5
6
7
8
9
10
11
// 固定大小线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueCapacity)
);

// 缓存线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>()
);

4.2 性能优化技巧

4.2.1 批量操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class BatchProcessor {
private final BlockingQueue<Task> queue;
private final int batchSize;

public BatchProcessor(BlockingQueue<Task> queue, int batchSize) {
this.queue = queue;
this.batchSize = batchSize;
}

public void processBatch() throws InterruptedException {
List<Task> batch = new ArrayList<>(batchSize);

// 使用drainTo批量获取
int count = queue.drainTo(batch, batchSize);

if (count > 0) {
// 批量处理
processTasks(batch);
}
}

private void processTasks(List<Task> tasks) {
// 批量处理逻辑
for (Task task : tasks) {
// 处理单个任务
}
}
}

4.2.2 避免无限阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SafeProducer implements Runnable {
private final BlockingQueue<Message> queue;
private volatile boolean running = true;

@Override
public void run() {
while (running) {
try {
Message message = createMessage();

// 使用超时方法,避免无限阻塞
boolean success = queue.offer(message, 5, TimeUnit.SECONDS);

if (!success) {
// 处理队列满的情况
handleQueueFull(message);
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}

public void stop() {
running = false;
}
}

4.2.3 监控队列状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class QueueMonitor {
private final BlockingQueue<?> queue;
private final ScheduledExecutorService scheduler;

public QueueMonitor(BlockingQueue<?> queue) {
this.queue = queue;
this.scheduler = Executors.newScheduledThreadPool(1);
}

public void startMonitoring() {
scheduler.scheduleAtFixedRate(() -> {
int size = queue.size();
int remaining = queue.remainingCapacity();

System.out.printf("Queue Status - Size: %d, Remaining: %d, Usage: %.2f%%\n",
size, remaining, (double) size / (size + remaining) * 100);

// 队列使用率过高时告警
if (size > (size + remaining) * 0.8) {
System.out.println("WARNING: Queue usage is high!");
}

}, 0, 10, TimeUnit.SECONDS);
}
}

4.3 常见陷阱和解决方案

4.3.1 内存泄漏

问题:无界队列可能导致内存溢出

解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 使用有界队列
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(10000);

// 或者实现背压机制
public class BackpressureProducer {
private final BlockingQueue<Task> queue;
private final AtomicInteger pendingTasks = new AtomicInteger(0);
private final int maxPendingTasks;

public boolean submitTask(Task task) {
if (pendingTasks.get() >= maxPendingTasks) {
return false; // 拒绝新任务
}

boolean success = queue.offer(task);
if (success) {
pendingTasks.incrementAndGet();
}
return success;
}
}

4.3.2 线程中断处理

问题:不正确的中断处理

解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ProperInterruptHandling implements Runnable {
private final BlockingQueue<Task> queue;

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
Task task = queue.take();
processTask(task);
}
} catch (InterruptedException e) {
// 恢复中断状态
Thread.currentThread().interrupt();

// 清理资源
cleanup();
}
}

private void processTask(Task task) {
// 处理任务
}

private void cleanup() {
// 清理资源
}
}

4.3.3 死锁预防

问题:多个队列操作可能导致死锁

解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class DeadlockFreeProcessor {
private final BlockingQueue<Task> inputQueue;
private final BlockingQueue<Result> outputQueue;

public void processWithTimeout() {
try {
// 使用超时方法避免死锁
Task task = inputQueue.poll(1, TimeUnit.SECONDS);
if (task != null) {
Result result = process(task);

boolean success = outputQueue.offer(result, 1, TimeUnit.SECONDS);
if (!success) {
// 处理输出队列满的情况
handleOutputQueueFull(result);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

4.4 总结

阻塞队列是Java并发编程中的重要工具,正确选择和使用阻塞队列可以:

  1. 简化并发编程:自动处理线程同步
  2. 提高系统性能:减少线程间的竞争
  3. 增强系统稳定性:提供流量控制机制
  4. 改善代码可维护性:清晰的生产者-消费者模式

选择建议

  • 高并发场景:LinkedTransferQueue > LinkedBlockingQueue > ArrayBlockingQueue
  • 内存敏感场景:ArrayBlockingQueue > LinkedBlockingQueue
  • 特殊需求场景:根据具体需求选择PriorityBlockingQueue、DelayQueue等
  • 直接传递场景:SynchronousQueue 或 LinkedTransferQueue

通过合理选择和正确使用阻塞队列,可以构建高效、稳定的并发应用程序。


Java并发 - 阻塞队列详解
http://example.com/2025/12/01/Java并发-阻塞队列详解/
作者
TuBoShu
发布于
2025年12月1日
许可协议