7

WebFlux 前置知识(四)

 3 years ago
source link: http://www.javaboy.org/2021/0601/webflux-publisher-subscriber.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
20 天前 15 分钟 读完 (大约 2298 个字)

WebFlux 前置知识(四)

[TOC]

1.Backpressure

Backpressure 在国内被翻译成背压,这个翻译在网上被很多人吐槽,我觉得大家的吐槽是有道理的,背压单纯从字面上确实看不出来有什么意思。所以松哥这里直接用英文 Backpressure 吧。

Backpressure 是一种现象:当数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure。

换句话说,上游生产数据,生产完成后通过管道将数据传到下游,下游消费数据,当下游消费速度小于上游数据生产速度时,数据在管道中积压会对上游形成一个压力,这就是 Backpressure,从这个角度来说,Backpressure 翻译成反压、回压似乎更合理一些。

Backpressure 会出现在有 Buffer 上限的系统中,当出现 Buffer 溢出的时候,就会有 Backpressure,对于 Backpressure,它的应对措施只有一个:丢弃新事件。那么什么是 Buffer 溢出呢?例如我的服务器可以同时处理 2000 个用户请求,那么我就把请求上限设置为 2000,这个 2000 就是我的 Buffer,当超出 2000 的时候,就产生了 Backpressure。

2.Flow API

JDK9 中推出了 Flow API,用以支持 Reactive Programming,即响应式编程。

在响应式编程中,会有一个数据发布者 Publisher 和数据订阅者 Subscriber,Subscriber 接收 Publisher 发布的数据并进行消费,在 Subscriber 和 Publisher 之间还存在一个 Processor,类似于一个过滤器,可以对数据进行中间处理。

JDK9 中提供了 Flow API 用以支持响应式编程,另外 RxJava 和 Reactor 等框架也提供了相关的实现。

我们来看看 JDK9 中的 Flow 类:

非常简洁,基本上就是按照 Reactive Programming 的设计来的:

Publisher

Publisher 为数据发布者,这是一个函数式接口,里边只有一个方法,通过这个方法将数据发布出去,Publisher 的定义如下:

@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}

Subscriber

Subscriber 为数据订阅者,这个里边有四个方法,如下:

public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
  • onSubscribe:这个是订阅成功的回调方法,用于初始化 Subscription,并且表明可以开始接收订阅数据了。
  • onNext:接收下一项订阅数据的回调方法。
  • onError:在 Publisher 或 Subcriber 遇到不可恢复的错误时调用此方法,之后 Subscription 不会再调用 Subscriber 其他的方法。
  • onComplete:当接收完所有订阅数据,并且发布者已经关闭后会回调这个方法。

Subscription

Subscription 为发布者和订阅者之间的订阅关系,用来控制消息的消费,这个里边有两个方法:

public static interface Subscription {
public void request(long n);
public void cancel();
}
  • request:这个方法用来向数据发布者请求 n 个数据。
  • cancel:取消消息订阅,订阅者将不再接收数据。

Processor

Processor 是一个空接口,不过它同时继承了 Publisher 和 Subscriber,所以它既能发布数据也能订阅数据,因此我们可以通过 Processor 来完成一些数据转换的功能,先接收数据进行处理,处理完成后再将数据发布出去,这个也有点类似于我们 JavaEE 中的过滤器。

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

2.1 消息订阅初体验

我们通过如下一段代码体验一下消息的订阅与发布:

public class FlowDemo {
public static void main(String[] args) {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
//向数据发布者请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("接收到 publisher 发来的消息了:" + item);
//接收完成后,可以继续接收或者不接收
//this.subscription.cancel();
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
//出现异常,就会来到这个方法,此时直接取消订阅即可
this.subscription.cancel();
}
@Override
public void onComplete() {
//发布者的所有数据都被接收,并且发布者已经关闭
System.out.println("数据接收完毕");
}
};
//配置发布者和订阅者
publisher.subscribe(subscriber);
for (int i = 0; i < 5; i++) {
//发送数据
publisher.submit("hello:" + i);
}
//关闭发布者
publisher.close();
new Scanner(System.in).next();
}
}

松哥稍微解释一下上面这段代码:

  1. 首先创建一个 SubmissionPublisher 对象作为消息发布者。
  2. 接下来创建 Flow.Subscriber 对象作为消息订阅者,实现消息订阅者里边的四个方法,分别进行处理。
  3. 为 publisher 配置上 subscriber。
  4. 发送消息。
  5. 消息发送完成后关闭 publisher。
  6. 最后是让程序不要停止,观察消息订阅者打印情况。

2.2 模拟 Backpressure

Backpressure 问题在 Flow API 中得到了很好的解决。Subscriber 会将 Publisher 发布的数据缓存在 Subscription 中,其长度默认为256,相关源码如下:

public final class Flow {
static final int DEFAULT_BUFFER_SIZE = 256;
public static int defaultBufferSize() {
return DEFAULT_BUFFER_SIZE;
}
...
}

一旦超出这个数据量,publisher 就会降低数据发送速度。

我们对上面的案例进行修改,如下:

public class FlowDemo {
public static void main(String[] args) {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
private Flow.Subscription subscription;

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
//向数据发布者请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(String item) {
System.out.println("接收到 publisher 发来的消息了:" + item);
//接收完成后,可以继续接收或者不接收
//this.subscription.cancel();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
//出现异常,就会来到这个方法,此时直接取消订阅即可
this.subscription.cancel();
}

@Override
public void onComplete() {
//发布者的所有数据都被接收,并且发布者已经关闭
System.out.println("数据接收完毕");
}
};
publisher.subscribe(subscriber);
for (int i = 0; i < 500; i++) {
System.out.println("i--------->" + i);
publisher.submit("hello:" + i);
}
//关闭发布者
publisher.close();
new Scanner(System.in).next();
}
}

一共修改了三个地方:

  1. Subscriber#onNext 方法中,每次休息两秒再处理下一条数据。
  2. 发布数据时,一共发布 500 条数据。
  3. 打印数据发布的日志。

修改完成后,我们再次启动项目,观察控制台输出:

可以看到,生产者先是一股脑生产了 257 条数据(hello0 在一开始就被消费了,所以缓存中实际上是 256 条),消息则是一条一条的来,由于消费的速度比较慢,所以当缓存中的数据超过 256 条之后,接下来都是消费一条,再发送一条。

2.3 数据处理

Flow.Processor 可以像过滤器一样,对数据进行预处理,数据从 publisher 出来之后,先进入 Flow.Processor 中进行预处理,然后再进入 Subscriber。

修改后的代码如下:

public class FlowDemo {
public static void main(String[] args) {

class DataFilter extends SubmissionPublisher<String> implements Flow.Processor<String,String>{

private Flow.Subscription subscription;

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}

@Override
public void onNext(String item) {
this.submit("【这是一条被处理过的数据】" + item);
this.subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
this.subscription.cancel();
}

@Override
public void onComplete() {
this.close();
}
}

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
DataFilter dataFilter = new DataFilter();
publisher.subscribe(dataFilter);

Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
private Flow.Subscription subscription;

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
//向数据发布者请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(String item) {
System.out.println("接收到 publisher 发来的消息了:" + item);
//接收完成后,可以继续接收或者不接收
//this.subscription.cancel();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
//出现异常,就会来到这个方法,此时直接取消订阅即可
this.subscription.cancel();
}

@Override
public void onComplete() {
//发布者的所有数据都被接收,并且发布者已经关闭
System.out.println("数据接收完毕");
}
};
dataFilter.subscribe(subscriber);
for (int i = 0; i < 500; i++) {
System.out.println("发送消息 i--------->" + i);
publisher.submit("hello:" + i);
}
//关闭发布者
publisher.close();
new Scanner(System.in).next();
}
}

简单起见,我这里创建了一个局部内部类 DataFilter,DataFilter 继承自 SubmissionPublisher 并实现了 Flow.Processor 接口,由于 DataFilter 继承自 SubmissionPublisher,所以它也兼具 SubmissionPublisher 的功能。

在 DataFilter 中完成消息的处理并重新发送出去。接下来定义 publisher,让 dataFilter 作为其订阅者,再定义新的订阅者,作为 dataFilter 的订阅者。

最终运行效果如下:

好啦,这就是今天和大家介绍的 Java9 中的 Reactive Stream,那么至此,我们的 WebFlux 前置知识差不多告一段落了,下篇文章开始,正式开整 WebFlux。

喜欢这篇文章吗?扫码关注公众号【江南一点雨】【江南一点雨】专注于 SPRING BOOT+微服务以及前后端分离技术,每天推送原创技术干货,关注后回复 JAVA,领取松哥为你精心准备的 JAVA 干货!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK