当前位置: 首页 > news >正文

RxJava操作符之Share, Publish, Refcount

看源码知道.share()操作符是.publish().refcount()调用链的包装。

先来看ConnectedObservable

“ConnectedObservable” – This is a kind of observable which doesn’t emit items even if subscribed to.
It only starts emitting items after its .connect() method is called.

因为这个原因,在ConnectedObservable的connect这个方法被调用之前,connected obesrvable也被认为是“冷”和不活跃。

再看publish方法

.publish()– This method allows us to change an ordinary observable into a “ConnectedObservable”.
Simply call this method on an ordinary observable and it becomes a connected one.

现在我们知道了share操作符的1/2,那么为什么需要运用Connected Observable这个操作符呢?文档上是这么写的:

In this way you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.

这就意味着publish可以调用多个subscriber。当你有超过一个订阅者的时候,处理每个订阅和正确的销毁他们变得棘手。 为了使这个更方便,Rx发明了这个魔法操作符refcount():

refcount() – This operator keeps track of how many subscribers are subscribed to the resulting Observable and
refrains from disconnecting from the source ConnectedObservable until all such Observables are unsubscribed.

refcount本质上在后台维护着一个引用计数器,当一个subscription需要取消订阅或者销毁的时候,发出一个正确的动作。

我们再次看一下debouncedBuffer的例子,看一下在哪,share是怎么用的。

Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
// which is really the same as:
Observable<Object> tapEventEmitter = _rxBus.toObserverable().publish().refcount();

我们现在有了一个"shareable"的名字叫"tapEventEmitter"的observable。 因为他是可以分享的,而且还不是“live”(share操作符中的publish调用使其变成一个ConnectedObservable), 我们可以用他构成我们的Observables,而且要确保我们有了一个原始的observable的引用 (这个例子中原始的observable是_rxBus.toObserverable()).

Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
Observable<Object> debouncedEventEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);
tapEventEmitter.buffer(debouncedEventEmitter)
//...

所有的这一切看起来都很好。然而这个实现会有一个可能的竞争条件。因为这有两个subscribers(debounce and buffer)而且会在不同的时间点发生,所以竞争条件就会发生。 记住RxBus是由hot/live Subject支持的不断的发射数据。通过使用share操作符,我们保证引用的是同一个资源。 而不是subscribers在不同的时间点订阅,他们会收到准确的相同的数据。

The race condition is when the two consumers subscribe. Often on a hot stream it doesn’t matter when subscribers come and go,and refCount is perfect for that.
The race condition refCount protects against is having only 1 active subscription upstream. However,if 2 Subscribers subscribe to a refcounted stream that emits 1, 2, 3, 4, 5, the first may get 1, 2, 3, 4, 5 and the second may get 2, 3, 4, 5.

To ensure all subscribers start at exactly the same time and get the exact same values, refCount can not be used.
Either ConnectableObservable with a manual, imperative invocation of connect needs to be done, or the variant of publish(function)which connects everything within the function before connecting the upstream.

在我们的用法中几乎立即执行所以没有什么关系。但是我们原始的意图是把debouncedBuffer方法作为一个单独的操作符。 如果相同的事件没有被发射出去,从概念上看起来是不正确的。

通过Bean后来的建议,我添加了一个更好的第三方的实现,用来处理这种竞争条件。

复制代码
// don't start emitting items just yet by turning the observable to a connected one
ConnectableObservable<Object> tapEventEmitter = _rxBus.toObserverable().publish();

tapEventEmitter.publish((Func1) (stream) -> {

// inside `publish`, "stream" is truly multicasted // applying the same technique for getting a debounced buffer sequence return stream.buffer(stream.debounce(1, TimeUnit.SECONDS)); }).subscribe((Action1) (taps) { _showTapCount(taps.size()); }); // start listening to events now tapEventEmitter.connect();
复制代码

相关文章:

  • node.js学习之流解析(一)
  • SpringBoot之@EnableAutoConfiguration原理及自定义扩展
  • CentOS 6.5 安全加固
  • python之路——常用模块
  • 排序算法之选择排序
  • PostgreSQL入门及提权
  • 面向对象1
  • Lambda表达式(Java)
  • 区块链将会怎样颠覆Google、Amazon、Facebook和Apple?
  • ECMAScript 6 学习之路 ( 四 ) String 字符串扩展
  • Windows Server 2012的服务管理自动化 -启动类型设置,手动启动还是自动启动
  • JVM 组成以及各部分作用
  • PHP 500报错的快速解决方法
  • windows网络模型之完成端口(CompletionPort)详解 (转)
  • [转]区块链代码快速学习实践
  • __proto__ 和 prototype的关系
  • 【vuex入门系列02】mutation接收单个参数和多个参数
  • angular2 简述
  • javascript数组去重/查找/插入/删除
  • Terraform入门 - 1. 安装Terraform
  • VuePress 静态网站生成
  • 反思总结然后整装待发
  • 飞驰在Mesos的涡轮引擎上
  • 浅谈web中前端模板引擎的使用
  • 嵌入式文件系统
  • 少走弯路,给Java 1~5 年程序员的建议
  • - 转 Ext2.0 form使用实例
  • Oracle Portal 11g Diagnostics using Remote Diagnostic Agent (RDA) [ID 1059805.
  • 06-01 点餐小程序前台界面搭建
  • 新海诚画集[秒速5センチメートル:樱花抄·春]
  • $().each和$.each的区别
  • (day6) 319. 灯泡开关
  • (function(){})()的分步解析
  • (MIT博士)林达华老师-概率模型与计算机视觉”
  • (附源码)springboot码头作业管理系统 毕业设计 341654
  • (附源码)基于SpringBoot和Vue的厨到家服务平台的设计与实现 毕业设计 063133
  • (十)c52学习之旅-定时器实验
  • (原創) 如何動態建立二維陣列(多維陣列)? (.NET) (C#)
  • (转) 深度模型优化性能 调参
  • ***测试-HTTP方法
  • *p++,*(p++),*++p,(*p)++区别?
  • .net core Swagger 过滤部分Api
  • .net core 实现redis分片_基于 Redis 的分布式任务调度框架 earth-frost
  • .NET delegate 委托 、 Event 事件
  • .Net Winform开发笔记(一)
  • .net程序集学习心得
  • .Net调用Java编写的WebServices返回值为Null的解决方法(SoapUI工具测试有返回值)
  • .skip() 和 .only() 的使用
  • /bin/rm: 参数列表过长"的解决办法
  • @JSONField或@JsonProperty注解使用
  • [100天算法】-每个元音包含偶数次的最长子字符串(day 53)
  • [AIGC] Nacos:一个简单 yet powerful 的配置中心和服务注册中心
  • [BZOJ] 1001: [BeiJing2006]狼抓兔子
  • [CISCN2021 Quals]upload(PNG-IDAT块嵌入马)
  • [G-CS-MR.PS02] 機巧之形2: Ruler Circle