3

Backpressure in Reactive Systems 响应式系统的反压

 2 years ago
source link: https://segmentfault.com/a/1190000041430830
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

原文
https://foojay.io/today/backp...

一月中,我基于我的文章迁移到Reactive的必要条件Spring Boot应用做了一个分享
https://www.youtube.com/watch...

因为那是一个Kotlin的聚会,我是用Kotlin代码展示的,同时我加了一个将代码库迁移到协程的步骤。

在QA环节,有人问到是否协程实现了反压。我承认我也不确定,所以我做了一点研究。

本文提供了关于反压的概要信息,还有如何用Rxjava(v3),Project Reactor和Kotlin的协程Coroutines如何处理。

什么是反压?

反压是指对管道中流体的抵御或反向作用力,导致丧失摩擦力和压力降低。反压的说法不太恰当,压力是个标量,有大小,但没有方向 -- 维基百科
在软件中,反压跟这有点关系但也有不同的含义:假设有一个很快的数据发送方和一个比较慢的数据接收方,反压是指一种机制可以反向推动发送方不要把接收方压垮。

无论是reactivestreams.org或java.until.concurrent.Flow,反应流都提供以下四个构建块

  • Publisher发送元素
  • Subscriber对收到的元素产生反应
  • 一个Subscription来绑定Publisher和Subscriber
  • 一个Processor
    这是类图:

8efe4a3fda4717284296805e5977286d.jpeg

Subscription的request()方法是反压的顶层。
规范很直白:

Subscriber必须通过Subscription.request(long n)来发送需求信号后接收onNext信号。这里隐含的规则就是由Subscriber决定什么时候和有多少元素需要被接收。为了避免可重入Subscription方法引起的信号重排序,强烈推荐Subscriber方法的实现在调用Subscription方法的最后对任何信号处理都是用同步的方式。推荐Subscriber请求它们可以处理的上限,因为一次只请求一个元素会导致低效的“停止和等待”协议。 -- JVM的Reactive流规范

响应流的规范很标准。它们也有基于Java的TCK。

但要定义如何管理producer发送下游无法处理的元素就超出这个规范的范围了。问题比较简单,解决方法也多。每种Reactive框架都有提供方案,我们来看下。

RxJava 3的反压

RxJava v3提供以下基础类:

类描述Flowable0到N号元素的流。支持Reactive-流和反压Observable0到N元素的流。不支持反压Single一个精确的流: 1个元素或一个错误Maybe一个包括以下的流: 没有元素 一个元素 或一个错误Completable一个流没有元素但: 是一个completion结束或一个错误的信号

在这些类中,Flowable是唯一实现了Reactive流-反压的流。因此,提供反压不是唯一的问题。RxJava wiki指出:

反压并没有解决Observable过度生成或Subscriber过度消费。它只是将这个问题从处理的链条中移动到了一个比较好处理的地方。 --响应式进行反压不是万金油。

为了解决这个,RxJava提供处理“过度生产“元素的两个主要策略:

  • 将元素存储到一个缓存里,如果没有足够的缓存,可能会产生OutOfMemoryError。
  • 丢掉数据
    下图描述了这些策略的不同实现方法:

aa5d2392aa01d082f792af66887abb1f.jpeg

记住onBackPressureLatest操作同使用onBackpressureBuffer(1)类似:

这张图来自RxJava的Wiki。

与其他框架不同的是,RxJava提供方法来在发送完所有元素后发送溢出异常信号。这让消费者可以收到数据而同时清楚发送方已经丢了数据。

Project Reactor中的反压

Project Reactor中提供的策略与RxJava类似。

API有点不一样。比如,如果生产者溢出Project Reactor提供一个方便的方法来抛异常:

var stream = Stream.generate(Math::random);

// RxJava
Flowable.fromStream(stream) // 1
.onBackpressureBuffer(0); // 2

// Project Reactor
Flux.fromStream(stream) // 1
.onBackpressureError(); // 2

  • 创建Reactive流
  • 如果生产者溢出抛异常

下面是高亮了反压能力的Flux类图:

45d029e6310717195f1425f800fe0eaf.jpeg

与其他框架相比,Project Reactor提供设置缓存TTL的方法来防止溢出。

协程中的反压

协程提供同样的缓存和失效能力。协程的基础类是Flow。

bd74a8c05f5372cca4d7cef0320dc4d4.jpeg

你可以这样使用:

flow { // 1
while (true) emit(Math.random()) // 2
}.buffer(10)

  • 建一个Flow类,由下面定义content
  • 定义Flow的内容
  • 设置缓存容量为10

RxJava,Project Reactor,Kotlin协程都提供反压能力。在生产者比消费者更快时提供两种策略:缓存数据或抛弃数据。

Reactive Streams JVM specifications
https://github.com/reactive-s...
How (not) to use Reactive Streams in Java 9+
https://blog.softwaremill.com...
RxJava Backpressure
https://github.com/ReactiveX/...


本文来自祝坤荣(时序)的微信公众号「麦芽面包」,公众号id「darkjune\_think」

开发者/科幻爱好者/硬核主机玩家/业余翻译
转载请注明。

B站: https://space.bilibili.com/23...
交流Email: [email protected]


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK