24

两张图彻底理解 RxJava2 的核心原理

 4 years ago
source link: http://solart.cc/2020/06/16/understand_rxjava2/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

作者: solart

版权声明:本文图文为博主原创,转载请注明出处。

文章似乎有些标题党的嫌疑,根据我的理解画出两幅图相信可以让大家理解 RxJava2 的核心原理,稍后不要吝啬,请叫我灵魂画手:smile:!相信 RxJava 是大家业务中用到比较多的一个依赖库,RxJava 的强大之处在于它改变了程序员的编程习惯,相比较其他的开源项目,Rxjava 是最弯弯绕的一个,但是也是理解后最清晰的一个。对于 RxJava 种类繁多的操作符,大多数同学都表示很是头疼,也有不少同学陷入了学习操作符不能停的怪圈。

操作符要不要学,当然要,但是如果能理解 RxJava 的核心,操作符的使用就像是学会九阳神功的张无忌学招数,必定是手到擒来。

这篇文章我会讲些什么

  • RxJava2 基本的运行流程
  • RxJava2 线程切换的原理(涉及到为什么 subscribeOn() 只有第一次调用时有效)
  • 为什么一订阅就回调了 onSubscribe
  • 为什么 subscribeOn() 对上面的代码生效,observerOn() 对下面代码生效

以下内容如果涉及到自己写的代码我会采用 Kotlin 进行示例展示,涉及到 RxJava2 会展示部分源码。

1、简单的链式调用(无线程切换)

先来看一段示例代码:

Observable.create(object : ObservableOnSubscribe<String> {
            override fun subscribe(emitter: ObservableEmitter<String>) {
                Log.d("solart", "subscribe > ${Thread.currentThread().name}")
                emitter.onNext("test")
                emitter.onComplete()
            }
        }).flatMap(object : Function<String, Observable<String>> {
            override fun apply(t: String): Observable<String> {
                return Observable.just(t)
            }
        }).map(object : Function<String, Int> {
            override fun apply(t: String): Int {
                return 0
            }
        }).subscribe(object : Observer<Int> {
                override fun onSubscribe(d: Disposable) {
                    Log.d("solart", "onSubscribe >  ${Thread.currentThread().name}")
                }

                override fun onNext(t: Int) {
                    Log.d("solart", "onNext >  ${Thread.currentThread().name}")
                }
                
                override fun onComplete() {
                    Log.d("solart", "onComplete >  ${Thread.currentThread().name}")
                }

                override fun onError(e: Throwable) {
                    Log.d("solart", "onError >  ${Thread.currentThread().name}")
                }
        })

这段代码中我们简单用了 createflatMapmap 等操作符,进行了流式的数据转换,最后我们通过 subscribe 订阅了数据流,其实通过查看源码我们不难发现, RxJava 本身是个逆向订阅的过程,话不多说先看图

qyYJnuz.png!web

1.1 数据源的包裹

比照着这张图,我们来看一下,首先 蓝色虚线 部分是我们代码中实际调用的顺序,查看 Observable.create 我们不难发现,此处就是产生了一个 ObservableCreate 实例,

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
       ObjectHelper.requireNonNull(source, "source is null");
       return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

如我们图中所示, ObservableCreate 内部包含一个类型为 ObservableOnSubscribe<T>source 变量,根据我们代码中的调用,这个 source 就是我们 Kotlin 代码中的匿名对象 object : ObservableOnSubscribe<String>

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ...
    }
    ...
}

我们顺着代码的调用顺序,继续看一下 flatMap 的方法中又做了什么:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        ...
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

类似的产生了一个 ObservableFlatMap 实例,而其内部持有一个类型为 ObservableSource<T>source 变量,而该 source 则是上一步中的 ObservableCreate 实例,依次我们看 map 依然是类似的代码,这里不在赘述,所以到此我们得到了图中蓝色虚线部分的内容。

1.2 逆向订阅数据源

我们知道以上的代码调用并没有出发数据的流转,只有当我们调用 subscribe 时(图中红色实线部分)才真正触发了 RxJava 的数据流,我们来看代码:

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            // 重点!!! 发生订阅的核心方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            ...
            throw npe;
        }
    }

