当前位置: 首页 > news >正文

【Java to Architect】Blocking Queue 阻塞队列应对并发

概论

Blocking Queue(阻塞队列)

  1. 试图从空队列出列的线程会被阻塞,直到另一个线程将某个项目添加到队列中。
  2. 试图添加到满队列的线程会被阻塞,直到另一个线程将某个项目弹出队列。

阻塞队列是并发友好的。

阻塞队列类型(从有界无界角度)

  1. 无界阻塞队列(unbounded)
  2. 有界阻塞队列(bounded)
无界阻塞队列
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>()

无界阻塞队列的容量为`Integer.MAX_VALUE,任何添加项目的线程操作都不会被阻塞,因此,无界阻塞队列的大小可能会变得无比巨大。

生产-消费模型中,如果选择了无界队列,需要注意的是,消费者的消费操作必须很快,不然由于无界,很可能导致OOM错误。

有界阻塞队列
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);

此队列的容量为10,当队列的项目数量为10时,当生产者继续添加项目,生产者的行为将被阻塞,直到有队列重新得到控件。

在设计并发程序时,使用有界阻塞队列的好处是,不需要其他操作,就可以让程序节流。

阻塞队列接口

根据最终的行为结果来分类,有两类接口:

  1. 添加项目(生产)
  2. 弹出、回收项目(消费)
添加
  • add() - 添加成功则返回 true,失败则抛出illegalStateException异常
  • put() - 添加元素,必要时会等待队列腾出空余位子
  • offer() - 添加成功返回 true,失败则返回 false
  • offer(E e, long timeout, TimeUnit unit) - 为添加行为设定一个等待时间上限,若超时则放弃添加
回收
  • take() - 若队列有可用元素则弹出该元素,若队列为空,则等待有可用元素
  • poll(long timeout, TimeUnit unit) - 为弹出元素行为设定一个时间上限,当超时时,返回 null

生产-消费模型例子

在这个例子中:

  • 生产者生产0-20的数字,放入10个位子的有界阻塞队列中
  • 消费者消费数字
  • 当消费者得到特殊数字,则消费者停止消费

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class BlockingQueueDemo {
    public static void testDemo() {
        class NumbersProducer implements Runnable {
            private BlockingQueue<Integer> numbersQueue;

            public NumbersProducer(BlockingQueue<Integer> numbersQueue) {
                this.numbersQueue = numbersQueue;
            }

            public void run() {
                try {
                    while (true) {
                        System.out.println("producer is producing...");
                        numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
                        System.out.println("produced. Now the queue is: " + numbersQueue);
                        Thread.sleep(300);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        class NumbersConsumer implements Runnable {
            private BlockingQueue<Integer> queue;
            private final int poisonPill;

            public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
                this.queue = queue;
                this.poisonPill = poisonPill;
            }

            public void run() {
                try {
                    while (true) {
                        System.out.println("Consumer start to consume...");
                        Integer number = queue.take();
                        System.out.println("Consumed. Consumed number is: " + number + ". queue contains" + queue);
                        if (number.equals(poisonPill)) {
                            System.out.println("got the poisonPill: " + poisonPill);
                            return;
                        }
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        int BOUND = 10;
        int poisonPill = ThreadLocalRandom.current().nextInt(20);

        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);

        ExecutorService service = Executors.newCachedThreadPool();
        NumbersProducer producer = new NumbersProducer(queue);
        NumbersConsumer consumer = new NumbersConsumer(queue, poisonPill);
        System.out.println("THE POISON IS: " + poisonPill);
        service.submit(producer);
        service.submit(consumer);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {

        }
        service.shutdownNow();
    }

    public static void main(String[] args) {
        BlockingQueueDemo.testDemo();
    }
}

运行代码之后,得到的日志如下:
producer is producing…
Consumer start to consume…
produced. Now the queue is: [63]
Consumed. Consumed number is: 63. queue contains[]
producer is producing…
produced. Now the queue is: [53]
producer is producing…
produced. Now the queue is: [53, 35]
producer is producing…
produced. Now the queue is: [53, 35, 66]
Consumer start to consume…
Consumed. Consumed number is: 53. queue contains[35, 66]
producer is producing…
produced. Now the queue is: [35, 66, 85]
producer is producing…
produced. Now the queue is: [35, 66, 85, 21]
producer is producing…
produced. Now the queue is: [35, 66, 85, 21, 14]
Consumer start to consume…
Consumed. Consumed number is: 35. queue contains[66, 85, 21, 14]
producer is producing…
produced. Now the queue is: [66, 85, 21, 14, 86]
producer is producing…
produced. Now the queue is: [66, 85, 21, 14, 86, 12]
producer is producing…
produced. Now the queue is: [66, 85, 21, 14, 86, 12, 47]
Consumer start to consume…
Consumed. Consumed number is: 66. queue contains[85, 21, 14, 86, 12, 47]
producer is producing…
produced. Now the queue is: [85, 21, 14, 86, 12, 47, 45]
producer is producing…
produced. Now the queue is: [85, 21, 14, 86, 12, 47, 45, 40]
producer is producing…
produced. Now the queue is: [85, 21, 14, 86, 12, 47, 45, 40, 90]
producer is producing…
produced. Now the queue is: [85, 21, 14, 86, 12, 47, 45, 40, 90, 34]
Consumer start to consume…
Consumed. Consumed number is: 85. queue contains[21, 14, 86, 12, 47, 45, 40, 90, 34]
producer is producing…
produced. Now the queue is: [21, 14, 86, 12, 47, 45, 40, 90, 34, 53]
producer is producing…
Consumer start to consume…
Consumed. Consumed number is: 21. queue contains[14, 86, 12, 47, 45, 40, 90, 34, 53]
produced. Now the queue is: [14, 86, 12, 47, 45, 40, 90, 34, 53, 80]
Consumer start to consume…
Consumed. Consumed number is: 14. queue contains[86, 12, 47, 45, 40, 90, 34, 53, 80, 37]
produced. Now the queue is: [86, 12, 47, 45, 40, 90, 34, 53, 80, 37]
producer is producing…
Consumer start to consume…
Consumed. Consumed number is: 86. queue contains[12, 47, 45, 40, 90, 34, 53, 80, 37]
produced. Now the queue is: [12, 47, 45, 40, 90, 34, 53, 80, 37, 73]
producer is producing…
Consumer start to consume…
Consumed. Consumed number is: 12. queue contains[47, 45, 40, 90, 34, 53, 80, 37, 73]
produced. Now the queue is: [47, 45, 40, 90, 34, 53, 80, 37, 73, 85]
producer is producing…
Consumer start to consume…
Consumed. Consumed number is: 47. queue contains[45, 40, 90, 34, 53, 80, 37, 73, 85]
produced. Now the queue is: [45, 40, 90, 34, 53, 80, 37, 73, 85, 95]
producer is producing…
PS E:\Achi> e:; cd ‘e:\Achi’; & ‘c:\Users\Administrator.vscode\extensions\vscjava.vscode-java-debug-0.30.0\scripts\launcher.bat’ ‘C:\Program Files\Java\jdk-11\bin\java.exe’ ‘-Dfile.encoding=UTF-8’ ‘@C:\Users\ADMINI~1\AppData\Local\Temp\cp_7l3ntbr8klh7dvqwh3tsamibj.argfile’ ‘blockingQueue.BlockingQueueDemo’
THE POISON IS: 10
producer is producing…
Consumer start to consume…
produced. Now the queue is: []
Consumed. Consumed number is: 57. queue contains[]
producer is producing…
produced. Now the queue is: [72]
producer is producing…
produced. Now the queue is: [72, 89]
producer is producing…
produced. Now the queue is: [72, 89, 10]
Consumer start to consume…
Consumed. Consumed number is: 72. queue contains[89, 10]
producer is producing…
produced. Now the queue is: [89, 10, 91]
producer is producing…
produced. Now the queue is: [89, 10, 91, 41]
producer is producing…
produced. Now the queue is: [89, 10, 91, 41, 9]
Consumer start to consume…
Consumed. Consumed number is: 89. queue contains[10, 91, 41, 9]
producer is producing…
produced. Now the queue is: [10, 91, 41, 9, 40]
producer is producing…
produced. Now the queue is: [10, 91, 41, 9, 40, 12]
producer is producing…
produced. Now the queue is: [10, 91, 41, 9, 40, 12, 35]
Consumer start to consume…
Consumed. Consumed number is: 10. queue contains[91, 41, 9, 40, 12, 35]
got the poisonPill: 10

producer is producing…
produced. Now the queue is: [91, 41, 9, 40, 12, 35, 86]
producer is producing…
produced. Now the queue is: [91, 41, 9, 40, 12, 35, 86, 66]
producer is producing…
produced. Now the queue is: [91, 41, 9, 40, 12, 35, 86, 66, 95]
producer is producing…
produced. Now the queue is: [91, 41, 9, 40, 12, 35, 86, 66, 95, 1]
producer is producing…

相关文章:

  • 【Java to Architect】HashSet TreeSet 集合 红黑树
  • 【Salesforce】【LWC】响应式验证标准查找输入框
  • 最长递增子序列问题(LIS) 动态规划 JavaScript
  • 位屏蔽(Bitmasking)中屏蔽字赋值语句 mask | (1 << j) 的解释
  • 【Java to Architect】synchronized保证内存可见性 demo的另一种解法
  • 利用位屏蔽和动态规划解决最小代价任务分配问题 Bitmasking Dynamic Programming
  • 算法:回溯法(backtracking)解决寻找给定字符串的所有排序(permutations)问题
  • 算法: 动态规划 寻找2D矩阵中到达某一坐标的最小代价路径
  • 算法:动态规划 寻找2D矩阵中到达某一坐标的可能路径总数
  • 算法:动态规划 寻找2D矩阵中到达某一坐标的可能路径总数进阶版(添加路障)
  • 算法: 动态规划,二维矩阵代价最值进阶版 两条行进路径,一次相交,求解最大代价
  • Programming Languages And Lambda calculi 1.1 定义集合
  • 算法: 动态规划 编辑距离 Edit Distance / Levenshtein Distance
  • 【Salesforce】【Apex】Trigger中不通过soql查询记录类型的开发名称
  • 【Programming Languages And Lambda calculi】 1.2 ~ 1.3 关系、赋值与关系
  • 【402天】跃迁之路——程序员高效学习方法论探索系列(实验阶段159-2018.03.14)...
  • 【译】React性能工程(下) -- 深入研究React性能调试
  • ESLint简单操作
  • Java 多线程编程之:notify 和 wait 用法
  • JavaScript 一些 DOM 的知识点
  • JavaScript学习总结——原型
  • Js基础知识(四) - js运行原理与机制
  • Laravel 实践之路: 数据库迁移与数据填充
  • Laravel 中的一个后期静态绑定
  • Netty 4.1 源代码学习:线程模型
  • python 学习笔记 - Queue Pipes,进程间通讯
  • React-生命周期杂记
  • Redis提升并发能力 | 从0开始构建SpringCloud微服务(2)
  • SQLServer之创建显式事务
  • vuex 学习笔记 01
  • 从零开始在ubuntu上搭建node开发环境
  • 多线程 start 和 run 方法到底有什么区别?
  • 飞驰在Mesos的涡轮引擎上
  • 分享自己折腾多时的一套 vue 组件 --we-vue
  • 工作中总结前端开发流程--vue项目
  • 理解IaaS, PaaS, SaaS等云模型 (Cloud Models)
  • 浏览器缓存机制分析
  • 深入浅出webpack学习(1)--核心概念
  • 我看到的前端
  • 在GitHub多个账号上使用不同的SSH的配置方法
  • kubernetes资源对象--ingress
  • ​LeetCode解法汇总1276. 不浪费原料的汉堡制作方案
  • #在线报价接单​再坚持一下 明天是真的周六.出现货 实单来谈
  • (10)ATF MMU转换表
  • (3)(3.5) 遥测无线电区域条例
  • (4) openssl rsa/pkey(查看私钥、从私钥中提取公钥、查看公钥)
  • (C)一些题4
  • (Mirage系列之二)VMware Horizon Mirage的经典用户用例及真实案例分析
  • (二开)Flink 修改源码拓展 SQL 语法
  • (七)c52学习之旅-中断
  • (四)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (学习日记)2024.03.25:UCOSIII第二十二节:系统启动流程详解
  • (转)es进行聚合操作时提示Fielddata is disabled on text fields by default
  • (转)linux 命令大全
  • **PHP二维数组遍历时同时赋值