RxJava 操作符 From Just Interval
为什么会有这个
RxJava框架现在出现已经有些年头了,如果有人问你你会不会用,可能大多数人都会说会。但是我被人问过一个我没有考虑过的问题,你知道Rxjava是怎么实现的吗?我。。。。。
所以就有了这一篇文章。
如果你想通过这篇文章学会Rxjava怎么用,这可能不会是一篇很好的文章,这里面有很多干扰你阅读的东西,和一些我现在还不懂的知识点。但是如果你想通过这篇文章找到我,然后对我说你这样理解不对,这将是一篇完美的文章,因为而且你还会得到一个陌生人的崇拜。
操作符 fromArray
String[] array = {"a","b","c"};
Observable.fromArray(array)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
OutputUtil.printLn("绑定");
}
@Override
public void onNext(String s) {
OutputUtil.printLn(s);
}
@Override
public void onError(Throwable e) {
OutputUtil.printLn("错误");
}
@Override
public void onComplete() {
OutputUtil.printLn("完成");
}
});
复制代码
运行结果
绑定
a
b
c
完成
复制代码
在开始之前我们先定义几个名字
- Observable (可被观察者)
- Emitter (发射器)
- Observer (观察者)
我们在上一篇文章中提到了 如果自己创建 可被观察者 同时通过自己调用 发射器 来进行数据操作的情况,同时RxJava框架为我们准备了多个定义好的操作符。这里我们来讨论一下 fromArray 操作符的内部实现
通过 fromArray 进入到 源码部分
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
复制代码
还记得之前 如果是通过 create 方法创建返回的是 RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
,这里还是在创建一个 可被观察者,同时将需要发射的数据传入
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
复制代码
基本逻辑与create方法相通,不同的是如果是通过 create 方法,我们这里传入的是一个 发射器 的实现,因为我们需要自己来操控 发射器 .
回到我们的测试代码,进入下一步操作是 subscribe(new Observer<String>() {....}
操作,这里传入的观察者的内容,进入这个方法时 我们看到了似曾相识的代码
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// hook 方法
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 这里要开始绑定了
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
复制代码
可被观察者 和 观察者 在绑定的时候,使用的是同样的方法,关键的方法还是绑定的过程 subscribeActual(observer);
因为我们创建的 可被观察者 是 ObservableFromArray
它继承自 Observable 所以在绑定的时候,执行的代码是 ObservableFromArray
内部的方法
final T[] array;
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
// 将实现的 绑定传回,这样就能在 观察者 中控制发射
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
// 开始发射
d.run();
}
复制代码
这个 可被观察者 在初始化的时候已经传入了,我们添加的 array .
其中创建的
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
// 传入 观察者 这样就能向 观察者 传递数据
final Observer<? super T> actual;
// 数据集合
final T[] array;
int index;
boolean fusionMode;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
@Override
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}
@Nullable
@Override
public T poll() {
int i = index;
T[] a = array;
if (i != a.length) {
index = i + 1;
return ObjectHelper.requireNonNull(a[i], "The array element is null");
}
return null;
}
@Override
public boolean isEmpty() {
return index == array.length;
}
@Override
public void clear() {
index = array.length;
}
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
void run() {
T[] a = array;
int n = a.length;
// run 代码 是循环从 array 中提取数据,当结束时调用结束
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
}
复制代码
其实到这里,基本上已经完成了.但是你可能会发现这里面并没有对 onComplete() 和 onError() 方法进行再处理,也没有很复杂的 disposed 操作. 其实如果你我们如果是这样实现简单逻辑的话,到这里已经是结束了,代码已经运行完成.如果解除绑定,发射器还是会停止发射数据.
但如果你试一下的话,在 onComplete() 之后在调用 onNext() 观察者还是会接收到数据的.
操作符 just
我们在使用 fromArray 操作符的时候,会发现如果我们的 array 长度为 1 ,就会直接转到 just
if (items.length == 1) {
return just(items[0]);
}
复制代码
进入到 just 方法
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
// 创建 just 可被观察者
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
java
```java
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
// 等待发送的数据
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
复制代码
按照之前的阅读,我们下一步查看的内容就应该是 subscribeActual() 方法,这里创建的是 ScalarDisposable
public static final class ScalarDisposable<T>
extends AtomicInteger
implements QueueDisposable<T>, Runnable {
private static final long serialVersionUID = 3880992722410194083L;
final Observer<? super T> observer;
final T value;
static final int START = 0;
static final int FUSED = 1;
static final int ON_NEXT = 2;
static final int ON_COMPLETE = 3;
public ScalarDisposable(Observer<? super T> observer, T value) {
this.observer = observer;
this.value = value;
}
@Override
public boolean offer(T value) {
throw new UnsupportedOperationException("Should not be called!");
}
@Override
public boolean offer(T v1, T v2) {
throw new UnsupportedOperationException("Should not be called!");
}
@Nullable
@Override
public T poll() throws Exception {
if (get() == FUSED) {
lazySet(ON_COMPLETE);
return value;
}
return null;
}
@Override
public boolean isEmpty() {
return get() != FUSED;
}
@Override
public void clear() {
lazySet(ON_COMPLETE);
}
@Override
public void dispose() {
set(ON_COMPLETE);
}
@Override
public boolean isDisposed() {
return get() == ON_COMPLETE;
}
@Override
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
lazySet(FUSED);
return SYNC;
}
return NONE;
}
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}
复制代码
这里执行的 run() 方法 是想 观察者 发射内容之后
如果简单的去理解,这里的内容还是可以理解的,但是如果我们在分析一下 run()方法内部
static final int START = 0;
static final int FUSED = 1;
static final int ON_NEXT = 2;
static final int ON_COMPLETE = 3;
public void run() {
// 初始状态下 AtomicInteger 类下 get() 拿到的 value 为 0,然后将其切换为 2
// 这里的 compareAndSet() 方法 牵扯到了线程安全,通过原子操作将值改变
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
// 这里的操作能够实现非堵塞的写入
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
复制代码
总结
- fromArray 和 just 两个 操作符内部是相通的,根据传入的参数的数量不同可以进行相互的转化
- 我们之后会提到 RxJava 最厉害的线程之间切换的操作,通过前面的这些,就能够看出 RxJava 为线程的切换做了很多的功过.为了保护线程安全,真是煞费苦心.
##操作符 interval
System.out.println(Thread.currentThread().getId());
Observable.interval(1,1, TimeUnit.SECONDS)
.take(5)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(Thread.currentThread().getId());
OutputUtil.printLn("绑定");
}
@Override
public void onNext(Long s) {
System.out.println(Thread.currentThread().getId());
OutputUtil.printLn(System.currentTimeMillis()+"");
OutputUtil.printLn(s+"");
}
@Override
public void onError(Throwable e) {
OutputUtil.printLn("错误");
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getId());
OutputUtil.printLn("完成");
}
});
复制代码
这个操作符 实现的是类似于计时器的功能,发射器等待一段相应的时间然后再向观察者发送数据。这里有牵扯到了RxJava的线程相关内容,请谨慎的往下看,我可能会有错
结果:
I/System.out: 1
I/System.out: 1
D/内容: 绑定
I/System.out: 5585
D/内容: 1523349727345
0
I/System.out: 5585
D/内容: 1523349728345
1
I/System.out: 5585
D/内容: 1523349729345
2
I/System.out: 5585
D/内容: 1523349730345
3
I/System.out: 5585
D/内容: 1523349731344
4
I/System.out: 5585
D/内容: 完成
复制代码
System.out 输出的是当前程序运行所在的线程,
内容标识的是输出的时间和输出的数值.
从这里我们可以看到,默认条件下可被观察者
的绑定是在我们运行的线程上面,但是等到观察者
接收到数据的时候,线程就已经切换了.我们可以认为在绑定的时候,这里创建了新的线程来运行.而且程序的运行是没有阻塞程序运行线程的.
以interval()
为入口,我们进入到可被观察者的创建过程
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) {
return interval(initialDelay, period, unit, Schedulers.computation());
}
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}
复制代码
这里我们把 Schedulers 称为 调度器
它的主要任务是实现线程调度,可以等待,可以切换线程,可以指定线程.我们在不传入调度器的情况下,会给一个默认的调度器.
将数据传入之后就产生了一个 可被观察者 的对象,如果是普通条件下我们下一步就要开始订阅了.但是我们现在这个情况如果不做任何处理的话,消息会一直发送,所以我们还是会添加一个 take() 方法,来控制这个输出的上限.不过我们先详细讨论.
到了订阅部分之后,执行的是 ObservableInterval 里的 订阅方法
@Override
public void subscribeActual(Observer<? super Long> s) {
// 创建特殊的观察者
IntervalObserver is = new IntervalObserver(s);
s.onSubscribe(is);
Scheduler sch = scheduler;
if (sch instanceof TrampolineScheduler) {
// 如果是 TrampolineScheduler 则会在订阅线程上面执行 观察者 代码
Worker worker = sch.createWorker();
is.setResource(worker);
worker.schedulePeriodically(is, initialDelay, period, unit);
} else {
// 在指定的 线程 上面执行观察者代码
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
}
}
复制代码
我们先不讨论调度器为 TrampolineScheduler 的情况 相较于线程调度,这个要简单的多.
下面我们讨论 Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
这句话, sch
表示的是 我们添加的 调度器 ,因为我们前面没有设置 调度器,所以 默认为 Schedulers.computation()
所以 这里调用的 就会是 ComputationScheduler
中的 schedulePeriodicallyDirect() 方法!!! 并不是 Scheduler 里面的 别问我怎么知道的.
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
PoolWorker w = pool.get().getEventLoop();
return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
}
复制代码
然后 poolWorker 内部逻辑极其复杂,之后会在讨论.
进入到 w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
方法内部
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
// hook 相关
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (period <= 0L) {
// 间隔 为 0 的情况不讨论
}
// 这个 decorateRun 还记得吗? 这个是我们 IntervalObserver 啊,他调用了 观察者中的 onNext()
// 然后这里来调用它, 社会社会
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
try {
// 通过 ExecutorService 来控制 run() 执行的次数和间隔时间
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
// 设定未来
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
复制代码
public final class ScheduledDirectPeriodicTask extends AbstractDirectTask implements Runnable {
private static final long serialVersionUID = 1811839108042568751L;
public ScheduledDirectPeriodicTask(Runnable runnable) {
super(runnable);
}
@Override
public void run() {
// 线程还进行了切换,切换到了当前线程,这个当前线程可不是什么 订阅线程 而是 调度器的线程
runner = Thread.currentThread();
try {
// 看! 是这里
runnable.run();
runner = null;
} catch (Throwable ex) {
runner = null;
lazySet(FINISHED);
RxJavaPlugins.onError(ex);
}
}
}
复制代码
总结
这一段内容比想象中还要复杂,相信你看完之后肯定是一脸懵逼。也没有办法,我是一边看一边写,有很多东西没有一个全局的观念,同时水平有限,不能够体会到作者其中的用意。如果你有什么问题,或者我写的有什么错误,请及时的告诉我。
- 在 interval 中 会给一个默认的 Scheduler 调度器,通过调度器来表示这个 观察者的内容应该在哪个线程上面被处理.
- 使用 Work 来标识任务,使用 ExecutorService 来进行任务的计时 和 重复操作
- 代码很复杂,功能很强大.会用就好,以后有机会了再细致研究