当前位置: 首页 > news >正文

Disruptor生产和消费模式详解及高级应用(并行模式)

小伙伴们大家好,昨天的文章,带着大家扒开了Disruptor华丽的外衣,最重要的是我们知道了Disruptor高性能的原因几个重要的原因,

  • 引入环形的数组结构:数组元素不会被回收,避免频繁的GC,
  • 无锁的设计:采用CAS无锁方式,保证线程的安全性
  • 属性填充:通过添加额外的无用信息,避免伪共享问题
  • 元素位置的定位:采用跟一致性哈希一样的方式,一个索引,进行自增

这篇文章就在上篇文章的基础上来点实战应用。研究下Disruptor的生产和消费模式,以及高级应用,至此关于Disruptor的系列的文章,也就到此结束了,我已经尽力了,如果还有什么没能满足大家需求的,以及关于文章的内容大家有任何其他的看法的,也欢迎在评论区留言,毕竟本人才疏学浅,期待各位大佬能给小弟指点一二。

前两篇文章在这里哦:

  • 如此狂妄,自称高性能队列的Disruptor有啥来头?
  • Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣

生产和消费模式

根据上面的环形结构,我们来具体分析一下Disruptor的工作原理。

​ Disruptor 不像传统的队列,分为一个队头指针和一个队尾指针,而是只有一个角标(上面的seq),那么这个是如何保证生产的消息不会覆盖没有消费掉的消息呢。

​ 在Disruptor中生产者分为单生产者和多生产者,而消费者并没有区分。

​ 单生产者情况下,就是普通的生产者向RingBuffer中放置数据,消费者获取最大可消费的位置,并进行消费。而多生产者时候,又多出了一个跟RingBuffer同样大小的Buffer,称为AvailableBuffer。

​ 在多生产者中,每个生产者首先通过CAS竞争获取可以写的空间,然后再进行慢慢往里放数据,如果正好这个时候消费者要消费数据,那么每个消费者都需要获取最大可消费的下标,这个下标是在AvailableBuffer进行获取得到的最长连续的序列下标。

5.1 单生产者生产数据

生产者单线程写数据的流程比较简单:

  1. 申请写入m个元素;
  2. 若是有m个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
  3. 若是返回的正确,则生产者开始写入元素。

file

5.2 多生产者生产数据

​ 多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

​ 但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

5.3.1 生产流程
  1. 申请写入m个元素;
  2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
  3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。

如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。

Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。

file

5.3.2 CAS检测空间占用

防止不同生产者对同一段空间写入的代码,如下所示:

​ 通过do/while循环的条件cursor.compareAndSet(current, next),来判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,该函数会返回失败,While循环重新执行,申请写入空间。

public long tryNext(int n) throws InsufficientCapacityException
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }
 
    long current;
    long next;
 
    do
    {
        current = cursor.get();
        next = current + n;
 
        if (!hasAvailableCapacity(gatingSequences, n, current))
        {
            throw InsufficientCapacityException.INSTANCE;
        }
    }
    while (!cursor.compareAndSet(current, next));
 
    return next;
}

5.4 多生产者消费数据

绿色代表已经写OK的数据

​ 假设三个生产者在写中,还没有置位AvailableBuffer,那么消费者可获取的消费下标只能获取到6,然后等生产者都写OK后,通知到消费者,消费者继续重复上面的步骤。

5.4.1 消费流程
  1. 申请读取到序号n;
  2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
  3. 消费者读取元素

如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。

读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。

然后,消费者读取下标从3到6共计4个元素。

file

6. 高级使用

6.1 并行模式

6.1.1 单一写者模式

​ 在并发系统中提高性能最好的方式之一就是单一写者原则,对Disruptor也是适用的。如果在你的代码中仅仅有一个事件生产者,那么可以设置为单一生产者模式来提高系统的性能。

file

public class singleProductorLongEventMain { 
    public static void main(String[] args) throws Exception { 
        //.....// Construct the Disruptor with a SingleProducerSequencer 
 
        Disruptor<LongEvent> disruptor = new Disruptor(factory, 
                bufferSize, 
                ProducerType.SINGLE, // 单一写者模式, 
                executor);//..... 
    } 
} 
6.1.2 串行消费

比如:现在触发一个注册Event,需要有一个Handler来存储信息,一个Hanlder来发邮件等等。

file

/**
  * 串行依次执行
  * <br/>
  * p --> c11 --> c21
  * @param disruptor
  */
 public static void serial(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());
     disruptor.start();
 }
6.1.3 菱形方式执行

file

 public static void diamond(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler());
     disruptor.start();
 }
6.1.4 链式并行计算

file

 public static void chain(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());
     disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());
     disruptor.start();
 }
6.1.5 相互隔离模式

file

 public static void parallelWithPool(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler());
     disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler());
     disruptor.start();
 }
6.1.6 航道模式

file

/**
  * 串行依次执行,同时C11,C21分别有2个实例
   * <br/>
   * p --> c11 --> c21
   * @param disruptor
   */
  public static void serialWithPool(Disruptor<LongEvent> disruptor){
      disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler());
      disruptor.start();
  }

本文由传智教育博学谷教研团队发布。

如果本文对您有帮助,欢迎关注点赞;如果您有任何建议也可留言评论私信,您的支持是我坚持创作的动力。

转载请注明出处!

相关文章:

  • [算法周训 3] 字符串训练2
  • 判断月份所在的季节
  • 大数据毕设选题 - flask疫情数据可视化系统(python)
  • 记录第一次开源流计算框架Flink代码的贡献
  • 共码未来 | 助力实现事半功倍的前端开发体验
  • 客户端存储localStorage和sessionStorage以及Cookie
  • Python学习笔记:Jupyter Notebook快速入门案例:学习时间与成绩的关系
  • 嵌入式软件工程师面试题(三)
  • K8S搭建共享存储(以MySQL例)
  • 【C++】类和对象(中篇)(万字)
  • 虹科教您 | 虹科TSN配置软件RELY-TSN-Configurator基本操作指南
  • 【python基础】super是啥,你会用吗?
  • 反向传播和其他微分算法
  • 爆肝撸了个“羊了个羊”通关助手
  • Flutter快学快用17 打包发布:Flutter 应用,你离线上运营只差最后一步
  • ES6系统学习----从Apollo Client看解构赋值
  • JS基础之数据类型、对象、原型、原型链、继承
  • React系列之 Redux 架构模式
  • unity如何实现一个固定宽度的orthagraphic相机
  • windows下如何用phpstorm同步测试服务器
  • 从 Android Sample ApiDemos 中学习 android.animation API 的用法
  • 人脸识别最新开发经验demo
  • 设计模式走一遍---观察者模式
  • 数据科学 第 3 章 11 字符串处理
  • 思考 CSS 架构
  • 项目管理碎碎念系列之一:干系人管理
  • 一文看透浏览器架构
  • Spring第一个helloWorld
  • # .NET Framework中使用命名管道进行进程间通信
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第5节(封闭类和Final方法)
  • (Java岗)秋招打卡!一本学历拿下美团、阿里、快手、米哈游offer
  • (二)springcloud实战之config配置中心
  • (附源码)spring boot基于小程序酒店疫情系统 毕业设计 091931
  • (教学思路 C#之类三)方法参数类型(ref、out、parmas)
  • (欧拉)openEuler系统添加网卡文件配置流程、(欧拉)openEuler系统手动配置ipv6地址流程、(欧拉)openEuler系统网络管理说明
  • (十二)devops持续集成开发——jenkins的全局工具配置之sonar qube环境安装及配置
  • (使用vite搭建vue3项目(vite + vue3 + vue router + pinia + element plus))
  • (四)搭建容器云管理平台笔记—安装ETCD(不使用证书)
  • (万字长文)Spring的核心知识尽揽其中
  • (原創) 未来三学期想要修的课 (日記)
  • (转)Mysql的优化设置
  • (转)为C# Windows服务添加安装程序
  • (转)一些感悟
  • (轉)JSON.stringify 语法实例讲解
  • * CIL library *(* CIL module *) : error LNK2005: _DllMain@12 already defined in mfcs120u.lib(dllmodu
  • .net core webapi 大文件上传到wwwroot文件夹
  • .NET Core 网络数据采集 -- 使用AngleSharp做html解析
  • .net core 源码_ASP.NET Core之Identity源码学习
  • .Net MVC4 上传大文件,并保存表单
  • .NET/C# 编译期间能确定的相同字符串,在运行期间是相同的实例
  • .NET成年了,然后呢?
  • .Net接口调试与案例
  • .NET企业级应用架构设计系列之应用服务器
  • .考试倒计时43天!来提分啦!
  • /3GB和/USERVA开关