Java wait和notifyAll实现简单的阻塞队列
让我来为你详细讲解如何使用Java的wait和notifyAll实现简单的阻塞队列。
什么是阻塞队列
阻塞队列是一种特殊的队列,与普通队列的区别在于,当队列满时,往队列中添加元素的操作会被阻塞,直到队列不满;而当队列为空时,从队列中取出元素的操作会被阻塞,直到队列不为空。
阻塞队列在多线程环境下使用更加安全,它可以帮助我们解决线程同步和协作的问题。
使用wait和notifyAll实现阻塞队列
使用Java的wait和notifyAll功能,我们可以比较方便地实现一个阻塞队列。具体实现过程如下:
- 定义一个阻塞队列类,该类中包含一个固定大小的数组用于存储元素。
- 定义一个计数器变量,用于记录队列中元素的个数。
- 定义两个锁对象(生产者锁和消费者锁),以及两个条件变量(队列不满和队列不空)。
- 实现阻塞队列的入队方法put和出队方法take。入队方法put将元素添加到队列中,若队列已满,则阻塞等待;出队方法take将队列中的元素取出,并将队列中的元素个数减一,若队列为空,则阻塞等待。
下面是基于上述实现思路的Java代码示例:
public class BlockingQueue<T> {
private T[] queue;
private int count;
private int size;
private Object putLock = new Object();
private Object takeLock = new Object();
private Condition notFull = putLock.newCondition();
private Condition notEmpty = takeLock.newCondition();
public BlockingQueue(int size) {
queue = (T[]) new Object[size];
this.size = size;
}
public void put(T element) {
synchronized (putLock) {
while (count >= size) {
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue[count++] = element;
notEmpty.signal();
}
}
public T take() {
T element = null;
synchronized (takeLock) {
while (count == 0) {
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
element = queue[--count];
queue[count] = null;
notFull.signal();
}
return element;
}
}
示例一:线程池中的任务队列实现
下面是一个示例,该示例演示如何在线程池中使用阻塞队列来实现任务队列的功能。该线程池实现 consist of 10 workers 和 一个阻塞队列,队列大小为5,当队列满时,任务生产者的添加操作会被阻塞。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
BlockingQueue<Runnable> taskQueue = new BlockingQueue<>(5);
for (int i = 0; i < 5; i++) {
executorService.execute(new TaskProducer(taskQueue));
}
for (int i = 0; i < 5; i++) {
executorService.execute(new TaskConsumer(taskQueue));
}
}
static class TaskProducer implements Runnable {
private BlockingQueue<Runnable> taskQueue;
public TaskProducer(BlockingQueue<Runnable> taskQueue) {
this.taskQueue = taskQueue;
}
@Override
public void run() {
while (true) {
taskQueue.put(() -> System.out.println(Thread.currentThread().getName() + " is running."));
}
}
}
static class TaskConsumer implements Runnable {
private BlockingQueue<Runnable> taskQueue;
public TaskConsumer(BlockingQueue<Runnable> taskQueue) {
this.taskQueue = taskQueue;
}
@Override
public void run() {
while (true) {
Runnable task = taskQueue.take();
task.run();
}
}
}
}
示例二:生产者消费者问题实现
下面是一个示例,该示例演示如何使用阻塞队列来实现经典的生产者消费者问题。这个例子中,我们定义了一个存放字符的阻塞队列,由一个生产者线程不断往队列中添加字符,由多个消费者线程从队列中取出字符并打印。
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<Character> blockingQueue = new BlockingQueue<>(5);
Thread producerThread = new Thread(() -> {
String str = "hello world";
int len = str.length();
for (int i = 0; i < len; i++) {
blockingQueue.put(str.charAt(i));
}
});
producerThread.start();
Thread[] consumerThreads = new Thread[3];
for (int i = 0; i < 3; i++) {
Thread consumerThread = new Thread(() -> {
while (true) {
Character c = blockingQueue.take();
System.out.println(Thread.currentThread().getName() + ": " + c);
}
});
consumerThreads[i] = consumerThread;
consumerThread.start();
}
try {
producerThread.join();
for (int i = 0; i < 3; i++) {
consumerThreads[i].interrupt();
consumerThreads[i].join();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
以上就是使用Java的wait和notifyAll实现简单的阻塞队列的完整攻略,希望对你有所帮助!