3

系统学习Rxjava

 2 years ago
source link: https://dcbupt.github.io/2021/05/17/FarBox/Forwarddc/%E7%B3%BB%E7%BB%9F%E5%AD%A6%E4%B9%A0%E7%B3%BB%E5%88%97/%E7%B3%BB%E7%BB%9F%E5%AD%A6%E4%B9%A0Rxjava/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

https://www.jianshu.com/p/88aacbed8aa5

以下介绍基于Rxjava2

reactive是什么

一种基于观察者模式的响应式编程范式。

{待补充}

先看看基于Rxjava2的reactive编程最简demo:

Observable.create(new ObservableOnSubscribe<Object>() {

@Override
public void subscribe(ObservableEmitter<Object> observableEmitter) throws Exception {

}

}).subscribe(new Observer<Object>() {

@Override
public void onNext(@NonNull Object s) {
//订阅业务代码
}

@Override
public void onSubscribe(Disposable disposable) {

}

@Override
public void onError(Throwable throwable) {

}

@Override
public void onComplete() {

}
});

可以看到,是一种链式调用的写法

Observable.create会返回一个ObservableCreate,他本身也是一个Observable。

Observable是一个模板类,首先看一下Observable的subscribe方法,这是真正执行消费逻辑的起点。subscribeActual是模板方法。

@SchedulerSupport("none")
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");

try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
this.subscribeActual(observer);
} catch (NullPointerException var4) {
throw var4;
} catch (Throwable var5) {
Exceptions.throwIfFatal(var5);
RxJavaPlugins.onError(var5);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(var5);
throw npe;
}
}

ObservableCreate的subscribeActual方法实现

protected void subscribeActual(Observer<? super T> observer) {
ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter(observer);
observer.onSubscribe(parent);

try {
this.source.subscribe(parent);
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
parent.onError(var4);
}

}

subscribeActual方法做了这几件事:

  • Observer包装成CreateEmitter,
  • Observer和Emitter彼此持有对方的引用
  • 调用ObservableOnSubscribe的subscribe方法(ObservableCreate构造方法里注入source,类型是ObservableOnSubscribe)

Observer包装成Emitter –>
ObservableOnSubscribe生产事件,调用Emitter的onNext方法将事件传递给订阅者 –>
Emitter执行一些框架自身的逻辑功能(比如流控),然后调用Observer的onNext方法将事件传递回Observer –>
执行Observer的onNext方法

Observer –(包装)–> Emitter –(传递)–> ObservableOnSubscribe –(回调)–> Emitter –(回调)–> Observer

  • Rxjava框架提供核心类:ObservableCreate。核心方法:subscribeActual。
  • ObservableCreate负责将事件源的生产与消费串联起来。当然,还有负责事件源类型映射的ObservableMap等。
  • ObservableCreate拿到业务方的Observer实现类,包装成CreateEmitter(RxJava提供的事件发送器)传递给事件源生产者ObservableOnSubscribe。
  • ObservableCreate注入业务方的ObservableOnSubscribe实现类,调用其subscribe方法执行“事件源的生产逻辑”。方法内部逻辑:生产事件源Object,通过Emitter向下传递到Observer,执行消费逻辑。

所以使用reactive编程的基本模式:

  • 自定义Observer(负责事件源的消费)
  • 实现ObservableOnSubscribe接口的subscribe方法(负责事件源的生产)

加工“事件源”

先看下写法:

Observable.create(new ObservableOnSubscribe<Object>() {            
@Override
public void call(@NonNull Observer<Object> e) throws Exception {
// 生产并发送数据源
}
}).map(new Function<Object, String>() {
@Override
public String apply(@NonNull Object o) throws Exception {
// 数据源类型转换
// return "Obj";
}
}).subscribe(new Observer<String>() {
@Override
public void onNext(@NonNull String s) {
//订阅业务代码
}
});

Observable可以调用map\filter等方法对事件源进行加工

Observable调用create创建ObservableCreate,调用map创建ObservableMap

下面是ObservableMap的subscribeActual方法

public void subscribeActual(Observer<? super U> t) {
this.source.subscribe(new ObservableMap.MapObserver(t, this.function));
}

ObservableMap把Observer包装为MapObserver,Function是业务方自定义的映射函数,然后调用ObservableCreate的subscribe方法执行“事件源的生产与发送”。对于ObservableMap,source就是级联调用上一级的Observable实现类,这个demo里就是ObservableCreate

MapObserver的“发送数据源”onNext方法

public void onNext(T t) {
if (!this.done) {
if (this.sourceMode != 0) {
this.downstream.onNext((Object)null);
} else {
Object v;
try {
v = ObjectHelper.requireNonNull(this.mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable var4) {
this.fail(var4);
return;
}

this.downstream.onNext(v);
}
}
}

downstream是业务方实现类Observer的引用,mapper是业务方定义的映射函数,这两个参数均在构造函数里指定了。

上层(ObservableCreate)调用onNext生产并发送数据后,调用下层MapObserver的onNext方法。内部逻辑:执行映射函数完成数据源类型转换,最后交给业务方实现的Observer消费数据(onNext方法)。

Observer –(包装)–> MapObserver –(包装)–> Emitter –(传递)–> ObservableOnSubscribe –(回调)–> Emitter –(回调)–> MapObserver –(回调)–> Observer

前半程一层层包装Observer,后半程从顶到底逐层回调每一层Observer包装类的onNext方法消费数据源,直至调到最底层业务方Observer实现类的onNext方法。

  • ObservableCreate负责生产和发送数据。生产数据的逻辑由业务方在ObservableOnSubscribe的实现类里控制,发送数据的机制由RxJava框架提供的CreateEmitter负责。
  • ObservableMap负责完成数据类型的映射。
  • 最后执行业务方Observer的onNext方法消费数据。

看到这,基本能看出reactive在实现生产-消费模型的一般思路:

ObservableCreate->ObservableMap->ObservableXXX。箭头左边的是右边的parent,右边的ObservableXXX使用一些包装类例如MapObserver、CreateEmitter包装Observer,指定从数据生产后到消费前的处理逻辑。ObservableCreate最终拿到Observer的包装类CreateEmitter,发送数据

所谓流控,在Rxjava架构下,就是消费端(下游)Observer可以决定何时终止对事件源生产端(上游)的消费。

上下游的这种流控通过Disposable实现。

看下Observer的定义:

public interface Observer<T> {
void onSubscribe(@NonNull Disposable var1);

void onNext(@NonNull T var1);

void onError(@NonNull Throwable var1);

void onComplete();
}

Observer的onSubscribe维护一个上游事件生产端创建的Disposable引用,执行dispose方法来终止对上游的消费。

demo:

public void demo8() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println("发送" + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
private Disposable disposable;

@Override
public void onSubscribe(Disposable d) {
disposable = d;
}

@Override
public void onNext(Integer integer) {
System.out.println("接收" + integer);
if (integer > 4) disposable.dispose();
}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onComplete() {
System.out.println("数据接受完成");
}
});
}

那么,Disposable是何时创建并注入Observer的?

上面提到,ObservableCreate会把Observer包装成一个发射器CreateEmitter,代码如下:

protected void subscribeActual(Observer<? super T> observer) {
ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter(observer);
observer.onSubscribe(parent);

try {
this.source.subscribe(parent);
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
parent.onError(var4);
}

}

消费者Observer不持有生产者Observable,而是持有Emitter。生产者Observable也是持有Emitter,可以理解Emitter是生产消费者之间的桥梁。生产者产出的数据通过Emitter传递给消费者,所以Emitter可以承担流控的职责。

CreateEmitter其实是Disposable的一个实现类,Disposable是一次性的意思。流控时,可以理解为消费者不再需要消费了,那么这个Emitter发送器就没有价值了,即它是一次性的,用后即丢。

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
...
}

Observer想终止消费时,调用Disposable的dispose方法,实际会执行CreateEmiter的dispose方法,将发射器设置为已废弃

public void dispose() {
DisposableHelper.dispose(this);
}

再看下CreateEmitter的onNext方法

public void onNext(T t) {
if (t == null) {
this.onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
} else {
if (!this.isDisposed()) {
this.observer.onNext(t);
}

}
}

因此,​下游Observer调用dispose方法废弃发射器后,发射器不再回调observer的onNext来消费,达到流控的效果

  • CreateEmitter(一个Disposable实现类)和Observer互相持有彼此的引用
  • CreateEmitter持有Observer来回调下游,执行事件源消费链路
  • Observer持有CreateEmitter,控制上游是否回调下游进行消费,达到流控的目的。

线程切换有两种方法:subscribeOnobserveOn

demo:

public static void test1() {

Observable.create(new ObservableOnSubscribe<String>() {

@Override
public void subscribe(ObservableEmitter<String> observableEmitter) {
System.out.println("准备");
System.out.println(Thread.currentThread().getName());

for (int i = 0; i < 5; i++) {
//Thread.sleep(1000);
System.out.println("发射" + " " + Thread.currentThread().getName());
observableEmitter.onNext("123");
}
observableEmitter.onComplete();
}

}).
subscribeOn(Schedulers.computation()).
subscribeOn(Schedulers.io()).
observeOn(Schedulers.single()).
map(new Function<String, String>() {

@Override
public String apply(@NonNull String o) throws Exception {
//转换业务代码
System.out.println("convert" + Thread.currentThread().getName());
return "Obj";
}
}).
observeOn(Schedulers.io()).
subscribe(new Observer<String>() {

@Override
public void onNext(@NonNull String s) {
//订阅业务代码
System.out.println("接收" + Thread.currentThread().getName());
}

@Override
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe");
}

@Override
public void onError(Throwable throwable) {
System.out.println("onError" + throwable.getMessage());
}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}

先说结论:

  • subscribeOn指定上游数据处理执行的线程。上游是相对subscribeOn调用处而言。以上面的例子说明,上游是数据生成和发送。每次调用subscribeOn都对上游生效。如果连续多次调用,第一次调用指定的线程生效
  • observeOn指定下游数据消费的线程。下游是相对subscribeOn调用处而言。以上面的例子说明,下游是数据Map映射、消费数据。每次调用observeOn都对下游生效。如果连续多次调用,最后一次调用指定的线程生效

因此,上面的demo中,数据生产和发送运行在computation线程,数据类型转换运行在single线程,消费端运行在io线程

subscribeOn

看下subscribeOn方法的实现

public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}

和map方法类似,也是返回一个Observable,看下它的subscribeActual方法

public void subscribeActual(Observer<? super T> observer) {
ObservableSubscribeOn.SubscribeOnObserver<T> parent = new ObservableSubscribeOn.SubscribeOnObserver(observer);
observer.onSubscribe(parent);
parent.setDisposable(this.scheduler.scheduleDirect(new ObservableSubscribeOn.SubscribeTask(parent)));
}

也是包装了下层的Observer,不过这个SubscribeOnObserver包装类并没有对事件源做其他操作,直接调用了下游observer的onNext方法,这里就不贴代码了。

​重点看下subscribeTask这个任务做了啥

final class SubscribeTask implements Runnable {
private final ObservableSubscribeOn.SubscribeOnObserver<T> parent;

SubscribeTask(ObservableSubscribeOn.SubscribeOnObserver<T> parent) {
this.parent = parent;
}

public void run() {
ObservableSubscribeOn.this.source.subscribe(this.parent);
}
}

很简单,继续向上调用上层Observable的subscribe方法,重点是使得上游的代码都运行在scheduler指定的线程里

Observer –(包装)–> SubscribeOnObserver –(包装)–> Emitter –(传递)–> ObservableOnSubscribe –(回调)–> Emitter –(回调)–> SubscribeOnObserver –(回调)–> Observer

虽然SubscribeOnObserver没有对Observer做额外处理,但继续调用上层source.subscribe方法时,已经切换到scheduler定义的线程了。因此,上游的所有操作,都会在subscribeOn方法指定的scheduler线程里执行。​​由此可知,如果连续多次调用subscribeOn指定线程,只有第一次起作用

observeOn

observeOn和subscribeOn的思路基本一致,这里只介绍差异。

ObserveOnObserver是observeOn方法执行后,对observer的包装类,看下它的onNext方法

public void onNext(T t) {
if (!this.done) {
if (this.sourceMode != 2) {
this.queue.offer(t);
}

this.schedule();
}
}

schedule方法的实现:

void schedule() {
if (this.getAndIncrement() == 0) {
this.worker.schedule(this);
}

}

this指代ObserveOnObserver,它是一个runnable,run方法里会调用下游observer的onNext方法。因此,下游observer的消费代码运行在observeOn方法指定的scheduler线程里。

Flowable背压

Flowable是Rxjava2较Rxjava的主要更新,可以把它看做是Observable+背压处理

那么,什么是背压呢?

生产速度>消费速度时,未被及时消费的对象堆积在内存中,就产生了所谓的背压

在Reactive编程模型中,只有在生产端和消费端运行在不同线程,且生产速度>消费速度时,才会出现背压的情况。

需要说明的是,尽管Flowable支持背压,但也牺牲了一些性能,所以除非满足背压的场景,否则还是推荐使用Observable。

另外,Flowable使用了另一套体系(Publisher-Subscriber),与Observable体系(ObservableSource-Observer)的关系可以简单理解为:

  • Flowable(Publisher实现类) <–> Observable(ObservableSource实现类)
  • Subscriber <–> Observer

看下背压的基本写法:

public void demo2() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
System.out.println("发射----> 1");
e.onNext(1);
System.out.println("发射----> 2");
e.onNext(2);
System.out.println("发射----> 3");
e.onNext(3);
System.out.println("发射----> 完成");
e.onComplete();
}
}, BackpressureStrategy.BUFFER) //create方法中多了一个BackpressureStrategy类型的参数
.subscribeOn(Schedulers.newThread())//为上下游分别指定各自的线程
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) { //onSubscribe回调的参数不是Disposable而是Subscription
s.request(Long.MAX_VALUE); //注意此处,暂时先这么设置
}

@Override
public void onNext(Integer integer) {
System.out.println("接收----> " + integer);
}

@Override
public void onError(Throwable t) {
}

@Override
public void onComplete() {
System.out.println("接收----> 完成");
}
});
}

Flow.create会返回一个FlowableCreate,和ObservableCreate类似,看下它的subscribeActual方法

public void subscribeActual(Subscriber<? super T> t) {
Object emitter;
switch(this.backpressure) {
case MISSING:
emitter = new FlowableCreate.MissingEmitter(t);
break;
case ERROR:
emitter = new FlowableCreate.ErrorAsyncEmitter(t);
break;
case DROP:
emitter = new FlowableCreate.DropAsyncEmitter(t);
break;
case LATEST:
emitter = new FlowableCreate.LatestAsyncEmitter(t);
break;
default:
emitter = new FlowableCreate.BufferAsyncEmitter(t, bufferSize());
}

t.onSubscribe((Subscription)emitter);

try {
this.source.subscribe((FlowableEmitter)emitter);
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
((FlowableCreate.BaseEmitter)emitter).onError(var4);
}

}

与ObservableCreate不同的是,这里基于入参指定的背压策略枚举,创建对应的发射器Emitter。

这些Emitter对于背压有不同的处理策略,具体体现在onNext实现的差异上。

但这些Emitter也有一些共同逻辑:Emitter会维护一个初始值为128的Long型原子类。每次向下游发送一次数据,该值自减1。如果减到0,基于不同的Emitter实现类,执行不同策略:丢弃 or 忽略(指忽略背压,继续生产数据,可能产生OOM) or 调用subscriber的onError 等等

Flowable体系的Emitter发送数据和Observable体系的Emitter不同,并非依次回调下游的onNext方法,他回调下游ObserveOnSubscriber的onNext,将数据放入下游ObserveOnSubscriber的异步缓存池内(本质是一个队列Queue),然后尝试启动ObserveOnSubscriber的异步线程就结束了

ObserveOnSubscriber在异步线程里执行runAsync方法,如果已消费的数据量小于消费者subscriber指定的阈值,则从异步缓存池里取数据后回调消费者subscriber.onNext方法消费。当下游消费了96个数据对象后,Emitter的原子Long当前值value+96,即已消费的额度回补给发端

消费者subscriber在onSubscribe方法里设置消费数据个数,该值维护在ObserveOnSubscriber,一旦消费数达到了阈值,ObserveOnSubscriber在异步消费线程里不会回调下游subscriber.onNext方法消费

@Override
public void onSubscribe(Subscription s) { //onSubscribe回调的参数不是Disposable而是Subscription
s.request(Long.MAX_VALUE); //注意此处,暂时先这么设置
}

总结一下:

生产出来的数据,会缓存到异步缓存池中。上游发送数据前,先判断原子Long的value,如果为0,执行背压策略,否则发送数据。然后CAS操作value-1。当下游消费了96个数据对象后,回补额度,Emitter的原子Long当前value+96。

这种设计允许下游消费速度在一定限度内比上游生产速度慢。只要生产128个数据的时间内,能消费96个数据,就能回补已消费数据额度用于再生产。否则就要执行相应的背压策略。

打个比方:女神一开始给屌丝128的好感度,好友度随女神等级不同,下跌速度不同。在好感度跌0前,如果屌丝送了超过96次礼物,回补96好感度,并重置礼物次数。如果好感度跌0,分手。。。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK