当前位置: 首页 > news >正文

数据结构与算法 - 优先级队列、阻塞队列

一、优先级队列

1. 无序数组实现

从无序数组中找出优先级最高的元素出队

package com.itheima.datastructure.PriorityQueue;import com.itheima.datastructure.Queue.Queue;// 基于无序数组实现
public class PriorityQueue1<E extends Priority> implements Queue<E> {Priority[] array;int size;public PriorityQueue1(int capacity) {array = new Priority[capacity];}// O(1)@Overridepublic boolean offer(E value) {if(isFull()) {return false;}array[size++] = value;return true;}// 返回优先级最高的索引值private int selectMax() {int max = 0;for (int i = 1; i < size; i++) {if(array[i].priority() > array[max].priority()) {max = i;}}return max;}// O(n)@Overridepublic E poll() {if(isEmpty()) {return null;}int max = selectMax();E e = (E) array[max];remove(max);return e;}private void remove(int index) {if(index < size - 1) {System.arraycopy(array, index + 1, array, index, size - 1 - index);}size--;}// O(n)@Overridepublic E peek() {if(isEmpty()) {return null;}int max = selectMax();return (E) array[max];}@Overridepublic boolean isEmpty() {return size == 0;}@Overridepublic boolean isFull() {return size == array.length;}
}

2. 有序数组实现

  • 入队后排好序,优先级最高的排列在尾部
  • 出队只需删除尾部元素即可
package com.itheima.datastructure.PriorityQueue;import com.itheima.datastructure.Queue.Queue;// 基于无序数组实现
public class PriorityQueue2<E extends Priority> implements Queue<E> {Priority[] array;int size;public PriorityQueue2(int capacity) {array = new Priority[capacity];}// O(n)@Overridepublic boolean offer(E value) {if(isFull()) {return false;}insert(value);size++;return true;}// 一轮插入排序private void insert(E value) {int i = size - 1;while(i >= 0 && array[i].priority() > value.priority()) {array[i + 1] = array[i];i--;}array[i + 1] = value;}// O(1)@Overridepublic E poll() {if(isEmpty()) {return null;}E value = (E) array[size - 1];array[--size] = null;  // help GCreturn value;}// O(1)@Overridepublic E peek() {if(isEmpty()) {return null;}return (E) array[size - 1];}@Overridepublic boolean isEmpty() {return size == 0;}@Overridepublic boolean isFull() {return size == array.length;}
}

3. 堆实现

计算机科学中,堆是一种基于树的数据结构,通常用完全二叉树实现,堆的特性如下:

  • 在大顶堆中,任意节点C与它的父节点P符合P.value ≥ C.value
  • 在小顶堆中,任意节点C与它的父节点P符合P.value ≤ C.value
  • 最顶层的节点(没有父亲)称之为root根节点

满二叉树:每一层都是填满的

完全二叉树:最后一层可能未填满,靠左对齐。完全二叉树可以用数组来表示:

如果从索引0开始存储结点数据

  • 当 i > 0时,节点i的父节点为 floor((i - 1)/2)
  • 节点i的左子节点为 2i + 1,右子节点为 2i + 2,当然它们得 < size

②如果从索引1开始存储节点数据

  • 当 i > 1时,节点i的父节点为 floor(i / 2)
  • 节点i的左子节点为 2i,右子节点为 2i + 1,同样得 < size
package com.itheima.datastructure.PriorityQueue;import com.itheima.datastructure.Queue.Queue;import java.util.Arrays;/*** 基于<b>大顶堆</b>实现* @param <E> 队列中元素类型, 必须实现 Priority 接口*/
public class PriorityQueue3<E extends Priority> implements Queue<E> {Priority[] array;int size;public PriorityQueue3(int capacity) {array = new Priority[capacity];}@Overridepublic String toString() {return Arrays.toString(array);}/*** 1. 入堆新元素,加入到数组末尾(索引位置child)* 2. 不断比较新加元素与它父节点parent的优先级*      - 如果父节点优先级低,则向下移动,并找到下一个parent*      - 直至父节点优先级更高或者child == 0 为止* @param offered 待插入值* @return*/@Overridepublic boolean offer(E offered) {if (isFull()) {return false;}int child = size++;int parent = (child - 1) / 2;while (child > 0 && offered.priority() > array[parent].priority()) {array[child] = array[parent];child = parent;parent = (child - 1) / 2;}array[child] = offered;return true;}private void swap(int i, int j) {Priority t = array[i];array[i] = array[j];array[j] = t;}/*** 1. 交换堆顶和尾部元素,让尾部元素出队* 2. 下潜*      - 从堆顶开始,将父元素与两个孩子的较大者交换*      - 直到父元素大于两个孩子,或没有孩子为止* @return*/@Overridepublic E poll() {if (isEmpty()) {return null;}swap(0, --size);Priority e = array[size];array[size] = null; // help GCdown(0, array[0]);return (E) e;}private void down(int parent) {int left = 2 * parent + 1;int right = left + 1;int max = parent;if(left < size && array[left].priority() > array[max].priority()) {max = left;}if(right < size && array[right].priority() > array[max].priority()) {max = right;}if(max != parent) {swap(max, parent);down(max);}}// 与 PriorityQueue4 不同在于没有用 swap 实现下潜private void down(int parent, Priority top) {int left = 2 * parent + 1;int right = left + 1;int max = parent;// 两个孩子里找个大的if (left < size) {max = left;if (right < size && array[right].priority() > array[left].priority()) {max = right;}}if (max != parent && array[max].priority() > top.priority()) {array[parent] = array[max];// 递归找到数组里最大的存放到top位置down(max, top);} else { // 没孩子, 或孩子没 top 大array[parent] = top;}}@Overridepublic E peek() {if (isEmpty()) {return null;}return (E) array[0];}@Overridepublic boolean isEmpty() {return size == 0;}@Overridepublic boolean isFull() {return size == array.length;}
}

4. 习题

4.1 合并多个有序链表

解法一:优先队列

/*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}* ListNode(int val) { this.val = val; }* ListNode(int val, ListNode next) { this.val = val; this.next = next; }* }*/
class Solution {public ListNode mergeKLists(ListNode[] lists) {PriorityQueue<ListNode> pq = new PriorityQueue<>(Comparator.comparingInt(a -> a.val));for (ListNode list : lists) {if (list != null) {pq.offer(list);}}ListNode sentinel = new ListNode(-1, null);ListNode tail = sentinel;while (!pq.isEmpty()) {ListNode node = pq.poll();tail.next = node;tail = tail.next;if (node.next != null) {pq.offer(node.next);}}return sentinel.next;}
}

问题:是否能将每个链表的所有元素全部加入堆,再一个个从堆顶移除?

答:可以是可以,但堆空间占用就高了,堆的一个优点就是用有限的空间做事情。

二、阻塞队列

之前的队列在很多场景下都不能很好地工作,例如:

  • 1. 大部分场景要求分离向队列放入(生产者)、从队列拿出(消费者)两个角色,它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
  • 2. 队列为空,那么在之前的实现里会返回null,如果就是硬要拿到一个元素呢?只能不断地循环尝试
  • 3. 队列为满,那么在之前的实现里会返回false,如果就是硬要塞入一个元素呢?只能不断地循环尝试

因此,我们需要解决的问题有:

  • 1. 用锁保证线程安全
  • 2. 用条件变量让等待非空线程与等待不满线程进入等待状态,而不是不断循环尝试,让CPU空转

1. 单锁实现

Java中要防止代码段交错执行,需要使用锁,有两种选择

  • synchronized代码块,属于关键字级别提供锁保护,功能少
  • ReentrantLock类,功能丰富

以ReentrantLock为例

先定义接口

package com.itheima.datastructure.blockingqueue;/**目前队列存在的问题<ol><li>很多场景要求<b>分离</b>生产者、消费者两个<b>角色</b>、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题</li><li>队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试</li><li>队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试</li></ol>解决方法<ol><li>用锁保证线程安全</li><li>用条件变量让 poll 或 offer 线程进入<b>等待</b>状态,而不是不断循环尝试,让 CPU 空转</li></ol>*/public interface BlockingQueue<E> { // 阻塞队列void offer(E e) throws InterruptedException;boolean offer(E e, long timeout) throws InterruptedException;E poll() throws InterruptedException;
}
ReentrantLock lock = new ReentrantLock();public void offer(String e) {lock.lockInterruptibly();try {array[tail] = e;tail++;} finally {lock.unlock();}
}

只要两个线程执行上段代码时,锁对象是同一个,就能保证try块内的代码的执行不会出现指令交错现象,即执行顺序只可能是下面两种情况之一

线程1线程2说明
lock.lockInterruptibly()t1对锁对象上锁
array[tail]=e1
lock.lockInterruptibly()即使 CPU 切换到线程2,但由于t1已经对该对象上锁,因此线程2卡在这儿进不去
tail++切换回线程1 执行后续代码
lock.unlock()线程1 解锁
array[tail]=e2线程2 此时才能获得锁,执行它的代码
tail++
  • 另一种情况是线程2先获得锁,线程1被挡在外面
  • 要明白保护的本质,本例中保护的是tail位置读写的安全

事情还没有完,上面的例子是队列还没有放满的情况,考虑下面的代码(这回锁同时保护了 tail 和 size 的读写安全)

ReentrantLock lock = new ReentrantLock();
int size = 0;public void offer(String e) {lock.lockInterruptibly();try {if(isFull()) {// 满了怎么办?}array[tail] = e;tail++;size++;} finally {lock.unlock();}
}private boolean isFull() {return size == array.length;
}

之前是返回 false 表示添加失败,前面分析过想达到这么一种效果:

  • 在队列满时,不是立刻返回,而是当前线程进入等待
  • 什么时候队列不满了,再唤醒这个等待的线程,从上次的代码处继续向下运行

ReentrantLock 可以配合条件变量来实现,代码进化为

ReentrantLock lock = new ReentrantLock();
Condition tailWaits = lock.newCondition(); // 条件变量
int size = 0;public void offer(String e) {lock.lockInterruptibly();try {while (isFull()) {tailWaits.await();	// 当队列满时, 当前线程进入 tailWaits 等待}array[tail] = e;tail++;size++;} finally {lock.unlock();}
}private boolean isFull() {return size == array.length;
}
  • 条件变量底层也是个队列,用来存储这些需要等待的线程,当队列满了,就会将 offer 线程加入条件队列,并暂时释放锁
  • 将来我们的队列如果不满了(由 poll 线程那边得知)可以调用 tailWaits.signal() 来唤醒 tailWaits 中首个等待的线程,被唤醒的线程会再次抢到锁,从上次 await 处继续向下运行

思考为何要用 while 而不是 if,设队列容量是 3

操作前offer(4)offer(5)poll(e操作后
[1 2 3]队列满,进入tailWaits 等待[1 2 3]
[1 2 3]取走 1,队列不满,唤醒线程[2 3]
[2 3]抢先获得锁,发现不满,放入 5[2 3 5]
[2 3 5]从上次等待处直接向下执行[2 3 5 ?]

关键点:

  • 从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化
  • 这种情况称之为**虚假唤醒**,唤醒后应该重新检查条件,看是不是得重新进入等待

最后的实现代码:

package com.itheima.datastructure.BlockingQueue;import org.springframework.beans.factory.annotation.Value;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class BlockingQueue1<E> implements BlockingQueue<E>{private final E[] array;private int head;private int tail;private int size;public BlockingQueue1(int capacity) {array = (E[]) new Object[capacity];}private ReentrantLock lock = new ReentrantLock();  // 可重入锁private Condition headWaits = lock.newCondition();private Condition tailWaits = lock.newCondition();@Overridepublic void offer(E e) throws InterruptedException {lock.lockInterruptibly();try {while(isFull()) {tailWaits.await();}array[tail] = e;if(++tail == array.length) {tail = 0;}size++;headWaits.signal();  // 唤醒等待队列 -> poll} finally {lock.unlock();}}@Overridepublic boolean offer(E e, long timeout) throws InterruptedException {lock.lockInterruptibly();  // 可中断try {long t = TimeUnit.MICROSECONDS.toNanos(timeout);while(isFull()) {if(t <= 0) {return false;}t = tailWaits.awaitNanos(t);  // 最多等待多少纳秒,返回值代表剩余时间}array[tail] = e;if(++tail == array.length) {tail = 0;}size++;headWaits.signal();  // 唤醒等待队列 -> pollreturn true;} finally {lock.unlock();}}@Overridepublic E poll() throws InterruptedException {lock.lockInterruptibly();try {while(isEmpty()) {headWaits.await();}E value = array[head];array[head++] = null; // help GCif(++head == array.length) {head = 0;}size--;tailWaits.signal();  // 唤醒等待队列 -> offerreturn value;} finally {lock.unlock();}}private boolean isEmpty() {return size == 0;}private boolean isFull() {return size == array.length;}
}
  • public void offer(E e, long timeout) throws InterruptedException 是带超时的版本,可以只等待一段时间,而不是永久等下去,类似的 poll 也可以做带超时的版本,这个留给大家了

2. 双锁实现

单锁的缺点在于:

  • 生产和消费几乎是不冲突的,唯一冲突的是生产者和消费者它们有可能同时修改size
  • 冲突的主要是生产者之间:多个offer线程修改tail
  • 冲突的还要消费者之间:多个poll线程修改head

如果希望进一步提高性能,可以用两把锁

  • 一把锁保护tail
  • 另一把锁保护head
ReentrantLock headLock = new ReentrantLock();  // 保护 head 的锁
Condition headWaits = headLock.newCondition(); // 队列空时,需要等待的线程集合ReentrantLock tailLock = new ReentrantLock();  // 保护 tail 的锁
Condition tailWaits = tailLock.newCondition(); // 队列满时,需要等待的线程集合

先看offer方法的初步实现

@Override
public void offer(E e) throws InterruptedException {tailLock.lockInterruptibly();try {// 队列满等待while (isFull()) {tailWaits.await();}// 不满则入队array[tail] = e;if (++tail == array.length) {tail = 0;}// 修改 size (有问题)size++;} finally {tailLock.unlock();}
}

上面代码的缺点是size并不受tailLock保护,tailLock与headLock是两把不同的锁,并不能实现互斥的效果。因此,size需要用下面的代码保证原子性

AtomicInteger size = new AtomicInteger(0);	   // 保护 size 的原子变量size.getAndIncrement(); // 自增
size.getAndDecrement(); // 自减

代码修改为

@Override
public void offer(E e) throws InterruptedException {tailLock.lockInterruptibly();try {// 队列满等待while (isFull()) {tailWaits.await();}// 不满则入队array[tail] = e;if (++tail == array.length) {tail = 0;}// 修改 sizesize.getAndIncrement();} finally {tailLock.unlock();}
}

对称地,可以写出poll方法

@Override
public E poll() throws InterruptedException {E e;headLock.lockInterruptibly();try {// 队列空等待while (isEmpty()) {headWaits.await();}// 不空则出队e = array[head];if (++head == array.length) {head = 0;}// 修改 sizesize.getAndDecrement();} finally {headLock.unlock();}return e;
}

完整代码:

package com.itheima.datastructure.BlockingQueue;import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;/*** 双锁实现* @param <E> 元素类型*/
@SuppressWarnings("all")
public class BlockingQueue2<E> implements BlockingQueue<E> {private final E[] array;private int head;private int tail;private AtomicInteger size = new AtomicInteger();private ReentrantLock tailLock = new ReentrantLock();private Condition tailWaits = tailLock.newCondition();private ReentrantLock headLock = new ReentrantLock();private Condition headWaits = headLock.newCondition();public BlockingQueue2(int capacity) {this.array = (E[]) new Object[capacity];}private boolean isEmpty() {return size.get() == 0;}private boolean isFull() {return size.get() == array.length;}@Overridepublic String toString() {return Arrays.toString(array);}@Overridepublic void offer(E e) throws InterruptedException {int c; // 添加前元素个数tailLock.lockInterruptibly();try {// 1. 队列满则等待while (isFull()) {tailWaits.await(); //  offer2}// 2. 不满则入队array[tail] = e;if (++tail == array.length) {tail = 0;}// 3. 修改 size/*size = 6*/c = size.getAndIncrement();if (c + 1 < array.length) {tailWaits.signal();}/*1. 读取成员变量size的值  52. 自增 63. 结果写回成员变量size 6*/} finally {tailLock.unlock();}// 4. 如果从0变为非空,由offer这边唤醒等待非空的poll线程//                       0->1   1->2    2->3if(c == 0) {headLock.lock(); // offer_1 offer_2 offer_3try {headWaits.signal();} finally {headLock.unlock();}}}@Overridepublic E poll() throws InterruptedException {E e;int c; // 取走前的元素个数headLock.lockInterruptibly();try {// 1. 队列空则等待while (isEmpty()) {headWaits.await(); // poll_4}// 2. 非空则出队e = array[head];array[head] = null; // help GCif (++head == array.length) {head = 0;}// 3. 修改 sizec = size.getAndDecrement();// 3->2   2->1   1->0// poll_1 poll_2 poll_3if (c > 1) {headWaits.signal();}/*1. 读取成员变量size的值 52. 自减 43. 结果写回成员变量size 4*/} finally {headLock.unlock();}// 4. 队列从满->不满时 由poll唤醒等待不满的 offer 线程if(c == array.length) {tailLock.lock();try {tailWaits.signal(); // ctrl+alt+t} finally {tailLock.unlock();}}return e;}@Overridepublic boolean offer(E e, long timeout) throws InterruptedException {return false;}public static void main(String[] args) throws InterruptedException {BlockingQueue2<String> queue = new BlockingQueue2<>(3);queue.offer("元素1");queue.offer("元素2");new Thread(()->{try {queue.offer("元素3");} catch (InterruptedException e) {throw new RuntimeException(e);}}, "offer").start();new Thread(()->{try {queue.poll();} catch (InterruptedException e) {throw new RuntimeException(e);}}, "poll").start();}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 我对于内存相关的三个问题的理解和总结——内存泄漏、内存溢出、野指针
  • 宏景eHR /ajax/ajaxService SQL注入漏洞复现
  • 【时时三省】unity test 测试框架 使用 code blocks 移植
  • 如何解决C#字典的线程安全问题
  • 40.组合综合Ⅱ
  • 【JavaEE精炼宝库】 网络编程套接字——UDP业务逻辑 | TCP流套接字编程及业务逻辑实现
  • 沉浸式企业VR展厅,重塑企业形象展示方式!
  • 程序员进阶架构知识体系、开发运维工具使用、Java体系知识扩展、前后端分离流程详解、设计模式开发实例汇总专栏分享
  • ComfyUI: 报EP Error错误(onnxruntime)
  • nginx的反向代理及负载均衡
  • 软件测试基础1--功能测试
  • EasyAR_稠密空间图
  • RIP综合练习
  • 在快速消费品业务中利用知识管理的指南
  • 文件存储服务系统(File Storage Service System)-02-SFTP 协议介绍
  • 78. Subsets
  • Brief introduction of how to 'Call, Apply and Bind'
  • canvas 绘制双线技巧
  • CSS 专业技巧
  • java架构面试锦集:开源框架+并发+数据结构+大企必备面试题
  • JS+CSS实现数字滚动
  • JS基础之数据类型、对象、原型、原型链、继承
  • Lsb图片隐写
  • mockjs让前端开发独立于后端
  • Netty源码解析1-Buffer
  • PyCharm搭建GO开发环境(GO语言学习第1课)
  • Stream流与Lambda表达式(三) 静态工厂类Collectors
  • vue+element后台管理系统,从后端获取路由表,并正常渲染
  • webpack入门学习手记(二)
  • 从0搭建SpringBoot的HelloWorld -- Java版本
  • 分享一个自己写的基于canvas的原生js图片爆炸插件
  • 官方新出的 Kotlin 扩展库 KTX,到底帮你干了什么?
  • 离散点最小(凸)包围边界查找
  • 聊聊spring cloud的LoadBalancerAutoConfiguration
  • 如何合理的规划jvm性能调优
  • 使用 @font-face
  • 数组大概知多少
  • 算法之不定期更新(一)(2018-04-12)
  • 文本多行溢出显示...之最后一行不到行尾的解决
  • 小程序滚动组件,左边导航栏与右边内容联动效果实现
  • 新版博客前端前瞻
  • JavaScript 新语法详解:Class 的私有属性与私有方法 ...
  • # 手柄编程_北通阿修罗3动手评:一款兼具功能、操控性的电竞手柄
  • %3cli%3e连接html页面,html+canvas实现屏幕截取
  • %3cscript放入php,跟bWAPP学WEB安全(PHP代码)--XSS跨站脚本攻击
  • (20050108)又读《平凡的世界》
  • (2024.6.23)最新版MAVEN的安装和配置教程(超详细)
  • (27)4.8 习题课
  • (3) cmake编译多个cpp文件
  • (Pytorch框架)神经网络输出维度调试,做出我们自己的网络来!!(详细教程~)
  • (二十四)Flask之flask-session组件
  • (分享)自己整理的一些简单awk实用语句
  • (附源码)ssm高校志愿者服务系统 毕业设计 011648
  • (附源码)SSM环卫人员管理平台 计算机毕设36412
  • (蓝桥杯每日一题)平方末尾及补充(常用的字符串函数功能)