当前位置: 首页 > news >正文

RxJava -- fromArray 和 Just 以及 interval

RxJava 操作符 From Just Interval

为什么会有这个

RxJava框架现在出现已经有些年头了,如果有人问你你会不会用,可能大多数人都会说会。但是我被人问过一个我没有考虑过的问题,你知道Rxjava是怎么实现的吗?我。。。。。
所以就有了这一篇文章。


如果你想通过这篇文章学会Rxjava怎么用,这可能不会是一篇很好的文章,这里面有很多干扰你阅读的东西,和一些我现在还不懂的知识点。但是如果你想通过这篇文章找到我,然后对我说你这样理解不对,这将是一篇完美的文章,因为而且你还会得到一个陌生人的崇拜。

操作符 fromArray

String[] array = {"a","b","c"};
Observable.fromArray(array)
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                OutputUtil.printLn("绑定");
            }

            @Override
            public void onNext(String s) {
                OutputUtil.printLn(s);
            }

            @Override
            public void onError(Throwable e) {
                OutputUtil.printLn("错误");
            }

            @Override
            public void onComplete() {
                OutputUtil.printLn("完成");
            }
        });
复制代码

运行结果

绑定
a
b
c
完成
复制代码

在开始之前我们先定义几个名字

  • Observable (可被观察者)
  • Emitter (发射器)
  • Observer (观察者)

我们在上一篇文章中提到了 如果自己创建 可被观察者 同时通过自己调用 发射器 来进行数据操作的情况,同时RxJava框架为我们准备了多个定义好的操作符。这里我们来讨论一下 fromArray 操作符的内部实现

通过 fromArray 进入到 源码部分

public static <T> Observable<T> fromArray(T... items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        } else
        if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
复制代码

还记得之前 如果是通过 create 方法创建返回的是 RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));,这里还是在创建一个 可被观察者,同时将需要发射的数据传入

public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }
复制代码

基本逻辑与create方法相通,不同的是如果是通过 create 方法,我们这里传入的是一个 发射器 的实现,因为我们需要自己来操控 发射器 .

回到我们的测试代码,进入下一步操作是 subscribe(new Observer<String>() {....} 操作,这里传入的观察者的内容,进入这个方法时 我们看到了似曾相识的代码

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            // hook 方法
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            // 这里要开始绑定了
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
复制代码

可被观察者 和 观察者 在绑定的时候,使用的是同样的方法,关键的方法还是绑定的过程 subscribeActual(observer); 因为我们创建的 可被观察者 是 ObservableFromArray 它继承自 Observable 所以在绑定的时候,执行的代码是 ObservableFromArray 内部的方法

final T[] array;
@Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

        // 将实现的 绑定传回,这样就能在 观察者 中控制发射
        s.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }
        // 开始发射
        d.run();
    }
复制代码

这个 可被观察者 在初始化的时候已经传入了,我们添加的 array .
其中创建的

static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
        // 传入 观察者 这样就能向 观察者 传递数据
        final Observer<? super T> actual;
        // 数据集合
        final T[] array;

        int index;

        boolean fusionMode;

        volatile boolean disposed;

        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }

        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                fusionMode = true;
                return SYNC;
            }
            return NONE;
        }

        @Nullable
        @Override
        public T poll() {
            int i = index;
            T[] a = array;
            if (i != a.length) {
                index = i + 1;
                return ObjectHelper.requireNonNull(a[i], "The array element is null");
            }
            return null;
        }

        @Override
        public boolean isEmpty() {
            return index == array.length;
        }

        @Override
        public void clear() {
            index = array.length;
        }

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void run() {
            T[] a = array;
            int n = a.length;
            // run 代码 是循环从 array 中提取数据,当结束时调用结束
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                actual.onNext(value);
            }
            if (!isDisposed()) {
                actual.onComplete();
            }
        }
    }
复制代码

其实到这里,基本上已经完成了.但是你可能会发现这里面并没有对 onComplete() 和 onError() 方法进行再处理,也没有很复杂的 disposed 操作. 其实如果你我们如果是这样实现简单逻辑的话,到这里已经是结束了,代码已经运行完成.如果解除绑定,发射器还是会停止发射数据.
但如果你试一下的话,在 onComplete() 之后在调用 onNext() 观察者还是会接收到数据的.

操作符 just

我们在使用 fromArray 操作符的时候,会发现如果我们的 array 长度为 1 ,就会直接转到 just

if (items.length == 1) {
     return just(items[0]);
}
复制代码

进入到 just 方法

public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        // 创建 just 可被观察者
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
java

```java
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    // 等待发送的数据
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}
复制代码

按照之前的阅读,我们下一步查看的内容就应该是 subscribeActual() 方法,这里创建的是 ScalarDisposable

public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable {

        private static final long serialVersionUID = 3880992722410194083L;

        final Observer<? super T> observer;

        final T value;

        static final int START = 0;
        static final int FUSED = 1;
        static final int ON_NEXT = 2;
        static final int ON_COMPLETE = 3;

        public ScalarDisposable(Observer<? super T> observer, T value) {
            this.observer = observer;
            this.value = value;
        }

        @Override
        public boolean offer(T value) {
            throw new UnsupportedOperationException("Should not be called!");
        }

        @Override
        public boolean offer(T v1, T v2) {
            throw new UnsupportedOperationException("Should not be called!");
        }

        @Nullable
        @Override
        public T poll() throws Exception {
            if (get() == FUSED) {
                lazySet(ON_COMPLETE);
                return value;
            }
            return null;
        }

        @Override
        public boolean isEmpty() {
            return get() != FUSED;
        }

        @Override
        public void clear() {
            lazySet(ON_COMPLETE);
        }

        @Override
        public void dispose() {
            set(ON_COMPLETE);
        }

        @Override
        public boolean isDisposed() {
            return get() == ON_COMPLETE;
        }

        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                lazySet(FUSED);
                return SYNC;
            }
            return NONE;
        }

        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }
    }
复制代码

这里执行的 run() 方法 是想 观察者 发射内容之后
如果简单的去理解,这里的内容还是可以理解的,但是如果我们在分析一下 run()方法内部

static final int START = 0;
static final int FUSED = 1;
static final int ON_NEXT = 2;
static final int ON_COMPLETE = 3;

public void run() {
    // 初始状态下 AtomicInteger 类下 get() 拿到的 value 为 0,然后将其切换为 2
    // 这里的 compareAndSet() 方法 牵扯到了线程安全,通过原子操作将值改变
    if (get() == START && compareAndSet(START, ON_NEXT)) {
        observer.onNext(value);
        if (get() == ON_NEXT) {
            // 这里的操作能够实现非堵塞的写入
            lazySet(ON_COMPLETE);
            observer.onComplete();
        }
    }
}
复制代码

总结

  1. fromArray 和 just 两个 操作符内部是相通的,根据传入的参数的数量不同可以进行相互的转化
  2. 我们之后会提到 RxJava 最厉害的线程之间切换的操作,通过前面的这些,就能够看出 RxJava 为线程的切换做了很多的功过.为了保护线程安全,真是煞费苦心.

##操作符 interval

System.out.println(Thread.currentThread().getId());
Observable.interval(1,1, TimeUnit.SECONDS)
        .take(5)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println(Thread.currentThread().getId());
                OutputUtil.printLn("绑定");
            }

            @Override
            public void onNext(Long s) {
                System.out.println(Thread.currentThread().getId());
                OutputUtil.printLn(System.currentTimeMillis()+"");
                OutputUtil.printLn(s+"");
            }

            @Override
            public void onError(Throwable e) {
                OutputUtil.printLn("错误");
            }

            @Override
            public void onComplete() {
                System.out.println(Thread.currentThread().getId());
                OutputUtil.printLn("完成");
            }
        });
复制代码

这个操作符 实现的是类似于计时器的功能,发射器等待一段相应的时间然后再向观察者发送数据。这里有牵扯到了RxJava的线程相关内容,请谨慎的往下看,我可能会有错
结果:

I/System.out: 1
I/System.out: 1
D/内容: 绑定
I/System.out: 5585
D/内容: 1523349727345
      0
I/System.out: 5585
D/内容: 1523349728345
      1
I/System.out: 5585
D/内容: 1523349729345
      2
I/System.out: 5585
D/内容: 1523349730345
      3
I/System.out: 5585
D/内容: 1523349731344
      4
I/System.out: 5585
D/内容: 完成    
复制代码

System.out 输出的是当前程序运行所在的线程,
内容标识的是输出的时间和输出的数值.

从这里我们可以看到,默认条件下可被观察者 的绑定是在我们运行的线程上面,但是等到观察者接收到数据的时候,线程就已经切换了.我们可以认为在绑定的时候,这里创建了新的线程来运行.而且程序的运行是没有阻塞程序运行线程的.

interval()为入口,我们进入到可被观察者的创建过程

public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) {
    return interval(initialDelay, period, unit, Schedulers.computation());
}

public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
    ObjectHelper.requireNonNull(unit, "unit is null");
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}
复制代码

这里我们把 Schedulers 称为 调度器
它的主要任务是实现线程调度,可以等待,可以切换线程,可以指定线程.我们在不传入调度器的情况下,会给一个默认的调度器.
将数据传入之后就产生了一个 可被观察者 的对象,如果是普通条件下我们下一步就要开始订阅了.但是我们现在这个情况如果不做任何处理的话,消息会一直发送,所以我们还是会添加一个 take() 方法,来控制这个输出的上限.不过我们先详细讨论.

到了订阅部分之后,执行的是 ObservableInterval 里的 订阅方法

@Override
    public void subscribeActual(Observer<? super Long> s) {
        // 创建特殊的观察者
        IntervalObserver is = new IntervalObserver(s);
        s.onSubscribe(is);

        Scheduler sch = scheduler;

        if (sch instanceof TrampolineScheduler) {
            // 如果是 TrampolineScheduler 则会在订阅线程上面执行 观察者 代码
            Worker worker = sch.createWorker();
            is.setResource(worker);
            worker.schedulePeriodically(is, initialDelay, period, unit);
        } else {
            // 在指定的 线程 上面执行观察者代码
            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
            is.setResource(d);
        }
    }
复制代码

我们先不讨论调度器为 TrampolineScheduler 的情况 相较于线程调度,这个要简单的多.

下面我们讨论 Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit); 这句话, sch 表示的是 我们添加的 调度器 ,因为我们前面没有设置 调度器,所以 默认为 Schedulers.computation() 所以 这里调用的 就会是 ComputationScheduler 中的 schedulePeriodicallyDirect() 方法!!! 并不是 Scheduler 里面的 别问我怎么知道的.

public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
        PoolWorker w = pool.get().getEventLoop();
        return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
    }
复制代码

然后 poolWorker 内部逻辑极其复杂,之后会在讨论.

进入到 w.schedulePeriodicallyDirect(run, initialDelay, period, unit); 方法内部

public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
        // hook 相关
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        if (period <= 0L) {

            // 间隔 为 0 的情况不讨论
            
        }
        // 这个 decorateRun 还记得吗? 这个是我们 IntervalObserver 啊,他调用了 观察者中的 onNext()
        // 然后这里来调用它, 社会社会
        ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
        try {
            // 通过 ExecutorService 来控制 run() 执行的次数和间隔时间
            Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
            // 设定未来
            task.setFuture(f);
            return task;
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }
复制代码
public final class ScheduledDirectPeriodicTask extends AbstractDirectTask implements Runnable {

    private static final long serialVersionUID = 1811839108042568751L;

    public ScheduledDirectPeriodicTask(Runnable runnable) {
        super(runnable);
    }

    @Override
    public void run() {
        // 线程还进行了切换,切换到了当前线程,这个当前线程可不是什么 订阅线程 而是 调度器的线程
        runner = Thread.currentThread();
        try {
            // 看! 是这里
            runnable.run();
            runner = null;
        } catch (Throwable ex) {
            runner = null;
            lazySet(FINISHED);
            RxJavaPlugins.onError(ex);
        }
    }
}
复制代码

总结

这一段内容比想象中还要复杂,相信你看完之后肯定是一脸懵逼。也没有办法,我是一边看一边写,有很多东西没有一个全局的观念,同时水平有限,不能够体会到作者其中的用意。如果你有什么问题,或者我写的有什么错误,请及时的告诉我。

  1. 在 interval 中 会给一个默认的 Scheduler 调度器,通过调度器来表示这个 观察者的内容应该在哪个线程上面被处理.
  2. 使用 Work 来标识任务,使用 ExecutorService 来进行任务的计时 和 重复操作
  3. 代码很复杂,功能很强大.会用就好,以后有机会了再细致研究

转载于:https://juejin.im/post/5accd317f265da2375073b33

相关文章:

  • ifup em2启动网卡时报错:connection activation failed
  • BOOST.ASIO源码剖析(一)
  • iis web.config 配置示例
  • 不要仅为85%的用户设计:关注无障碍设计
  • 猫头鹰的深夜翻译:Java 2D Graphics, 简单的仿射变换
  • Oracle安装时,已有oracle用户,将用户添加到oinstall和dba用户组
  • freenom域名解析与次级域名
  • 面试题:给你个id,去拿到name,多叉树遍历
  • 前端CORS请求梳理
  • osquery简单试用
  • 关于Java中分层中遇到的一些问题
  • 156:Ananagrams
  • 区块链技术
  • 浅度理解NodeJS的HTTP模块
  • Git的本地仓库与GitHub的远程仓库
  • Android组件 - 收藏集 - 掘金
  • JS函数式编程 数组部分风格 ES6版
  • leetcode-27. Remove Element
  • Mocha测试初探
  • tweak 支持第三方库
  • Vim 折腾记
  • vue2.0项目引入element-ui
  • 发布国内首个无服务器容器服务,运维效率从未如此高效
  • 关键词挖掘技术哪家强(一)基于node.js技术开发一个关键字查询工具
  • 通过npm或yarn自动生成vue组件
  • ### Error querying database. Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
  • #{} 和 ${}区别
  • (23)Linux的软硬连接
  • (poj1.3.2)1791(构造法模拟)
  • (react踩过的坑)antd 如何同时获取一个select 的value和 label值
  • (ros//EnvironmentVariables)ros环境变量
  • (附源码)springboot车辆管理系统 毕业设计 031034
  • (免费分享)基于springboot,vue疗养中心管理系统
  • (四)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (学习日记)2024.02.29:UCOSIII第二节
  • (转)德国人的记事本
  • .java 指数平滑_转载:二次指数平滑法求预测值的Java代码
  • .NET 4.0网络开发入门之旅-- 我在“网” 中央(下)
  • .NET Core 中的路径问题
  • .net Signalr 使用笔记
  • .Net调用Java编写的WebServices返回值为Null的解决方法(SoapUI工具测试有返回值)
  • .net下简单快捷的数值高低位切换
  • @ConfigurationProperties注解对数据的自动封装
  • @FeignClient注解,fallback和fallbackFactory
  • @ModelAttribute使用详解
  • @SpringBootApplication 包含的三个注解及其含义
  • [acwing周赛复盘] 第 94 场周赛20230311
  • [autojs]autojs开关按钮的简单使用
  • [BJDCTF 2020]easy_md5
  • [BUG] Hadoop-3.3.4集群yarn管理页面子队列不显示任务
  • [BZOJ] 3262: 陌上花开
  • [C puzzle book] types
  • [ERROR] Plugin 'InnoDB' init function returned error
  • [HTML]Web前端开发技术29(HTML5、CSS3、JavaScript )JavaScript基础——喵喵画网页
  • [leetcode] Balanced Binary Tree