根据我们上面的分析,执行 subscribeActual 的对象其实是 ObservableMap ,我们来看它的 subscribeActual 的实现

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    ...

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    ...
}

注意,此时产生了一个 MapObserver 对象, MapObserver 中通过 actual 持有了我们自己的匿名对象 object : Observer<Int> ,同样的, ObservableMap 执行 subscribeActual 又调用了上层的 source.subscribe ,依次逆向调用,就得到了我们图中上半部分的红线内容。

1.3 触发数据源产生原始数据,数据流转

当订阅发生在最顶层时,也就是 ObservableCreate 中的 subscribeActual ,此时触发了数据源的产生,通过 emitter 发射数据

public final class ObservableCreate<T> extends Observable<T> {
    ...
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent); //此时触发了 onSubscribe 回调,这里先提一下

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ...
}

而我们代码中此时产生了真正的数据

override fun subscribe(emitter: ObservableEmitter<String>) {
                Log.d("solart", "subscribe > ${Thread.currentThread().name}")
                emitter.onNext("test")
                emitter.onComplete()
}

此时我们再来看 CreateEmitter 的实现:

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            ...
            if (!isDisposed()) {
                observer.onNext(t);  //向下层分发数据
            }
        }
        ...
  }

根据我们上面的分析 CreateEmitter 中持有的 observer 即是 FlatMapObserver 的实例,而 FlatMapObserver 调用 onNext 时,又会调用 MapObserver 的 onNext ,依次调用至我们自己实现的观察者的 onNext 处理数据,此时数据流转完毕。

观察我们这个图,你会发现, 操作符 对应产生的被观察者和观察者命名规则很有规律,比如说被观察者的命名 Observable + 操作符 ,例如 ObservableMap = Observable + map,观察者命名大多遵循 操作符 + Observer ,例如 FlatMapObserver = flatMap + Observer。除了命名规则外,我们观察整个流程,你也会发现有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹(蓝色虚线的流程部分),另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹(上部分的红色流程部分)。

2、异步事件流编程(线程切换)

相信有了上面的分析,大家对 RxJava 的逆向订阅以及数据流转有了一定的认识,但是 RxJava 的强大之处在于它的异步事件流编程方式,随心所欲的切换工作线程,下面我们来分析它是如何做到的。

同样的我们还是先给出一个简单的示例:

Observable.create(object : ObservableOnSubscribe<String> {
                override fun subscribe(emitter: ObservableEmitter<String>) {
                    Log.d("solart", "subscribe >  ${Thread.currentThread().name}")
                    emitter.onNext("test")
                    emitter.onComplete()
                }
            }).subscribeOn(Schedulers.io())
                .map(object : Function<String, Int> {
                override fun apply(t: String): Int {
                    return 0
                }
            }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Observer<Int> {
               
                override fun onSubscribe(d: Disposable) {
                    Log.d("solart", "onSubscribe >  ${Thread.currentThread().name}")
                }

                override fun onNext(t: Int) {
                    Log.d("solart", "onNext >  ${Thread.currentThread().name}")
                }
                
                override fun onComplete() {
                    Log.d("solart", "onComplete >  ${Thread.currentThread().name}")
                }

                override fun onError(e: Throwable) {
                    Log.d("solart", "onError >  ${Thread.currentThread().name}")
                }

            })

这里简化了操作符的调用,以切换线程为示例,根据这段代码,我画出了这个过程的流程图(灵魂画手有没有?)如下:

ANfi2ya.png!web

图中不同颜色(红、绿、紫)的实线表示流程所属不同线程,体现在不同线程中的过程,且标上了对应的序号,方便大家观看,这个图已经能够揭示 RxJava 运转的核心原理。

2.1 逆向订阅时触发 subscribeOn 的线程切换

根据我们第一部分的分析,我们知道 RxJava 有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹,另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹,虽然我们在代码调用过程中使用了线程切换(subscribeOn 和 observerOn)这两个特殊的操作符,在整个流程中依然遵循了这两个包裹封装的过程,只不过它的特殊之处在于处理时完成了流程上的线程切换。

我们来看订阅时(图中⑦的流程)切换线程的 ObservableSubscribeOn 的代码:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    ...
}

在逆向订阅的流程中,通过指定 SchedulerSubscribeTask 任务交给线程池处理,我们先来看一下 SubscribeTask 的代码,就是执行了订阅:

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent); // 仅仅订阅了一下
        }
    }

我们再来看 scheduler.scheduleDirect() 中是如何做到线程切换的:

public abstract class Scheduler {
    ...
    @NonNull
    public abstract Worker createWorker(); // 实现类中实现
    ...
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker(); // 创建一个 worker

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit); //执行任务

        return task;
    }
    ...
}

我们示例中是切换到了 io 线程,所以我们对应的看一下 IoScheduler 的部分代码:

public final class IoScheduler extends Scheduler {
    ...
    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
   ...
   static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        ...

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    
    static final class ThreadWorker extends NewThreadWorker {
        ...
        // 此处粘贴了了父类中的实现
        @NonNull
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

            ...

            Future<?> f;
            try {
                // 线程池执行任务
                if (delayTime <= 0) {
                     f = executor.submit((Callable<Object>)sr);
                } else {
                     f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                ...
            }
            
            return sr;
        }
    }
}

综合上面的代码,我们来总结一下,其实 ObservableSubscribeOn 本身就是在 subscribeActual 中将上层数据源在异步线程中执行订阅,这样就完成了线程的切换,后续的流程都会在这个切换后的线程中执行,直到再次切换线程。因为 RxJava 本身是逆向订阅的流程,所以这里就解释了两个问题:1、为什么 subscribeOn() 对上面的代码生效?2、为什么 subscribeOn() 只有第一次调用时有效?归根结底都是因为逆向订阅的流程决定了 subscribeOn 是在订阅流程中起作用,此时数据还未产生。这里还有一点要提一下, ObservableSubscribeOn 在执行 subscribeActual 时,回调了下层产生的 ObserveronSubscribe ,如图中的④⑤⑥流程,所以这也是为什么,在观察者一订阅后就会收到 onSubscribe 的回调,且在当前订阅时的线程中。

2.2 正向数据流触发 observerOn 的线程切换

同第一部分一样的,订阅到最上层时,触发数据源产生原始数据,从而又正向的流转数据,此过程我们不在详细分析,参照1.3,我们着重看一下 ObserveOnObserver 的 onNext 处理的逻辑,也就是图中步骤⑬⑭:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    ...
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    ...
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        ...
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        ...
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule(); //类似 ObservableSubscribeOn.subscribeActual() 异步线程执行
        }
        ...
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
        ...
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
        ...
    }
}

示例中我们此时切换到了 Main 线程中执行,我们来看对应的 HandlerScheduler 实现:

final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;
    ...
    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, unit.toMillis(delay));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }
    
    private static final class HandlerWorker extends Worker {
        ...
        @Override
        @SuppressLint("NewApi") // Async will only be true when the API is available to call.
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            ...

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            if (async) {
                message.setAsynchronous(true);
            }

            handler.sendMessageDelayed(message, unit.toMillis(delay));

            ...

            return scheduled;
        }
    }

从代码中我们可以看到,此时将 Runnable 通过 Handler 发到了住线程去执行,所以经过此步骤后,后续的 onNext 的处理已经切换为主线程。同样的,这里也解释了上面我们提到的另一个问题:为什么 observerOn() 对下面代码生效?正是因为,数据的流向决定了 observerOn() 对后续的 onNext 产生影响。

总结

至此 RxJava 运转机制我们已经分析完毕,大家可以比照图中流程,跟踪代码流转,相信会有很大收获。 RxJava 本身是一个变种的观察者模式,正是因为框架本身要实现 异步事件流编程 ,所以产生了逆向订阅的过程,同时数据又是正向流转的,这个过程中大家还需要理解两个包裹封装(被观察者、观察者)的过程,不管操作符怎么变换,都不会脱离这样的运作核心。

另外根据不同的操作符的实现,我们依照同样的模式,可实现自己的自定义操作符,只要能在订阅时和数据回流时做好上下层的衔接就好,这个大家可以自己实践。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK