BolckingQueue
队列
队列的特点先进先出(FIFO)。
如图: 进入队列的顺序是1,2,3,那么出队列的顺序只能是1,2,3,不可能是其他顺序,这是由队列的特点保证的。
保存数据的基本数据结构有数组和链表,基于此,实现队列分为数组队列(ArrayQueue)和链表队列(LinkedQueue)。
BolckingQueue分类
从名称可知 Blocking Queue 阻塞队列,具体可分为数组阻塞队列和链表阻塞队列。
ArrayBlockingQueue源码:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {//存储元素的数组final Object[] items;//锁 因此是多线程安全final ReentrantLock lock;//条件队列private final Condition notEmpty;private final Condition notFull;...
}
从ArrayBlockingQueue 源码中不难看出,使用Object数组存储元素,使用ReentrantLock 保证多线程操作安全,
阻塞功能通过“条件锁”Condition 实现。
LinkedBlockingQueue源码:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {//节点定义static class Node<E> {E item;//只有尾指针 --> 单线链表Node<E> next;Node(E x) { item = x; }}//指向头节点transient Node<E> head;//尾节点private transient Node<E> last;private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();...
}
从LinkedBlockingQueue源码中不难看出,使用单项链表存储元素,使用ReentrantLock 保证多线程操作安全,
阻塞功能通过“条件锁”Condition 实现。
BolckingQueue 常用方法
首先初始化一个容量为5的队列:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
1、add(E e);
函数说明:向队列中添加一个元素,当队列为空时抛NPE异常,队列满时,抛IllegalStateException异常。
返回true 表示添加元素成功,其他返回值均不成功;
该方法不会阻塞当前线程往下执行!
for (int i = 0; i < 8; i++) {try {System.out.println(blockingQueue.add("aaa"));} catch (Exception e) {}System.out.println("继续执行....");}
结果:
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
继续执行… //这里此时队列元素已满,会抛出异常,由于对异常进行了处理,因此可以继续往下执行
继续执行…
继续执行…
add(E e)底层调用offer(E e)。
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");
}
2、offer(E e)
向队列中添加元素, 若队列为空,抛NPE异常;
true:添加元素成功;
false: 添加元素失败
不会阻塞当前线程执行
for (int i = 0; i < 8; i++) {System.out.println(blockingQueue.offer("aaa"));System.out.println("插入操作 继续执行...");
}
结果:
true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
false ------------------------------------------队列满后,插入失败
插入操作 继续执行…
false
插入操作 继续执行…
false
插入操作 继续执行…
public boolean offer(E e) {//元素非空判断checkNotNull(e);final ReentrantLock lock = this.lock;//获取互斥锁 因此是线程安全的lock.lock();try {//队列已满if (count == items.length)return false;else {//插入队列enqueue(e);return true;}} finally {//释放锁lock.unlock();}}//插入队列private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;//元素插入成功后,唤醒条件队列中的线程! ********notEmpty.signal();}
2、offer(E e, long timeout, TimeUnit unit)
在指定时间内成功插入元素返回true, 失败返回false;
与offer(E e)的不同点在于,offer(E e)只插入一次,成功或者失败立即返回;
offer(E e, long timeout, TimeUnit unit) 先判断队列是否已满,若已满,则等待一定时间后再尝试插入元素。
不会阻塞当前线程执行
for (int i = 0; i < 10; i++) {System.out.println(blockingQueue.offer("aaa",3, TimeUnit.SECONDS));System.out.println("继续执行...");}
结果:
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {/*** 如果此时队列已满* 等待一段时间再操作*/while (count == items.length) {if (nanos <= 0)return false;//处理等待时间的操作nanos = notFull.awaitNanos(nanos);}//元素插入到队列中enqueue(e);return true;} finally {lock.unlock();}}
3、put(E e)
向队列中放入元素;
会阻塞当前线程执行
for (int i = 0; i < 10; i++) {blockingQueue.put("aaa");System.out.println("继续执行...");
}
结果:
继续执行…
继续执行…
继续执行…
继续执行…
继续执行…
程序在此卡住不再执行…
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {/*** 如果此时队列已满,当前线程加入到条件队列中(进行阻塞)*/while (count == items.length)notFull.await();//当前队列未满,元素进队enqueue(e);} finally {lock.unlock();}
}
4、take()
获取队首元素,若元素为空,
则阻塞等待
for (int i = 0; i < 10; i++) {System.out.println(blockingQueue.take());System.out.println("获取操作 继续执行...");
}
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//队列为空while (count == 0)//阻塞等待notEmpty.await();return dequeue();} finally {lock.unlock();}}
5、poll()
获取队首元素,如果此时队列为空,返回null;
不会阻塞当前线程执行!
for (int i = 0; i < 10; i++) {System.out.println(blockingQueue.poll());System.out.println("获取操作 继续执行...");
}
public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}
}
6、poll(long timeout, TimeUnit unit)
同poll(); 只不过当队列为空时,等待一定时间再获取队首元素
for (int i = 0; i < 10; i++) {System.out.println(blockingQueue.poll(3, TimeUnit.SECONDS));System.out.println("获取操作 继续执行...");}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//队列为空while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}//获取队首元素return dequeue();} finally {lock.unlock();}
}
7、drainTo(Collection<? super E> c);
获取队列中的全部元素,保存到指定的集合中,结果返回元素个数。
不会阻塞当前线程执行
List<String> list = new ArrayList<>();int i = blockingQueue.drainTo(list);System.out.println(i);list.stream().forEach(str -> {System.out.println(str);});
8、drainTo(Collection<? super E> c, int maxElements);
从队列中获取指定数量的元素,保存在给定的集合中
List<String> list = new ArrayList<>();int i = blockingQueue.drainTo(list, 3);System.out.println(i);list.stream().forEach(str -> {System.out.println(str);});
OK,对上述所有方法做个总结可如下图所示:
存/取 搭配使用。
使用BolckingQueue实现生产者消费者模式
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class ProducerConsumerExample {private static final int QUEUE_CAPACITY = 5; // 队列容量private static final int TOTAL_ITEMS = 10; // 生产和消费的总数量public static void main(String[] args) {// 创建一个大小为 QUEUE_CAPACITY 的阻塞队列BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);// 创建生产者线程Thread producer = new Thread(() -> {try {for (int i = 0; i < TOTAL_ITEMS; i++) {String item = "Item " + i;queue.put(item); // 如果队列已满,生产者线程会被阻塞System.out.println("Produced: " + item);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("Producer interrupted");}});// 创建消费者线程Thread consumer = new Thread(() -> {try {for (int i = 0; i < TOTAL_ITEMS; i++) {String item = queue.take(); // 如果队列为空,消费者线程会被阻塞System.out.println("Consumed: " + item);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("Consumer interrupted");}});// 启动生产者和消费者线程producer.start();consumer.start();// 等待线程结束try {producer.join();consumer.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("Main thread interrupted");}System.out.println("Production and consumption completed.");}
}
ArrayBlockingQueue是在哪一步唤醒等待条件的线程的 ?
唤醒生产者线程:
//获取队头元素
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();//唤醒生产者线程 ****************notFull.signal();return x;}
生产者放入元素后会唤醒消费者
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;//唤醒消费者notEmpty.signal();}