阻塞队列.
目录
♫什么是阻塞队列
♫什么是生产-消费者模式
♫实现一个阻塞队列
♫BlockingQueue
♫什么是阻塞队列
阻塞队列是一种特殊的队列,它除了具备队列的先进先出的特点外,还具有以下特点:
♩.如果队列为空时,执行出队列操作,会阻塞等待,直到另一个线程往队列里添加元素(队列不为空)
♩.如果队列满了时,执行入队列操作,会阻塞等待,直到另一个线程取出队列里的元素(队列没有满)
阻塞队列常用于实现生产者-消费者模式,通过控制队列的阻塞,使得生产者和消费者的协作更加有效和高效。下面我们先简单认识下生产者-消费者模式~
♫什么是生产-消费者模式
生产者消费者模式是一种常见的并发编程模式。该模式主要针对一个共享的缓冲区,多个生产者可以向缓冲区中添加数据,多个消费者可以从缓冲区中获取数据。在该模式中,生产者向缓冲区中添加数据,如果缓冲区已满,则生产者需要等待消费者消费完一部分数据后再向缓冲区添加数据;消费者从缓冲区中获取数据,如果缓冲区为空,则消费者需要等待生产者生产数据后再从缓冲区中获取数据。生产者和消费者之间通过缓冲区进行通信和协调,而缓冲区就可以由阻塞队列构成。
一个典型的生产者-消费者模式的例子是吃酒席的时候:后厨(生产者)端菜到餐桌(阻塞队列)上,宾客(消费者)从餐桌(阻塞队列)上取走事物食用,当餐桌放满了食物,后厨就要等宾客吃完一碟再上,当餐桌上没有食物,宾客就得等后厨上菜后再吃。
使用生产者-消费者模式有以下两点好处:
♩.有效降低生产者和消费者之间的耦合:生产者和消费者之间通过阻塞队列进行交互,彼此之间没有直接关联
♩.平衡生产者和消费者之间的速度差异:由于阻塞队列的特性,平衡了两者的速度差异,保证了系统的稳定性
♫实现一个阻塞队列
我们先通过数组,实现一个普通的循环队列:
public class MyBlockingQueue {private int[] items;private int head;private int tail;private int size;public MyBlockingQueue() {items = new int[1000];head = 0;tail = 0;//队列中元素的有效个数size = 0;}//入队列public void put(int value) {//队列满了if (size == items.length) {return;}//队列没满items[tail] = value;tail = (tail + 1) % items.length;size++;}//出队列public int take() {//队列为空if (size == 0) {return -1;}//队列不为空int value = items[head];head = (head + 1) % items.length;size--;return value;} }
再对队列填加阻塞功能,并保证线程安全:
public class MyBlockingQueue {private int[] items;private int head;private int tail;private volatile int size;public MyBlockingQueue() {items = new int[1000];head = 0;tail = 0;//队列中元素的有效个数size = 0;}//入队列public void put(int value) throws InterruptedException {synchronized (this) {//队列满了// 此处最好使用 while,否则 notifyAll 的时候, 该线程从 wait 中被唤醒// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了,就只能继续等待while (size == items.length) {wait();}//队列没满items[tail] = value;tail = (tail + 1) % items.length;size++;notifyAll();}}//出队列public int take() throws InterruptedException {int value = 0;synchronized (this) {//队列为空// 此处最好使用 while,否则 notifyAll 的时候, 该线程从 wait 中被唤醒// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了,就只能继续等待while (size == 0) {wait();}//队列不为空value = items[head];head = (head + 1) % items.length;size--;notifyAll();}return value;} }
♫BlockingQueue
♩.BlockingQueue是concurrent包下的一个接口,是Java标准库中内置的阻塞队列。
♩.BlockingQueue除了保留Queue的offer、poll、peek方法外,还提供了带有阻塞功能的入队列方法:put()和带有阻塞功能的出队列方法:take()。
♩.BlockingQueue的实现类有:
①.LinkedBlockingQueue:基于链表实现的阻塞队列。
②.PriorityBlockingQueue:基于优先级队列(堆)实现的阻塞队列。③.ArrayBlockingQueue:基于数组实现的阻塞队列。
知道了BlockingQueue的基本信息,接下来我们来使用BlockingQueue实现一个生产者-消费者模式:
class Test {public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();Thread customer = new Thread(()->{while (true) {try {int value = blockingQueue.take();System.out.println("消费者:" + value);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread producer = new Thread(()->{Random random = new Random();while (true) {try {int value = random.nextInt(1000);blockingQueue.put(value);System.out.println("生产者:" + value);Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}});customer.start();producer.start();customer.join();producer.join();} }
由于在producer线程中sleep(100),所以producer线程入队列会很慢,producer线程一入队列就被customer线程取出:
注:由于两个线程的输出语句是并行执行,故先打印生产者还是消费者是不确定的