当前位置: 首页 > news >正文

Rxjs 学习笔记 - 简化版

RxJS 提供了一套完整的异步解決方案,让我们在面对各种异步行为(Event, AJAX, Animation 等)都可以使用相同的 API 做开发。

前置知识点

  • 同步(Synchronous)就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。是一种线性执行的方式,执行的流程不能跨越。一般用于流程性比较强的程序,比如用户登录,需要对用户验证完成后才能登录系统。
  • 异步(Asynchronous)则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。是一种并行处理的方式,不必等待一个程序执行完,可以执行其它的任务,比如页面数据加载过程,不需要等所有数据获取后再显示页面。
  • 观察者模式又叫发布订阅模式(Publish/Subscribe),它是一种一对多的关系,让多个观察者(Obesver)同时监听一个主题(Subject),这个主题也就是被观察者(Observable),被观察者的状态发生变化时就会通知所有的观察者,使得它们能够接收到更新的内容。观察者模式主题和观察者是分离的,不是主动触发而是被动监听。
  • 迭代器模式又叫游标(Sursor)模式,迭代器具有 next 方法,可以顺序访问一个聚合对象中的各个元素,而不需要暴露该对象的内部表现。迭代器模式可以把迭代的过程从从业务逻辑中分离出来,迭代器将使用者和目标对象隔离开来,即使不了解对象的内部构造,也可以通过迭代器提供的方法顺序访问其每个元素。
  • 响应式编程,即 Reactive Programming,它是一种基于事件模式的模型。简单来说,上一个任务执行结果的反馈就是一个事件,这个事件的到来会触发下一个任务的执行。
  • Stream / 流的本质是一个按时间顺序排列的进行中事件的序列集合。

一、RxJS 基本介绍

RxJS 全称 Reactive Extensions for JavaScript,翻译过来是 Javascript 的响应式扩展,起源于 Reactive Extensions,是一个基于可观测数据流 Stream 结合观察者模式和迭代器模式的一种异步编程的应用库

简单来说 Rx(JS) = Observables (可观察对象)+ Operator (操作符)+ Scheduler(调度器)。

二、RxJS 的核心概念:

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像map、filter、concat、flatMap等这样的操作符来处理集合。
  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如
    setTimeout 或 requestAnimationFrame或其他。

三、 理解 RxJS 中的 Observer  和 Observable

将买房作为例子套用到观察者模式中:

  • 房价即为 Observable 对象;
  • 购房者即为 Observer 对象;
  • 而购房者观察房价即为 Subscribe(订阅)关系;

结合买房的例子,描述 Observable 和 Observer 的行为:

  • Observable ****可被观察,即房价被购房者关注,并且随时间变化发出 (emit) 不同值,表现为房价波动;
  • Observer 可以观察 Observable,即购房者关注房价,并在 Observable (房价)发出不同值(房价波动)时做出响应,即买房或观望;
  • Observable 和 Observer 之间通过订阅(Subscription)建立观察关系;
  • 当 Observable 没有 Observer 的时候,即使发出了值,也不会产生任何影响,即无人关注房价时,房价不管如何波动都不会有响应;

四、Observable (可观察对象)

RxJS 的基础就是 可观察对象(Observable),只要弄懂 Observable 就算是学会一半的 RxJS ,剩下的就只是一些方法的练习和熟悉;

const observable = Observable.create(observer => {//调用 Observable.create 方法来创建一个 Observable.入参是 observer(观察者)observer.next('foo');//函数内部通过调用 observer.next() 便可生成有一系列值的一个 Observableobserver.next('bar');
})/*但是运行这段代码后并不会发生任何事情.
我们需要一个 observer 去订阅这个 observable,此后基于 observable 发出的值,observer 才会响应。
因此,在如上代码末尾,我们再加上一行:*/observable.subscribe(console.log);//运行代码,console.log 函数便会依次打印出 'foo' 和 'bar' 了。

Observeable 是观察者模式中的被观察者,它维护一段执行函数,提供了惰性执行的能力(subscribe)。

当一个 Observable 被建立后,它能被多个 observer 订阅,每个订阅关系相互独立、互不影响。

Observable 与 Promise关键性的不同点:

ObservablePromise
使用场景同步、异步均可使用用 Promise 包裹的多数是异步场景
执行时机声明式惰性执行,只有在订阅后才会执行创建时就立即执行
执行次数多次调用 subscribe 函数会执行多次只有第一次执行,后续都是取值
流程控制相较于 Promise 有更为全面的操作符提供串行、并行的函数
错误处理subscribe 函数捕获错误.catch 捕获

总的来说,Promise 可读性更优,Observable 从使用场景更为全面。

五、Observer(观察者)

Subscriber/Observer 是观察者模式中的观察者/消费者,它用来消费/执行 Observable 创建的函数

每当 可观察对象(Observable )发生事件时,便会触发观察者(Observer)相对应的方法。

它的三个方法:

  1. next (传值):每当 Observable 输出新值,next 方法就会被触发。
  2. error (错误处理):每当Observable 內发生错误时,error 方法就会被触发。
  3. complete (完成/终止):在 Observable 沒有其它数据可以获取时,complete 方法就会被触发,在 complete 被触发之后,next 方法就不会再起作用。

工作流程:

六、Subscription

订阅(subscribe) 可观察对象(Observable ),会返回一个 subscription 对象,对象具有  停止订阅 且 释放 资源 的 unsubscribe 方法。

方法:

  1. unsubcribe(取消订阅)
  2. add (分组或在取消订阅之前插入一段逻辑)

注意:调用unsubcribe后(包含add传入的其它 Subscription)不会再接收到它们的数据。

var source = Rx.Observable.timer(1000, 1000);//通过timer 创建 Observeable 实例 sourcevar subscription = source.subscribe({//调用 subscribe ,获得订阅对象subscriptionnext: function(value) {console.log(value)},complete: function() {console.log('complete!');},error: function(error) {console.log('Throw Error: ' + error)}});setTimeout(() => {subscription.unsubscribe() // 停止订阅 (退订)
}, 5000);// 0
// 1
// 2
// 3
// 4

七、主体(Subject)

RxJS 中 Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。

对于 Subject,你提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。

从观察者的角度而言,无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。在这里,subscribe 类似于 JavaScript 中的 addEventListener;与此同时,每个 Subject 又都是观察者。Subject 是一个有如下方法的对象: next(v)、 error(e) 和 complete() 。要给 Subject 提供新值,只要调用 next(theValue) 方法。就是说 Subject既可以是被观察者/生产者也可以是观察者/消费者

优点

  1. 减少开销和提高性能
  2. 数据实时推送

与 Observable 的区别

ObservableSubject
角色生产者(单向)生产者、消费者(双向)
消费策略单播多播
流转方式内部发送/接收数据外部发送/接收数据
数据特性冷数据流热数据流
消费时机调用 subscribe调用 next

冷数据流:可以订阅任意时间的数据流。

热数据流:只给已订阅的消费者发送消息,定阅之前的消费者,不会收到消息。

其他 Subject

种类作用
BehaviorSubject回放数据,如果是订阅前推送的数据,只回放最新的值
ReplaySubject回放数据,初始化设定要缓存多少次的值,然后将这批消息推送
AsyncSubject只有调用 complete 后才会推送数据

八、Operator (操作符)

Operator 本质上是一个纯函数 (pure function),它接收一个 Observable 作为输入,并生成一个新的 Observable 作为输出,前面的 Observable 保持不变。

RxJS 6 及更新版本提供了可链式调用(Pipeable)的 RxJS 操作符,假设 source 是一个已定义的 observable,一个简单示例如下:

source.pipe(map(x => x + x),mergeMap(n => of(n + 1, n + 2).pipe(filter(x => x % 1 == 0),scan((acc, x) => acc + x, 0),)),catchError(err => of('error found')),
).subscribe(console.log);

生命周期:创建流(create、new、创建类操作符)——> 执行流(subscribe) ——> 销毁流(unsubscribe)

操作符分类:

九、Scheduler(调度器)

Scheduler(调度器)控制着何时启动 subscription 和何时发送通知。RxJS 有自己的基准时钟和一套的执行规则,来安排多个任务/数据该如何执行。

种类描述
null不传递或 null 或 undefined,表示同步执行
queue使用队列的方式执行
asap全称:as soon as possible ,表示尽快执行
async使用 setInterval 的调度。

 使用 subscribeOn 来调度 subscribe() 调用在什么样的上下文中执行。 默认情况下,Observable 的 subscribe() 调用会立即同步地执行。然而,你可能会延迟或安排在给定的调度器上执行实际的 subscription ,使用实例操作符 subscribeOn(scheduler),其中 scheduler 是你提供的参数。我们来看一个同步变异步执行的例子:

var observable = Rx.Observable.create(function (observer) {observer.next(1);observer.next(2);observer.next(3);observer.complete();
})
.observeOn(Rx.Scheduler.async);console.log('just before subscribe');
observable.subscribe({next: x => console.log('got value ' + x),error: err => console.error('something wrong occurred: ' + err),complete: () => console.log('done'),
});
console.log('just after subscribe');//结果 输出
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

使用原则/策略

RxJS Scheduler 的原则是:尽量减少并发运行。

  1. 对于返回有限和少量消息的 observable 的操作符,RxJS 不使用调度器,即 null 或 undefined 
  2. 对于返回潜在大量的或无限数量的消息的操作符,使用 queue 调度器。
  3. 对于使用定时器的操作符,使用 aysnc 调度器。

支持调度器的操作符


of 、 from 、 timer 、 interval 、 concat 、 merge 、 combineLatest ,

bufferTime 、 debounceTime 、 delay 、 auditTime 、 sampleTime 、 throttleTime 、 timeInterval 、 timeout 、 timeoutWith 、 windowTime 这样时间相关的操作符全部接收调度器作为最后的参数,并且默认的操作是在 Scheduler.async 调度器上。

相关文章:

  • openssl的x509命令工具
  • Kubernetes入门笔记——(2)k8s设计文档
  • Axure安装及面板各区域详解
  • 【ArcGIS Pro微课1000例】0053:基于SQL Server创建与启用地理数据库
  • 数据结构:栈(Stack)的各种操作(入栈,出栈,判断栈非空,判断栈已满,附源码)
  • 第十六届山东省职业院校技能大赛高职组“应用软件系统开发”赛项样题
  • 【EI会议征稿】第三届电气、电力与电网系统国际会议(ICEPGS 2024)
  • DAP数据集成与算法模型如何结合使用
  • JAVA基础知识:泛型
  • python 爬虫 m3u8 视频文件 加密解密 整合mp4
  • adb命令学习记录
  • 软件崩溃时Visual Studio中看不到有效的调用堆栈,使用Windbg动态调试去分析定位
  • 大数据Vue项目必备|Window下安装并使用nvm(含卸载node、卸载nvm、全局安装npm)
  • c++ 冒泡排序
  • SpringBoot集成swagger2配置权限认证参数
  • @angular/forms 源码解析之双向绑定
  • [NodeJS] 关于Buffer
  • 5、React组件事件详解
  • JavaScript 基本功--面试宝典
  • laravel with 查询列表限制条数
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • Less 日常用法
  • MYSQL如何对数据进行自动化升级--以如果某数据表存在并且某字段不存在时则执行更新操作为例...
  • Nginx 通过 Lua + Redis 实现动态封禁 IP
  • OSS Web直传 (文件图片)
  • quasar-framework cnodejs社区
  • react-native 安卓真机环境搭建
  • Stream流与Lambda表达式(三) 静态工厂类Collectors
  • Unix命令
  • VuePress 静态网站生成
  • vue数据传递--我有特殊的实现技巧
  • 浮现式设计
  • 快速构建spring-cloud+sleuth+rabbit+ zipkin+es+kibana+grafana日志跟踪平台
  • 老板让我十分钟上手nx-admin
  • 使用Envoy 作Sidecar Proxy的微服务模式-4.Prometheus的指标收集
  • 学习ES6 变量的解构赋值
  • 用简单代码看卷积组块发展
  • “十年磨一剑”--有赞的HBase平台实践和应用之路 ...
  • 400多位云计算专家和开发者,加入了同一个组织 ...
  • Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九) ...
  • !!【OpenCV学习】计算两幅图像的重叠区域
  • #、%和$符号在OGNL表达式中经常出现
  • #微信小程序:微信小程序常见的配置传值
  • #我与Java虚拟机的故事#连载14:挑战高薪面试必看
  • (1)bark-ml
  • (2)空速传感器
  • (6)设计一个TimeMap
  • (AngularJS)Angular 控制器之间通信初探
  • (Arcgis)Python编程批量将HDF5文件转换为TIFF格式并应用地理转换和投影信息
  • (windows2012共享文件夹和防火墙设置
  • (附源码)springboot教学评价 毕业设计 641310
  • (官网安装) 基于CentOS 7安装MangoDB和MangoDB Shell
  • (机器学习-深度学习快速入门)第一章第一节:Python环境和数据分析
  • (论文阅读笔记)Network planning with deep reinforcement learning
  • (小白学Java)Java简介和基本配置