Rxjs 学习笔记 - 简化版
RxJS 提供了一套完整的异步解決方案,让我们在面对各种异步行为(Event, AJAX, Animation 等)都可以使用相同的 API 做开发。
前置知识点
- 同步(Synchronous)就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。是一种线性执行的方式,执行的流程不能跨越。一般用于流程性比较强的程序,比如用户登录,需要对用户验证完成后才能登录系统。
- 异步(Asynchronous)则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。是一种并行处理的方式,不必等待一个程序执行完,可以执行其它的任务,比如页面数据加载过程,不需要等所有数据获取后再显示页面。
- 观察者模式又叫发布订阅模式(Publish/Subscribe),它是一种一对多的关系,让多个观察者(Obesver)同时监听一个主题(Subject),这个主题也就是被观察者(Observable),被观察者的状态发生变化时就会通知所有的观察者,使得它们能够接收到更新的内容。观察者模式主题和观察者是分离的,不是主动触发而是被动监听。
- 迭代器模式又叫游标(Sursor)模式,迭代器具有 next 方法,可以顺序访问一个聚合对象中的各个元素,而不需要暴露该对象的内部表现。迭代器模式可以把迭代的过程从从业务逻辑中分离出来,迭代器将使用者和目标对象隔离开来,即使不了解对象的内部构造,也可以通过迭代器提供的方法顺序访问其每个元素。
- 响应式编程,即 Reactive Programming,它是一种基于事件模式的模型。简单来说,上一个任务执行结果的反馈就是一个事件,这个事件的到来会触发下一个任务的执行。
- Stream / 流的本质是一个按时间顺序排列的进行中事件的序列集合。
一、RxJS 基本介绍
RxJS 全称 Reactive Extensions for JavaScript,翻译过来是 Javascript 的响应式扩展,起源于 Reactive Extensions,是一个基于可观测数据流 Stream 结合观察者模式和迭代器模式的一种异步编程的应用库。
简单来说 Rx(JS) = Observables (可观察对象)+ Operator (操作符)+ Scheduler(调度器)。
二、RxJS 的核心概念:
- Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
- Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
- Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
- Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像map、filter、concat、flatMap等这样的操作符来处理集合。
- Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
- Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如
setTimeout 或 requestAnimationFrame或其他。
三、 理解 RxJS 中的 Observer 和 Observable
将买房作为例子套用到观察者模式中:
- 房价即为 Observable 对象;
- 购房者即为 Observer 对象;
- 而购房者观察房价即为 Subscribe(订阅)关系;
结合买房的例子,描述 Observable 和 Observer 的行为:
- Observable ****可被观察,即房价被购房者关注,并且随时间变化发出 (emit) 不同值,表现为房价波动;
- Observer 可以观察 Observable,即购房者关注房价,并在 Observable (房价)发出不同值(房价波动)时做出响应,即买房或观望;
- Observable 和 Observer 之间通过订阅(Subscription)建立观察关系;
- 当 Observable 没有 Observer 的时候,即使发出了值,也不会产生任何影响,即无人关注房价时,房价不管如何波动都不会有响应;
四、Observable (可观察对象)
RxJS 的基础就是 可观察对象(Observable),只要弄懂 Observable 就算是学会一半的 RxJS ,剩下的就只是一些方法的练习和熟悉;
const observable = Observable.create(observer => {//调用 Observable.create 方法来创建一个 Observable.入参是 observer(观察者)observer.next('foo');//函数内部通过调用 observer.next() 便可生成有一系列值的一个 Observableobserver.next('bar');
})/*但是运行这段代码后并不会发生任何事情.
我们需要一个 observer 去订阅这个 observable,此后基于 observable 发出的值,observer 才会响应。
因此,在如上代码末尾,我们再加上一行:*/observable.subscribe(console.log);//运行代码,console.log 函数便会依次打印出 'foo' 和 'bar' 了。
Observeable 是观察者模式中的被观察者,它维护一段执行函数,提供了惰性执行的能力(subscribe)。
当一个 Observable 被建立后,它能被多个 observer 订阅,每个订阅关系相互独立、互不影响。
Observable 与 Promise关键性的不同点:
Observable | Promise | |
---|---|---|
使用场景 | 同步、异步均可使用 | 用 Promise 包裹的多数是异步场景 |
执行时机 | 声明式惰性执行,只有在订阅后才会执行 | 创建时就立即执行 |
执行次数 | 多次调用 subscribe 函数会执行多次 | 只有第一次执行,后续都是取值 |
流程控制 | 相较于 Promise 有更为全面的操作符 | 提供串行、并行的函数 |
错误处理 | subscribe 函数捕获错误 | .catch 捕获 |
总的来说,Promise 可读性更优,Observable 从使用场景更为全面。
五、Observer(观察者)
Subscriber/Observer 是观察者模式中的观察者/消费者,它用来消费/执行 Observable 创建的函数。
每当 可观察对象(Observable )发生事件时,便会触发观察者(Observer)相对应的方法。
它的三个方法:
- next (传值):每当 Observable 输出新值,next 方法就会被触发。
- error (错误处理):每当Observable 內发生错误时,error 方法就会被触发。
- complete (完成/终止):在 Observable 沒有其它数据可以获取时,complete 方法就会被触发,在 complete 被触发之后,next 方法就不会再起作用。
工作流程:
六、Subscription
订阅(subscribe) 可观察对象(Observable ),会返回一个 subscription 对象,对象具有 停止订阅 且 释放 资源 的 unsubscribe
方法。
方法:
unsubcribe
(取消订阅)add
(分组或在取消订阅之前插入一段逻辑)
注意:调用unsubcribe
后(包含add
传入的其它 Subscription)不会再接收到它们的数据。
var source = Rx.Observable.timer(1000, 1000);//通过timer 创建 Observeable 实例 sourcevar subscription = source.subscribe({//调用 subscribe ,获得订阅对象subscriptionnext: function(value) {console.log(value)},complete: function() {console.log('complete!');},error: function(error) {console.log('Throw Error: ' + error)}});setTimeout(() => {subscription.unsubscribe() // 停止订阅 (退订)
}, 5000);// 0
// 1
// 2
// 3
// 4
七、主体(Subject)
RxJS 中 Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。
对于 Subject,你提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。
从观察者的角度而言,无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。在这里,subscribe 类似于 JavaScript 中的 addEventListener;与此同时,每个 Subject 又都是观察者。Subject 是一个有如下方法的对象: next(v)
、 error(e)
和 complete()
。要给 Subject 提供新值,只要调用 next(theValue)
方法。就是说 Subject既可以是被观察者/生产者也可以是观察者/消费者。
优点
- 减少开销和提高性能
- 数据实时推送
与 Observable 的区别
Observable | Subject | |
---|---|---|
角色 | 生产者(单向) | 生产者、消费者(双向) |
消费策略 | 单播 | 多播 |
流转方式 | 内部发送/接收数据 | 外部发送/接收数据 |
数据特性 | 冷数据流 | 热数据流 |
消费时机 | 调用 subscribe | 调用 next |
冷数据流:可以订阅任意时间的数据流。
热数据流:只给已订阅的消费者发送消息,定阅之前的消费者,不会收到消息。
其他 Subject
种类 | 作用 |
---|---|
BehaviorSubject | 回放数据,如果是订阅前推送的数据,只回放最新的值 |
ReplaySubject | 回放数据,初始化设定要缓存多少次的值,然后将这批消息推送 |
AsyncSubject | 只有调用 complete 后才会推送数据 |
八、Operator (操作符)
Operator 本质上是一个纯函数 (pure function),它接收一个 Observable 作为输入,并生成一个新的 Observable 作为输出,前面的 Observable 保持不变。
RxJS 6 及更新版本提供了可链式调用(Pipeable)的 RxJS 操作符,假设 source 是一个已定义的 observable,一个简单示例如下:
source.pipe(map(x => x + x),mergeMap(n => of(n + 1, n + 2).pipe(filter(x => x % 1 == 0),scan((acc, x) => acc + x, 0),)),catchError(err => of('error found')),
).subscribe(console.log);
生命周期:创建流(create、new、创建类操作符)——> 执行流(subscribe) ——> 销毁流(unsubscribe)
操作符分类:
九、Scheduler(调度器)
Scheduler(调度器)控制着何时启动 subscription 和何时发送通知。RxJS 有自己的基准时钟和一套的执行规则,来安排多个任务/数据该如何执行。
种类 | 描述 |
---|---|
null | 不传递或 null 或 undefined,表示同步执行 |
queue | 使用队列的方式执行 |
asap | 全称:as soon as possible ,表示尽快执行 |
async | 使用 setInterval 的调度。 |
使用 subscribeOn
来调度 subscribe()
调用在什么样的上下文中执行。 默认情况下,Observable 的 subscribe()
调用会立即同步地执行。然而,你可能会延迟或安排在给定的调度器上执行实际的 subscription ,使用实例操作符 subscribeOn(scheduler)
,其中 scheduler 是你提供的参数。我们来看一个同步变异步执行的例子:
var observable = Rx.Observable.create(function (observer) {observer.next(1);observer.next(2);observer.next(3);observer.complete();
})
.observeOn(Rx.Scheduler.async);console.log('just before subscribe');
observable.subscribe({next: x => console.log('got value ' + x),error: err => console.error('something wrong occurred: ' + err),complete: () => console.log('done'),
});
console.log('just after subscribe');//结果 输出
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
使用原则/策略
RxJS Scheduler 的原则是:尽量减少并发运行。
- 对于返回有限和少量消息的 observable 的操作符,RxJS 不使用调度器,即
null
或undefined
。 - 对于返回潜在大量的或无限数量的消息的操作符,使用
queue
调度器。 - 对于使用定时器的操作符,使用
aysnc
调度器。
支持调度器的操作符
of 、 from 、 timer 、 interval 、 concat 、 merge 、 combineLatest ,
bufferTime 、 debounceTime 、 delay 、 auditTime 、 sampleTime 、 throttleTime 、 timeInterval 、 timeout 、 timeoutWith 、 windowTime 这样时间相关的操作符全部接收调度器作为最后的参数,并且默认的操作是在 Scheduler.async 调度器上。