当前位置: 首页 > news >正文

BlockingQueue

网上看了好多文章将线程池的但是似乎都没的多少人会详细讲解里面的任务队列,所以只有自己动手学习其中的任务队列

BlockingQueue

在这里插入图片描述
要学习其中的任务队列就需要先学习BlockingQueue,Blocking是一个接口,其中主要的方法为

	// 尝试往队尾添加元素,添加成功返回true,添加失败返回false
	boolean add(E e);
	// 尝试往队尾添加元素,添加成功返回true,添加失败返回false
	boolean offer(E e);
	// 尝试往队尾添加元素,如果队列满了,则阻塞当前线程,直到其能够添加成功为止
	void put(E e) throws InterruptedException;
	// 尝试往队尾添加元素,如果队列满了,则阻塞当前线程,直到超时
	boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
	// 从队头取出元素,如果队列为空则一直等待
	E take() throws InterruptedException;
	// 从队头取出元素,如果队列为空则等待一段时间
	E poll(long timeout, TimeUnit unit) throws InterruptedException;
	int remainingCapacity();
	//从队列中移除指定对象
	boolean remove(Object o);
	//判断队列是否存在指定对象
	public boolean contains(Object o);
	//将队列中元素转移到指定集合
	int drainTo(Collection<? super E> c);
	//将最多MAX个元素转移到指定集合
	int drainTo(Collection<? super E> c, int maxElements);

ArrayBlockingQueue

ArrayBlockingQueue的底层是基于数组实现,当指定容量后数组就确定了不会发生扩容

参数

	// 元素
    final Object[] items;
    //可以被取到的元素下标
    int takeIndex;
    //可以放入元素的下标
    int putIndex;
    //元素个数
    int count;
    //锁
    final ReentrantLock lock;
    //等待条件,用于队列为空的时候阻塞当前线程获取
    private final Condition notEmpty;
    //等待条件,用于队列满的时候阻塞当前线程加入元素
    private final Condition notFull;
    transient Itrs itrs = null;

通过上述数据结构可以看出,ArrayBlockingQueue是通过一个循环数组的方式来实现存储元素的,这里takeIndex记录当前可以取元素的索引位置,而putIndex则记录了下一个元素可以放入的位置,如果队列满了则是takeIndex == putIndex,这里可以通过判断count字段来判断当前是处于满状态还是空置状态,通过一个全局锁lock来实现控制
对于其中的方法比较重要的是出队与入队方法,enqueue与dequeue

重要方法

enqueue与dequeue

其中入队与出队就是将对应位置的putIndex与takeIndex放入其中位置即可,然后加一,但是加一要判断是否超过了当前数组最大位置,如果是则设置为0,同时需要唤醒对应条件的等待队列

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
    
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

这是其中内层调用的方法,而外部方法我们提供方法为

put与take

put与take实现了其阻塞队列满足条件的方法

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();   //通过while循环以防止当前线程被意外唤醒,如果当前循环被打破则代表没有满了
            enqueue(e); // 放入元素
        } finally {
            lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); //与上面类似
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

从这里可以看出ArrayBlockingQueue实现的是先进先出

LinkedBlockingQueue

LinkedBlockingQueue,其底层是通过一个单项链表实现的,由于单项链表需要有一个指向下一个节点的指针,因而其必须使用一个对象这里是Node来存储当前元素的值和下一个节点索引

Node节点

    static class Node<E> {
        //当前元素的值
        E item;
        //下一个元素
        Node<E> next;
        Node(E x) { item = x; }
    }

参数

	//容量
    private final int capacity;
    //当前队列已经存储个数
    private final AtomicInteger count = new AtomicInteger();
    //头指针
    transient Node<E> head;
    //尾指针
    private transient Node<E> last;
    //从队列取出元素的锁
    private final ReentrantLock takeLock = new ReentrantLock();
    //等待如果队列为空
    private final Condition notEmpty = takeLock.newCondition();
    //放入元素的锁
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

这里与ArrayBBlockingQueue存在着一些差异,其中head与last与takeIndex与putIndex都是类似的,但是LinkedBlockingQueue使用了两把锁,而上面只使用了一把锁

重要方法

enqueue与dequeue

private void enqueue(Node<E> node) {
		//将队列尾部节点的下一个节点指向新的节点,并更新尾部节点为最新的节点
        last = last.next = node;
    }
	
	//返回头节点的下一个节点并更新头节点
	//因为头节点存储不是第一个元素
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

可以看到对于链表的入队与出队操作是非常简单的,所以我们需要看其中的take与put方法

take与put


 public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();    // 如果满了则进入等待
            }
            enqueue(node); //放入元素
            c = count.getAndIncrement(); //元素个数++
            if (c + 1 < capacity) //如果添加元素过后还是未满那么则继续唤醒下一个
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0) 
        	//将等待取出的线程唤醒,而唤醒的时候也必须获取take锁才能唤醒
            signalNotEmpty();
    }


 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await(); //同理
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal(); //继续获取
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();  //同理
        return x;
    }

ArrayBlockingQueue与LinkedBlockingQueue区别

1、两种底层数据结构不同,一个是基于循环数组一个是基于单向链表
2、两种阻塞方式不同,ArrayBlockingQueue使用了一个全局锁来处理所有操作,也就是无论插入还是获取都只能一个线程执行,而LinkedBlockingQueue则是使用两个锁,使得获取与放入无干扰
3、两着初始化不同,ArrayBlockingQueue必须指定一个大小初始化而LinkedBlockingQueue则可以不指定,不指定则为Integer.MAX_VALUE

SynchronousQueue

这个阻塞队列就比上面两种麻烦多了,那就需要一步一步理解
SynchronousQueue也是一个队列来的,但他的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put时候),如果当前没有人想要消费产品此生产线程必须阻塞等待一个消费者调用take操作,take操作将唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称一次配对过程

构造器

其构造器可传入是公平还是非公平的,默认是非公平的

如果是公平的则采用TransferQueue如果是非公平的则采用TransferStack

    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

从源码上课其中的pull、take等方法都素调用transfer方法

transfer中有三个参数:
e:要存放的元素
timed:是否超时等待
nanos:超时等待时间

TransferQueue

TransferQueue内部有一个内部类:QNode,TransferQueue是由QNode节点构成的链表结构

QNode

	//下一个节点
    volatile QNode next;
    //存入元素         
    volatile Object item;
    //等待线程     
    volatile Thread waiter; 
    //是否是数据
    final boolean isData;

在这里插入图片描述

TransferQueue初始化

TransferQueue创建时会初始化一个QNode节点,head,tail都会指向这个空节点,在TransferQueue中会以根据传入的参数:e是否为null来将节点分为两类,从TransferQueue队列中获取元素的线程是同一类节点,比如:调用take,poll的线程就是同一类节点;从TransferQueue队列中添加元素的线程是一类节点

TransferQueue队列特殊的地方就在于这个队列中只会存在一种节点:要么是获取元素的线程节点,要么是添加元素的线程节点

在这里插入图片描述
在初始化TransferQueue对象时,会初始化生产一个节点队列的头,尾:head,tail都会指向这个init节点

举个例子:假设当前队列中都是put线程,此时有一个take线程,那么这个take线程就会唤醒队列中的一个put线程

在这里插入图片描述
在唤醒线程时,同时会修改该线程所在节点的item值,在后面分析源码时候会看到,如果只是唤醒线程是没有用的,还需要将item的值修改才能真正唤醒该线程

Transfer

下面就来分析Transfer方法

E transfer(E e, boolean timed, long nanos) {
            QNode s = null; 
            boolean isData = (e != null); // 判断当前是什么类型线程

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)       
                    continue;                    

                if (h == t || t.isData == isData) {  // 如果队列为空 || 新类型线程与队列中线程类型一致
                    QNode tn = t.next;  
                    if (t != tail)   //队列尾节点已经被更新                
                        continue;
                    if (tn != null) {  //有新节点加入到队列      
                        advanceTail(t, tn); //更新尾节点
                        continue;
                    }
                    if (timed && nanos <= 0)        
                        return null;
                    if (s == null)
                        s = new QNode(e, isData); //将线程包装成QNode节点
                    if (!t.casNext(null, s))        //将新节点添加到队列末尾
                        continue;

                    advanceTail(t, s); //添加成功后更新tail      
                    Object x = awaitFulfill(s, e, timed, nanos); //等待被唤醒
                    if (x == s) { //中断标记,带阻塞时间的线程等待了规定时间恢复运行            
                        clean(t, s); //节点从队列中删除
                        return null;
                    }

                    if (!s.isOffList()) {           
                        advanceHead(t, s);         
                        if (x != null)              
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {        //唤醒队列节点         
                	// 取出当前节点            
                    QNode m = h.next;               
                    if (t != tail || m == null || h != head)
                        continue;                 

                    Object x = m.item;
                    if (isData == (x != null) ||    
                        x == m ||                  
                        !m.casItem(x, e)) {  //将被唤醒线程的值修改为当前线程的值  
                        advanceHead(h, m);          
                        continue;
                    }

                    advanceHead(h, m);              
                    LockSupport.unpark(m.waiter); //唤醒线程
                    return (x != null) ? (E)x : e;
                }
            }
        }

awaitFulfill

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)  
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

特别说明一下变量spins,所有进入阻塞队列的线程都不着急立即阻塞,而是会先自旋一段时间,然后再阻塞,因为阻塞线程再唤醒线程的代价就比让线程自选的大

TransferStack

里面存在一个内部类:SNode,TransferStack是由Snode单链表构建成的堆栈结构,只有一个head指针指向链表的表头;每次添加元素都是在表头处添加,新节点称为新的表头head,唤醒的线程的时候也是唤醒head节点,因此就形成了先进后出的堆栈结构,TransferStack中根据e也就线程分为两类,一类是获取元素:REQUEST,一类的添加元素:DATA,其中也只有一种节点只有被唤醒时候才会短暂出现2种节点

在这里插入图片描述
SNode

	  //下一个节点
      volatile SNode next;        
      volatile SNode match;
      //当前线程      
      volatile Thread waiter;
      //值    
      Object item;
      //模式                
      int mode;

在TransferStack的堆栈中,如果新加入的线程类型与堆栈中的节点类型不同,那么会先将新线程包装成Snode节点加入堆栈中,成为新的header节点并将旧的节点唤醒。然后更新head节点返回DATA类型节点的元素值

在这里插入图片描述

在有不同类型的节点进入堆栈中的时候,新节点添加到堆栈顶端并更新为新的head节点;这个节点的mode = REQUEST | FULFILLING ;FULFILLING 是用来标记,表示这个head节点正在唤醒堆栈中的一个节点线程;最后在新节点唤醒旧的head节点( oldHead节点)之后,更新堆栈的head节点;

TransferStack部分的源码就再不分析了,入队阻塞部分的源码几乎与TransferQ ueue一样;TransferStack唤醒节点的方式与TransferQueue有点差别,TransferStack是将新节点先包装成节点添加到堆栈中,再唤醒节点线程,最后重新设置堆栈的head指针并将这2个节点清除出堆栈。

SynchronousQueue 这位大佬写的SynchronousQueue感觉很好,画图也很好只有自己理解但是想不出这些理解的话,感谢这位大佬我只是资源的整合者

相关文章:

  • Java之线程详解(三)——多线程常用API、七种状态、优先级、Lock锁
  • JavaScript -- 07. 面向对象编程
  • 力扣(LeetCode)133. 克隆图(C++)
  • Android kotlin实现Viewpager滑动背景透明效果渐变
  • 用Tinyproxy搭建自己的proxy server
  • 简单入门编写html登录界面
  • [网络工程师]-应用层协议-SNMP
  • 【云原生之kubernetes实战】在k8s环境下部署jpress开源网站
  • HTML+CSS+JS网页设计期末课程大作业 DW个人博客网站制作 web前端开发技术 web课程设计 网页规划与设计
  • SpringBoot_整合PageHelper
  • 【数据结构与算法】一套链表 OJ 带你轻松玩转链表
  • C/C++大学课程信息系统
  • 【网络编程】第三章 网络套接字(TCP协议程序+多进程+多线程+线程池)
  • 基于物联网设计的自反馈深紫外杀菌消毒系统(STM32F407)
  • react路由v6版本NavLink的两个小坑及解决
  • 《微软的软件测试之道》成书始末、出版宣告、补充致谢名单及相关信息
  • 【mysql】环境安装、服务启动、密码设置
  • 【跃迁之路】【444天】程序员高效学习方法论探索系列(实验阶段201-2018.04.25)...
  • AngularJS指令开发(1)——参数详解
  • Essential Studio for ASP.NET Web Forms 2017 v2,新增自定义树形网格工具栏
  • Flex布局到底解决了什么问题
  • Hibernate最全面试题
  • JWT究竟是什么呢?
  • 计算机在识别图像时“看到”了什么?
  • 快速体验 Sentinel 集群限流功能,只需简单几步
  • 马上搞懂 GeoJSON
  • 浅析微信支付:申请退款、退款回调接口、查询退款
  • 想写好前端,先练好内功
  • 移动端唤起键盘时取消position:fixed定位
  • 异常机制详解
  • 树莓派用上kodexplorer也能玩成私有网盘
  • (Java实习生)每日10道面试题打卡——JavaWeb篇
  • (动手学习深度学习)第13章 计算机视觉---微调
  • (附源码)springboot工单管理系统 毕业设计 964158
  • (附源码)springboot教学评价 毕业设计 641310
  • (附源码)ssm教材管理系统 毕业设计 011229
  • (十)DDRC架构组成、效率Efficiency及功能实现
  • (四)模仿学习-完成后台管理页面查询
  • (原創) 是否该学PetShop将Model和BLL分开? (.NET) (N-Tier) (PetShop) (OO)
  • .Family_物联网
  • .NET CORE使用Redis分布式锁续命(续期)问题
  • .NET Standard / dotnet-core / net472 —— .NET 究竟应该如何大小写?
  • .NET Standard 的管理策略
  • .NET 分布式技术比较
  • .NET简谈设计模式之(单件模式)
  • @RequestParam @RequestBody @PathVariable 等参数绑定注解详解
  • [ JavaScript ] JSON方法
  • [.net]官方水晶报表的使用以演示下载
  • []sim300 GPRS数据收发程序
  • [20161214]如何确定dbid.txt
  • [52PJ] Java面向对象笔记(转自52 1510988116)
  • [BUUCTF]-PWN:[极客大挑战 2019]Not Bad解析
  • [codevs] 1029 遍历问题
  • [EFI]Lenovo ThinkPad X280电脑 Hackintosh 黑苹果引导文件
  • [Gradle] 在 Eclipse 下利用 gradle 构建系统