Java wait和notifyAll实现简单的阻塞队列

  

让我来为你详细讲解如何使用Java的wait和notifyAll实现简单的阻塞队列。

什么是阻塞队列

阻塞队列是一种特殊的队列,与普通队列的区别在于,当队列满时,往队列中添加元素的操作会被阻塞,直到队列不满;而当队列为空时,从队列中取出元素的操作会被阻塞,直到队列不为空。

阻塞队列在多线程环境下使用更加安全,它可以帮助我们解决线程同步和协作的问题。

使用wait和notifyAll实现阻塞队列

使用Java的wait和notifyAll功能,我们可以比较方便地实现一个阻塞队列。具体实现过程如下:

  1. 定义一个阻塞队列类,该类中包含一个固定大小的数组用于存储元素。
  2. 定义一个计数器变量,用于记录队列中元素的个数。
  3. 定义两个锁对象(生产者锁和消费者锁),以及两个条件变量(队列不满和队列不空)。
  4. 实现阻塞队列的入队方法put和出队方法take。入队方法put将元素添加到队列中,若队列已满,则阻塞等待;出队方法take将队列中的元素取出,并将队列中的元素个数减一,若队列为空,则阻塞等待。

下面是基于上述实现思路的Java代码示例:

public class BlockingQueue<T> {
    private T[] queue;
    private int count;
    private int size;

    private Object putLock = new Object();
    private Object takeLock = new Object();

    private Condition notFull = putLock.newCondition();
    private Condition notEmpty = takeLock.newCondition();

    public BlockingQueue(int size) {
        queue = (T[]) new Object[size];
        this.size = size;
    }

    public void put(T element) {
        synchronized (putLock) {
            while (count >= size) {
                try {
                    notFull.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue[count++] = element;
            notEmpty.signal();
        }
    }

    public T take() {
        T element = null;
        synchronized (takeLock) {
            while (count == 0) {
                try {
                    notEmpty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            element = queue[--count];
            queue[count] = null;
            notFull.signal();
        }
        return element;
    }
}

示例一:线程池中的任务队列实现

下面是一个示例,该示例演示如何在线程池中使用阻塞队列来实现任务队列的功能。该线程池实现 consist of 10 workers 和 一个阻塞队列,队列大小为5,当队列满时,任务生产者的添加操作会被阻塞。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        BlockingQueue<Runnable> taskQueue = new BlockingQueue<>(5);
        for (int i = 0; i < 5; i++) {
            executorService.execute(new TaskProducer(taskQueue));
        }
        for (int i = 0; i < 5; i++) {
            executorService.execute(new TaskConsumer(taskQueue));
        }
    }

    static class TaskProducer implements Runnable {
        private BlockingQueue<Runnable> taskQueue;

        public TaskProducer(BlockingQueue<Runnable> taskQueue) {
            this.taskQueue = taskQueue;
        }

        @Override
        public void run() {
            while (true) {
                taskQueue.put(() -> System.out.println(Thread.currentThread().getName() + " is running."));
            }
        }
    }

    static class TaskConsumer implements Runnable {
        private BlockingQueue<Runnable> taskQueue;

        public TaskConsumer(BlockingQueue<Runnable> taskQueue) {
            this.taskQueue = taskQueue;
        }

        @Override
        public void run() {
            while (true) {
                Runnable task = taskQueue.take();
                task.run();
            }
        }
    }
}

示例二:生产者消费者问题实现

下面是一个示例,该示例演示如何使用阻塞队列来实现经典的生产者消费者问题。这个例子中,我们定义了一个存放字符的阻塞队列,由一个生产者线程不断往队列中添加字符,由多个消费者线程从队列中取出字符并打印。

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        BlockingQueue<Character> blockingQueue = new BlockingQueue<>(5);
        Thread producerThread = new Thread(() -> {
            String str = "hello world";
            int len = str.length();
            for (int i = 0; i < len; i++) {
                blockingQueue.put(str.charAt(i));
            }
        });
        producerThread.start();
        Thread[] consumerThreads = new Thread[3];
        for (int i = 0; i < 3; i++) {
            Thread consumerThread = new Thread(() -> {
                while (true) {
                    Character c = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName() + ": " + c);
                }
            });
            consumerThreads[i] = consumerThread;
            consumerThread.start();
        }
        try {
            producerThread.join();
            for (int i = 0; i < 3; i++) {
                consumerThreads[i].interrupt();
                consumerThreads[i].join();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

以上就是使用Java的wait和notifyAll实现简单的阻塞队列的完整攻略,希望对你有所帮助!

相关文章