当前位置: 首页 > news >正文

reactive streams与观察者模式

为什么80%的码农都做不了架构师?>>>   hot3.png

本文主要研究下java里头的reactive streams与观察者模式。

reactive streams

reactive编程范式是一个异步编程范式,主要涉及数据流及变化的传播,可以看做是观察者设计模式的扩展。

java里头的iterator是以pull模型,即订阅者使用next去拉取下一个数据;而reactive streams则是以push模型为主,订阅者调用subscribe方法订阅,发布者调用订阅者的onNext通知订阅者新消息。

reactive streams java api

reactive streams定义了4个java api,如下

Processor<T,R>

processor既是Subscriber也是Publisher,代表二者的处理阶段

Publisher<T>

publisher是数据的提供者, 将数据发布给订阅者

Subscriber<T>

在调用Publisher.subscribe(Subscriber)之后,Subscriber.onSubscribe(Subscription)将会被调用

Subscription

Subscription代表订阅者与发布者的一次订阅周期,一旦调用cancel去掉订阅,则发布者不会再推送消息。

观察者模式

观察者模式的实现有推模型和拉模型

  • 拉模型

即发布者通知订阅有新消息,订阅者再去找发布者拉取

  • 推模型

即发布者通知订阅者有消息,通知的时候已经带上了一个新消息

reactor实例

maven

		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.1.2.RELEASE</version>
		</dependency>

reactor 3 是java里头reactive streams的一个实现,基于reactive streams的java api,是spring 5反应式编程的基础。

Flux实例

    @Test
    public void testBackpressure(){
        Flux.just(1, 2, 3, 4)
                .log()
                .subscribe(new Subscriber<Integer>() {
                    private Subscription s;
                    int onNextAmount;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(2);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                        onNextAmount++;
                        if (onNextAmount % 2 == 0) {
                            s.request(2);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {}

                    @Override
                    public void onComplete() {}
                });

        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

小结

从上面的代码看,reactive streams实际上是推拉结合的模式的结合。为什么还要拉呢?

rabbitmq vs kafka

rabbitmq是以推为主的,如果消费者消费能力跟不上,则消息会堆积在内存队列中(必要时可能写磁盘)

kafka则是以拉为主的,生产者推送消息到broker,消费者自己根据自己的能力从broker拉取消息,由于消息是持久化的,因此无需关心生产消费速率的不平衡

backpressure

backpressure这个是为处理生产速率与消费速率不平衡这个问题而衍生出来的,订阅者可以在next方法里头根据自己的情况,使用request方法告诉发布者要取N个数据,发布者则向订阅者推送N个数据。通过request达到订阅者对发布者的反馈。而对于发布者而言,为了实现backpressure,则需要有一个缓存队列来缓冲订阅者没来得及消费的数据。涉及到缓冲,就涉及容量是有界还是无界,如果是有界则在缓冲慢的时候,处理策略是怎样等等。

doc

  • reactive streams java api
  • Java 9 揭秘(17. Reactive Streams)
  • Java 9 Reactive Streams

转载于:https://my.oschina.net/go4it/blog/1605846

相关文章:

  • 面试技术题笔记
  • 从String源码看Java中的编码
  • 怎样让html加载完毕后加载js代码
  • PyCharm搭建GO开发环境(GO语言学习第1课)
  • Vmware虚拟机的单用户模式
  • Flask学习笔记(2)-login_page
  • shell安全防范———慎将当前目录.加入PATH~~~之~隔壁老王来敲门
  • Java:泛型
  • Myth源码解析系列之一-项目简介
  • 在地铁上看了zabbix 的书发现 报警执行远程命令
  • Python中级 —— 01面向对象进阶
  • Ansible批量修改root密码(playbook)
  • 健忘?科学家想用机器学习+电击实验,帮你增强记忆力
  • mysql 设置自增id起始值
  • 多迪技术总监告诉你为什么人工智能用Python?
  • 【许晓笛】 EOS 智能合约案例解析(3)
  • Git同步原始仓库到Fork仓库中
  • Leetcode 27 Remove Element
  • PHP CLI应用的调试原理
  • PHP的类修饰符与访问修饰符
  • Python 反序列化安全问题(二)
  • SwizzleMethod 黑魔法
  • TypeScript实现数据结构(一)栈,队列,链表
  • 大主子表关联的性能优化方法
  • 前端
  • 我看到的前端
  • 我这样减少了26.5M Java内存!
  • 异步
  • 哈罗单车融资几十亿元,蚂蚁金服与春华资本加持 ...
  • #{}和${}的区别?
  • #define MODIFY_REG(REG, CLEARMASK, SETMASK)
  • #我与Java虚拟机的故事#连载19:等我技术变强了,我会去看你的 ​
  • $.ajax()参数及用法
  • (C#)Windows Shell 外壳编程系列9 - QueryInfo 扩展提示
  • (js)循环条件满足时终止循环
  • (附源码)ssm捐赠救助系统 毕业设计 060945
  • (图)IntelliTrace Tools 跟踪云端程序
  • .bat批处理(一):@echo off
  • .net core开源商城系统源码,支持可视化布局小程序
  • .net framwork4.6操作MySQL报错Character set ‘utf8mb3‘ is not supported 解决方法
  • .NET 设计一套高性能的弱事件机制
  • .net 使用$.ajax实现从前台调用后台方法(包含静态方法和非静态方法调用)
  • .net 验证控件和javaScript的冲突问题
  • .net 桌面开发 运行一阵子就自动关闭_聊城旋转门家用价格大约是多少,全自动旋转门,期待合作...
  • /bin/bash^M: bad interpreter: No such file or directory
  • @PreAuthorize注解
  • @property括号内属性讲解
  • [2016.7 day.5] T2
  • [20171102]视图v$session中process字段含义
  • [AX]AX2012开发新特性-禁止表或者表字段
  • [C#基础]说说lock到底锁谁?
  • [Excel]如何找到非固定空白格數列的條件數據? 以月份報價表單為例
  • [hdu 3746] Cyclic Nacklace [kmp]
  • [Kubernetes]8. K8s使用Helm部署mysql集群(主从数据库集群)
  • [MySQL FAQ]系列 -- 账号密码包含反斜线时怎么办