当前位置: 首页 > news >正文

BolckingQueue

队列

队列的特点先进先出(FIFO)。
如图: 进入队列的顺序是1,2,3,那么出队列的顺序只能是1,2,3,不可能是其他顺序,这是由队列的特点保证的。
在这里插入图片描述
保存数据的基本数据结构有数组链表,基于此,实现队列分为数组队列(ArrayQueue)和链表队列(LinkedQueue)。

BolckingQueue分类

从名称可知 Blocking Queue 阻塞队列,具体可分为数组阻塞队列和链表阻塞队列。
在这里插入图片描述
ArrayBlockingQueue源码:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {//存储元素的数组final Object[] items;//锁  因此是多线程安全final ReentrantLock lock;//条件队列private final Condition notEmpty;private final Condition notFull;...
}

从ArrayBlockingQueue 源码中不难看出,使用Object数组存储元素,使用ReentrantLock 保证多线程操作安全,
阻塞功能通过“条件锁”Condition 实现。

LinkedBlockingQueue源码:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {//节点定义static class Node<E> {E item;//只有尾指针 --> 单线链表Node<E> next;Node(E x) { item = x; }}//指向头节点transient Node<E> head;//尾节点private transient Node<E> last;private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();...
}

从LinkedBlockingQueue源码中不难看出,使用单项链表存储元素,使用ReentrantLock 保证多线程操作安全,
阻塞功能通过“条件锁”Condition 实现。

BolckingQueue 常用方法

首先初始化一个容量为5的队列:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
1、add(E e);

函数说明:向队列中添加一个元素,当队列为空时抛NPE异常,队列满时,抛IllegalStateException异常。
返回true 表示添加元素成功,其他返回值均不成功;
该方法不会阻塞当前线程往下执行!

 for (int i = 0; i < 8; i++) {try {System.out.println(blockingQueue.add("aaa"));} catch (Exception e) {}System.out.println("继续执行....");}

结果:

true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
继续执行… //这里此时队列元素已满,会抛出异常,由于对异常进行了处理,因此可以继续往下执行
继续执行…
继续执行…

add(E e)底层调用offer(E e)。

public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");
}
2、offer(E e)

向队列中添加元素, 若队列为空,抛NPE异常;
true:添加元素成功;
false: 添加元素失败
不会阻塞当前线程执行

for (int i = 0; i < 8; i++) {System.out.println(blockingQueue.offer("aaa"));System.out.println("插入操作  继续执行...");
}

结果:

true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
true
插入操作 继续执行…
false ------------------------------------------队列满后,插入失败
插入操作 继续执行…
false
插入操作 继续执行…
false
插入操作 继续执行…

 public boolean offer(E e) {//元素非空判断checkNotNull(e);final ReentrantLock lock = this.lock;//获取互斥锁  因此是线程安全的lock.lock();try {//队列已满if (count == items.length)return false;else {//插入队列enqueue(e);return true;}} finally {//释放锁lock.unlock();}}//插入队列private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;//元素插入成功后,唤醒条件队列中的线程! ********notEmpty.signal();}
2、offer(E e, long timeout, TimeUnit unit)

在指定时间内成功插入元素返回true, 失败返回false;
与offer(E e)的不同点在于,offer(E e)只插入一次,成功或者失败立即返回;
offer(E e, long timeout, TimeUnit unit) 先判断队列是否已满,若已满,则等待一定时间后再尝试插入元素。
不会阻塞当前线程执行

for (int i = 0; i < 10; i++) {System.out.println(blockingQueue.offer("aaa",3, TimeUnit.SECONDS));System.out.println("继续执行...");}

结果:

true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
true
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…
false
继续执行…

public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {/*** 如果此时队列已满* 等待一段时间再操作*/while (count == items.length) {if (nanos <= 0)return false;//处理等待时间的操作nanos = notFull.awaitNanos(nanos);}//元素插入到队列中enqueue(e);return true;} finally {lock.unlock();}}
3、put(E e)

向队列中放入元素;
会阻塞当前线程执行


for (int i = 0; i < 10; i++) {blockingQueue.put("aaa");System.out.println("继续执行...");
}

结果:

继续执行…
继续执行…
继续执行…
继续执行…
继续执行…
程序在此卡住不再执行…

public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {/*** 如果此时队列已满,当前线程加入到条件队列中(进行阻塞)*/while (count == items.length)notFull.await();//当前队列未满,元素进队enqueue(e);} finally {lock.unlock();}
}

4、take()

获取队首元素,若元素为空,
则阻塞等待

for (int i = 0; i < 10; i++) {System.out.println(blockingQueue.take());System.out.println("获取操作  继续执行...");
}
 public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//队列为空while (count == 0)//阻塞等待notEmpty.await();return dequeue();} finally {lock.unlock();}}
5、poll()

获取队首元素,如果此时队列为空,返回null;
不会阻塞当前线程执行!

for (int i = 0; i < 10; i++) {System.out.println(blockingQueue.poll());System.out.println("获取操作  继续执行...");
}
public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}
}
6、poll(long timeout, TimeUnit unit)

同poll(); 只不过当队列为空时,等待一定时间再获取队首元素

 for (int i = 0; i < 10; i++) {System.out.println(blockingQueue.poll(3, TimeUnit.SECONDS));System.out.println("获取操作  继续执行...");}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//队列为空while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}//获取队首元素return dequeue();} finally {lock.unlock();}
}
7、drainTo(Collection<? super E> c);

获取队列中的全部元素,保存到指定的集合中,结果返回元素个数。
不会阻塞当前线程执行

  List<String> list = new ArrayList<>();int i = blockingQueue.drainTo(list);System.out.println(i);list.stream().forEach(str -> {System.out.println(str);});
8、drainTo(Collection<? super E> c, int maxElements);

从队列中获取指定数量的元素,保存在给定的集合中

 List<String> list = new ArrayList<>();int i = blockingQueue.drainTo(list, 3);System.out.println(i);list.stream().forEach(str -> {System.out.println(str);});

OK,对上述所有方法做个总结可如下图所示:
在这里插入图片描述
存/取 搭配使用。

使用BolckingQueue实现生产者消费者模式
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class ProducerConsumerExample {private static final int QUEUE_CAPACITY = 5; // 队列容量private static final int TOTAL_ITEMS = 10; // 生产和消费的总数量public static void main(String[] args) {// 创建一个大小为 QUEUE_CAPACITY 的阻塞队列BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);// 创建生产者线程Thread producer = new Thread(() -> {try {for (int i = 0; i < TOTAL_ITEMS; i++) {String item = "Item " + i;queue.put(item); // 如果队列已满,生产者线程会被阻塞System.out.println("Produced: " + item);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("Producer interrupted");}});// 创建消费者线程Thread consumer = new Thread(() -> {try {for (int i = 0; i < TOTAL_ITEMS; i++) {String item = queue.take(); // 如果队列为空,消费者线程会被阻塞System.out.println("Consumed: " + item);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("Consumer interrupted");}});// 启动生产者和消费者线程producer.start();consumer.start();// 等待线程结束try {producer.join();consumer.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("Main thread interrupted");}System.out.println("Production and consumption completed.");}
}

在这里插入图片描述

ArrayBlockingQueue是在哪一步唤醒等待条件的线程的 ?
唤醒生产者线程:

//获取队头元素
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();//唤醒生产者线程 ****************notFull.signal();return x;}

生产者放入元素后会唤醒消费者

    private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;//唤醒消费者notEmpty.signal();}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • AI嘴替:黑神话悟空
  • SSHamble:一款针对SSH技术安全的研究与分析工具
  • 【Elasticsearch系列六】系统命令API
  • 安卓玩机工具-----无需root权限 卸载 禁用 删除当前机型app应用 ADB玩机工具
  • STM32与ESP8266的使用
  • JavaDS —— B树
  • 什么是json?json可以存放在哪几种数据类型?在什么时候用?
  • 桂花网发布Cassia M2000:重塑物联网格局的紧凑型蜂窝蓝牙网关
  • 王者荣耀改重复名(java源码)
  • 【例题】lanqiao3226 宝藏排序Ⅱ
  • jacoco生成单元测试覆盖率报告
  • Nginx的使用场景:构建高效、可扩展的Web架构
  • 数据管理生态的核心解析:数据库、数据仓库、数据湖、数据平台与数据中台的关系与实现
  • 【C++】缺省(默认)参数
  • SpringBoot 图书管理系统
  • 分享的文章《人生如棋》
  • 【162天】黑马程序员27天视频学习笔记【Day02-上】
  • 【跃迁之路】【735天】程序员高效学习方法论探索系列(实验阶段492-2019.2.25)...
  • AWS实战 - 利用IAM对S3做访问控制
  • HTTP传输编码增加了传输量,只为解决这一个问题 | 实用 HTTP
  • jdbc就是这么简单
  • JS进阶 - JS 、JS-Web-API与DOM、BOM
  • Three.js 再探 - 写一个跳一跳极简版游戏
  • 从输入URL到页面加载发生了什么
  • 动手做个聊天室,前端工程师百无聊赖的人生
  • 汉诺塔算法
  • 聊聊sentinel的DegradeSlot
  • 如何设计一个微型分布式架构?
  • 微信小程序:实现悬浮返回和分享按钮
  • 新手搭建网站的主要流程
  • 在electron中实现跨域请求,无需更改服务器端设置
  • [Shell 脚本] 备份网站文件至OSS服务(纯shell脚本无sdk) ...
  • elasticsearch-head插件安装
  • zabbix3.2监控linux磁盘IO
  • 翻译 | The Principles of OOD 面向对象设计原则
  • ​总结MySQL 的一些知识点:MySQL 选择数据库​
  • #### golang中【堆】的使用及底层 ####
  • (003)SlickEdit Unity的补全
  • (02)Cartographer源码无死角解析-(03) 新数据运行与地图保存、加载地图启动仅定位模式
  • (二)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (附源码)基于SSM多源异构数据关联技术构建智能校园-计算机毕设 64366
  • (四)模仿学习-完成后台管理页面查询
  • (算法)硬币问题
  • (文章复现)基于主从博弈的售电商多元零售套餐设计与多级市场购电策略
  • (学习日记)2024.04.10:UCOSIII第三十八节:事件实验
  • (一)VirtualBox安装增强功能
  • (已解决)报错:Could not load the Qt platform plugin “xcb“
  • (转)Java socket中关闭IO流后,发生什么事?(以关闭输出流为例) .
  • .NET C#版本和.NET版本以及VS版本的对应关系
  • .NET delegate 委托 、 Event 事件,接口回调
  • .net 按比例显示图片的缩略图
  • .NET 的静态构造函数是否线程安全?答案是肯定的!
  • .Net 执行Linux下多行shell命令方法
  • .net(C#)中String.Format如何使用
  • .net程序集学习心得