两张图彻底理解 RxJava2 的核心原理
source link: http://solart.cc/2020/06/16/understand_rxjava2/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
作者: solart
版权声明:本文图文为博主原创,转载请注明出处。
文章似乎有些标题党的嫌疑,根据我的理解画出两幅图相信可以让大家理解 RxJava2 的核心原理,稍后不要吝啬,请叫我灵魂画手:smile:!相信 RxJava
是大家业务中用到比较多的一个依赖库,RxJava 的强大之处在于它改变了程序员的编程习惯,相比较其他的开源项目,Rxjava 是最弯弯绕的一个,但是也是理解后最清晰的一个。对于 RxJava 种类繁多的操作符,大多数同学都表示很是头疼,也有不少同学陷入了学习操作符不能停的怪圈。
操作符要不要学,当然要,但是如果能理解 RxJava 的核心,操作符的使用就像是学会九阳神功的张无忌学招数,必定是手到擒来。
这篇文章我会讲些什么
- RxJava2 基本的运行流程
- RxJava2 线程切换的原理(涉及到为什么 subscribeOn() 只有第一次调用时有效)
- 为什么一订阅就回调了 onSubscribe
- 为什么 subscribeOn() 对上面的代码生效,observerOn() 对下面代码生效
以下内容如果涉及到自己写的代码我会采用 Kotlin 进行示例展示,涉及到 RxJava2 会展示部分源码。
1、简单的链式调用(无线程切换)
先来看一段示例代码:
Observable.create(object : ObservableOnSubscribe<String> { override fun subscribe(emitter: ObservableEmitter<String>) { Log.d("solart", "subscribe > ${Thread.currentThread().name}") emitter.onNext("test") emitter.onComplete() } }).flatMap(object : Function<String, Observable<String>> { override fun apply(t: String): Observable<String> { return Observable.just(t) } }).map(object : Function<String, Int> { override fun apply(t: String): Int { return 0 } }).subscribe(object : Observer<Int> { override fun onSubscribe(d: Disposable) { Log.d("solart", "onSubscribe > ${Thread.currentThread().name}") } override fun onNext(t: Int) { Log.d("solart", "onNext > ${Thread.currentThread().name}") } override fun onComplete() { Log.d("solart", "onComplete > ${Thread.currentThread().name}") } override fun onError(e: Throwable) { Log.d("solart", "onError > ${Thread.currentThread().name}") } })
这段代码中我们简单用了 create
、 flatMap
、 map
等操作符,进行了流式的数据转换,最后我们通过 subscribe
订阅了数据流,其实通过查看源码我们不难发现, RxJava 本身是个逆向订阅的过程,话不多说先看图
1.1 数据源的包裹
比照着这张图,我们来看一下,首先 蓝色虚线
部分是我们代码中实际调用的顺序,查看 Observable.create
我们不难发现,此处就是产生了一个 ObservableCreate
实例,
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
如我们图中所示, ObservableCreate
内部包含一个类型为 ObservableOnSubscribe<T>
的 source
变量,根据我们代码中的调用,这个 source
就是我们 Kotlin 代码中的匿名对象 object : ObservableOnSubscribe<String>
。
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { ... } ... }
我们顺着代码的调用顺序,继续看一下 flatMap
的方法中又做了什么:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { ... return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize)); }
类似的产生了一个 ObservableFlatMap
实例,而其内部持有一个类型为 ObservableSource<T>
的 source
变量,而该 source 则是上一步中的 ObservableCreate
实例,依次我们看 map
依然是类似的代码,这里不在赘述,所以到此我们得到了图中蓝色虚线部分的内容。
1.2 逆向订阅数据源
我们知道以上的代码调用并没有出发数据的流转,只有当我们调用 subscribe
时(图中红色实线部分)才真正触发了 RxJava 的数据流,我们来看代码:
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); // 重点!!! 发生订阅的核心方法 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { ... throw npe; } }
根据我们上面的分析,执行 subscribeActual
的对象其实是 ObservableMap
,我们来看它的 subscribeActual
的实现
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { ... @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } ... }
注意,此时产生了一个 MapObserver
对象, MapObserver
中通过 actual
持有了我们自己的匿名对象 object : Observer<Int>
,同样的, ObservableMap
执行 subscribeActual 又调用了上层的 source.subscribe
,依次逆向调用,就得到了我们图中上半部分的红线内容。
1.3 触发数据源产生原始数据,数据流转
当订阅发生在最顶层时,也就是 ObservableCreate
中的 subscribeActual
,此时触发了数据源的产生,通过 emitter
发射数据
public final class ObservableCreate<T> extends Observable<T> { ... @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); //此时触发了 onSubscribe 回调,这里先提一下 try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } ... }
而我们代码中此时产生了真正的数据
override fun subscribe(emitter: ObservableEmitter<String>) { Log.d("solart", "subscribe > ${Thread.currentThread().name}") emitter.onNext("test") emitter.onComplete() }
此时我们再来看 CreateEmitter
的实现:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { ... if (!isDisposed()) { observer.onNext(t); //向下层分发数据 } } ... }
根据我们上面的分析 CreateEmitter
中持有的 observer
即是 FlatMapObserver
的实例,而 FlatMapObserver
调用 onNext 时,又会调用 MapObserver
的 onNext ,依次调用至我们自己实现的观察者的 onNext 处理数据,此时数据流转完毕。
观察我们这个图,你会发现, 操作符
对应产生的被观察者和观察者命名规则很有规律,比如说被观察者的命名 Observable + 操作符
,例如 ObservableMap
= Observable + map,观察者命名大多遵循 操作符 + Observer
,例如 FlatMapObserver
= flatMap + Observer。除了命名规则外,我们观察整个流程,你也会发现有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹(蓝色虚线的流程部分),另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹(上部分的红色流程部分)。
2、异步事件流编程(线程切换)
相信有了上面的分析,大家对 RxJava 的逆向订阅以及数据流转有了一定的认识,但是 RxJava 的强大之处在于它的异步事件流编程方式,随心所欲的切换工作线程,下面我们来分析它是如何做到的。
同样的我们还是先给出一个简单的示例:
Observable.create(object : ObservableOnSubscribe<String> { override fun subscribe(emitter: ObservableEmitter<String>) { Log.d("solart", "subscribe > ${Thread.currentThread().name}") emitter.onNext("test") emitter.onComplete() } }).subscribeOn(Schedulers.io()) .map(object : Function<String, Int> { override fun apply(t: String): Int { return 0 } }).observeOn(AndroidSchedulers.mainThread()) .subscribe(object : Observer<Int> { override fun onSubscribe(d: Disposable) { Log.d("solart", "onSubscribe > ${Thread.currentThread().name}") } override fun onNext(t: Int) { Log.d("solart", "onNext > ${Thread.currentThread().name}") } override fun onComplete() { Log.d("solart", "onComplete > ${Thread.currentThread().name}") } override fun onError(e: Throwable) { Log.d("solart", "onError > ${Thread.currentThread().name}") } })
这里简化了操作符的调用,以切换线程为示例,根据这段代码,我画出了这个过程的流程图(灵魂画手有没有?)如下:
图中不同颜色(红、绿、紫)的实线表示流程所属不同线程,体现在不同线程中的过程,且标上了对应的序号,方便大家观看,这个图已经能够揭示 RxJava 运转的核心原理。
2.1 逆向订阅时触发 subscribeOn 的线程切换
根据我们第一部分的分析,我们知道 RxJava 有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹,另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹,虽然我们在代码调用过程中使用了线程切换(subscribeOn 和 observerOn)这两个特殊的操作符,在整个流程中依然遵循了这两个包裹封装的过程,只不过它的特殊之处在于处理时完成了流程上的线程切换。
我们来看订阅时(图中⑦的流程)切换线程的 ObservableSubscribeOn
的代码:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } ... }
在逆向订阅的流程中,通过指定 Scheduler
将 SubscribeTask
任务交给线程池处理,我们先来看一下 SubscribeTask
的代码,就是执行了订阅:
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); // 仅仅订阅了一下 } }
我们再来看 scheduler.scheduleDirect()
中是如何做到线程切换的:
public abstract class Scheduler { ... @NonNull public abstract Worker createWorker(); // 实现类中实现 ... @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); // 创建一个 worker final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); //执行任务 return task; } ... }
我们示例中是切换到了 io
线程,所以我们对应的看一下 IoScheduler
的部分代码:
public final class IoScheduler extends Scheduler { ... @NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); } ... static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); } ... @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } static final class ThreadWorker extends NewThreadWorker { ... // 此处粘贴了了父类中的实现 @NonNull public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); ... Future<?> f; try { // 线程池执行任务 if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { ... } return sr; } } }
综合上面的代码,我们来总结一下,其实 ObservableSubscribeOn
本身就是在 subscribeActual
中将上层数据源在异步线程中执行订阅,这样就完成了线程的切换,后续的流程都会在这个切换后的线程中执行,直到再次切换线程。因为 RxJava 本身是逆向订阅的流程,所以这里就解释了两个问题:1、为什么 subscribeOn() 对上面的代码生效?2、为什么 subscribeOn() 只有第一次调用时有效?归根结底都是因为逆向订阅的流程决定了 subscribeOn 是在订阅流程中起作用,此时数据还未产生。这里还有一点要提一下, ObservableSubscribeOn
在执行 subscribeActual
时,回调了下层产生的 Observer
的 onSubscribe
,如图中的④⑤⑥流程,所以这也是为什么,在观察者一订阅后就会收到 onSubscribe
的回调,且在当前订阅时的线程中。
2.2 正向数据流触发 observerOn 的线程切换
同第一部分一样的,订阅到最上层时,触发数据源产生原始数据,从而又正向的流转数据,此过程我们不在详细分析,参照1.3,我们着重看一下 ObserveOnObserver
的 onNext 处理的逻辑,也就是图中步骤⑬⑭:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; ... @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } ... static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { ... final Observer<? super T> actual; final Scheduler.Worker worker; ... @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); //类似 ObservableSubscribeOn.subscribeActual() 异步线程执行 } ... void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } ... @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } } ... } }
示例中我们此时切换到了 Main
线程中执行,我们来看对应的 HandlerScheduler
实现:
final class HandlerScheduler extends Scheduler { private final Handler handler; private final boolean async; ... @Override public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); handler.postDelayed(scheduled, unit.toMillis(delay)); return scheduled; } @Override public Worker createWorker() { return new HandlerWorker(handler, async); } private static final class HandlerWorker extends Worker { ... @Override @SuppressLint("NewApi") // Async will only be true when the API is available to call. public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ... run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. if (async) { message.setAsynchronous(true); } handler.sendMessageDelayed(message, unit.toMillis(delay)); ... return scheduled; } }
从代码中我们可以看到,此时将 Runnable
通过 Handler 发到了住线程去执行,所以经过此步骤后,后续的 onNext 的处理已经切换为主线程。同样的,这里也解释了上面我们提到的另一个问题:为什么 observerOn() 对下面代码生效?正是因为,数据的流向决定了 observerOn() 对后续的 onNext 产生影响。
总结
至此 RxJava 运转机制我们已经分析完毕,大家可以比照图中流程,跟踪代码流转,相信会有很大收获。 RxJava 本身是一个变种的观察者模式,正是因为框架本身要实现 异步事件流编程
,所以产生了逆向订阅的过程,同时数据又是正向流转的,这个过程中大家还需要理解两个包裹封装(被观察者、观察者)的过程,不管操作符怎么变换,都不会脱离这样的运作核心。
另外根据不同的操作符的实现,我们依照同样的模式,可实现自己的自定义操作符,只要能在订阅时和数据回流时做好上下层的衔接就好,这个大家可以自己实践。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK