5

Project Reactor 核心原理解析

 2 years ago
source link: https://www.jianshu.com/p/df395eb28f69
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Project Reactor 核心原理解析

12019.03.10 10:25:44字数 1,877阅读 11,985

本文将解析 Spring 的 Reactor 项目的源码。主要目的是让自己能深入理解 Reactor 这个项目,以及 Spring 5 和 Spring Boot 2。

Project Reactor 项目地址:https://github.com/reactor

Reactor 项目主要包含 Reactor Core 和 Reactor Netty 两部分。Reactor Core 实现了反应式编程的核心功能,Reactor Netty 则是 Spring WebFlux 等技术的基础。本文将介绍 Core 模块的核心原理。

本文从一个例子开始:

public static void main(String[] args) {
  Flux.just("tom", "jack", "allen")
      .filter(s -> s.length() > 3)
      .map(s -> s.concat("@qq.com"))
      .subscribe(System.out::println);
}

整个 Project Reactor 的核心调用分为下面几个阶段:

  • subscribe 阶段
  • onSubscribe 阶段
  • request 阶段

接下来对每个阶段详细介绍。

二、声明阶段

之前的文章介绍过,反应式编程和传统风格的编程一个最大的不同点在于,开发人员编写的大部分代码都是在声明处理过程。即这些代码并不会被实际的执行,直到开始被订阅。这便是为何第一阶段是声明阶段的原因。

先来看第一个方法 Flux.just 的源码:

public static <T> Flux<T> just(T... data) {
  return fromArray(data);
}

public static <T> Flux<T> fromArray(T[] array) {
  // 省略不重要的部分
  return onAssembly(new FluxArray<>(array));
}

just 方法只是创建反应式流的众多方式的一个。在实际工作中,更常见的通过反应式 Repository 将数据库查询结果,或通过 Spring 5 的 WebClient 将 HTTP 调用结果最为流的开始。

onAssembly 是一个扩展机制,因为实例中并没有使用,所以可简单理解为将入参直接返回。

接下来 new FluxArray<>(array) 做的事情也很简单,仅仅是把入参赋给成员变量。

再接下来 filter(s -> s.length() > 3) 做的事情是把上一个 Flux,即 FluxArray,以及 s -> s.length() > 3 所表示的 Predicate 保存起来。

看一下源码

public final Flux<T> filter(Predicate<? super T> p) {
  if (this instanceof Fuseable) {
    return onAssembly(new FluxFilterFuseable<>(this, p));
  }
  return onAssembly(new FluxFilter<>(this, p));
}

FluxFilterFuseable(Flux<? extends T> source, 
                   Predicate<? super T> predicate) {
  super(source);
  this.predicate = 
      Objects.requireNonNull(predicate, "predicate");
}

这里有个逻辑判断,就是返回的值分为了 Fuseable 和非 Fuseable 两种。简单介绍一下,Fuseable 的意思就是可融合的。我理解就是 Flux 表示的是一个类似集合的概念,有一些集合类型可以将多个元素融合在一起,打包处理,而不必每个元素都一步步的处理,从而提高了效率。因为 FluxArray 中的数据一开始就都准备好了,因此可以打包处理,因此就是 Fuseable

而接下来的 map 操作也对应一个 FluxMapFusable。原理与 filter 操作,这里不再重复。

三、subscribe 阶段

subscribe 阶段同行会触发数据发送。在本例中,后面可以看到,对于 FluxArray,数据发送很简单,就是循环发送。而对于像数据库、RPC 这样的长久,则会触发请求的发送。

当调用 subscribe(System.out::println) 时,整个执行过程便进入 subscribe 阶段。经过一系列的调用之后,subscribe 动作会代理给具体的 Flux 来实现。就本例来说,会代理给 FluxMapFuseable,方法实现如下(经过简化):

public void subscribe(CoreSubscriber<? super R> actual) {
  source.subscribe(
      new MapFuseableSubscriber<>(actual, mapper)
  );
}

其中的 source 变量对应的是当前 Flux 的上一个。本例中,FluxMapFuseable 上一个是 FluxFilterFuseable

new MapFuseableSubscriber<>(actual, mapper) 则是将订阅了 FluxMapFuseable 的 Subscriber 和映射器封装在一起,组成一个新的 Subscriber。然后再订阅 source,即 FluxArraysource 是在上一个阶段被保存下来的。

这里强调一下 Publisher 接口中的 subscribe 方法语义上有些奇特,它表示的不是订阅关系,而是被订阅关系。即 aPublisher.subscribe(aSubscriber) 表示的是 aPublisheraSubscriber 订阅。

接下来调用的就是 FluxFilterFuseable 中的 subscribe 方法,类似于 FluxMapFuseable,源码如下:

public void subscribe(CoreSubscriber<? super T> actual) {
  source.subscribe(
      new FilterFuseableSubscriber<>(actual, predicate)
  );
}

这时 source 就成了 FluxArray。于是,接下来是调用 FluxArraysubscribe 方法。

public static <T> void subscribe(CoreSubscriber<? super T> s, 
                                 T[] array) {
  if (array.length == 0) {
    Operators.complete(s);
    return;
  }
  if (s instanceof ConditionalSubscriber) {
    s.onSubscribe(
      new ArrayConditionalSubscription<>(
        (ConditionalSubscriber<? super T>) s, array
      )
    );
  }
  else {
    s.onSubscribe(new ArraySubscription<>(s, array));
  }
}

最重要的部分还是 s.onSubscribe(new ArraySubscription<>(s, array))。不同于 FluxMapFuseableFluxFilterFuseableFluxArray 没有再调用 subscribe 方法,因为它是数据源头。而 FluxMapFuseableFluxFilterFuseable 则是中间过程。

可以这样简单理解,对于中间过程的 Mono/Flux,subscribe 阶段是订阅上一个 Mono/Flux;而对于源 Mono/Flux,则是要执行 Subscriber.onSubscribe(Subscription s) 方法。

四、onSubscribe 阶段

在调用 FluxArraysubscribe 方法之后,执行过程便进入了 onSubscribe 阶段。onSubscribe 阶段指的是 Subscriber#onSubscribe 方法被依次调用的阶段。这个阶段会让各 Subscriber 知道 subscribe 方法已被触发,真正的处理流程马上就要开始。所以这一阶段的工作相对简单。

s.onSubscribe(new ArraySubscription<>(s, array));

s 是 FilterFuseableSubscriber,看一下 FilterFuseableSubscriberonSubscribe(Subscription s) 源码:

public void onSubscribe(Subscription s) {
  if (Operators.validate(this.s, s)) {
    this.s = (QueueSubscription<T>) s;
    actual.onSubscribe(this);
  }
}

actual 对应 MapFuseableSubscriberMapFuseableSubscriberonSubscribe 方法也是这样,但 actual 对于的则是代表 System.out::printlnLambdaSubscriber

调用过程:

FluxArray.subscribe -> FilterFuseableSubscriber.onSubscribe -> MapFuseableSubscriber.onSubscribe -> LambdaSubscriber.onSubscribe

五、request 阶段

onSubscribe 阶段是表示订阅动作的方式,让各 Subscriber 知悉,准备开始处理数据。当最终的 Subscriber 做好处理数据的准备之后,它便会调用 Subscriptionrequest 方法请求数据。

下面是 LambdaSubscriber onSubscribe 方法的源码:

public final void onSubscribe(Subscription s) {
  if (Operators.validate(subscription, s)) {
    this.subscription = s;
    if (subscriptionConsumer != null) {
      try {
        subscriptionConsumer.accept(s);
      }
      catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        s.cancel();
        onError(t);
      }
    }
    else {
      s.request(Long.MAX_VALUE);
    }
  }
}

由上可见 request 方法被调用。在本例中,这里的 sMapFuseableSubscriber

这里需要说明,如 MapFuseableSubscriberFilterFuseableSubscriber,它们都有两个角色。一个角色是 Subscriber,另一个角色是 Subscription。因为它们都位于调用链的中间,本身并不产生数据,也不需要对数据暂存,但是需要对数据做各式处理。因此,在 onSubscribe、request 阶段,以及后面要讲到的调用阶段都需要起到代理的作用。这就解释了 actual.onSubscribe(this) onSubscribe 自己的原因。

下面是 FilterFuseableSubscriberrequest 方法的源码。充分可见其代理的角色。

public void request(long n) {
  s.request(n);
}

最后 ArraySubscriptionrequest 方法将被调用。

与 map、filter 等操作不同,flatMap 有比较大的差异,感兴趣的同学可以自己研究一下。本文先不详细介绍。

六、调用阶段

这一阶段将会通过调用 SubscriberonNext 方法,从而进行真正的反应式的数据处理。

ArraySubscriptionrequest 方法被调用之后,执行流程便开始了最后的调用阶段。

public void request(long n) {
  if (Operators.validate(n)) {
    if (Operators.addCap(REQUESTED, this, n) == 0) {
      if (n == Long.MAX_VALUE) {
        fastPath();
      }
      else {
        slowPath(n);
      }
    }
  }
}

void fastPath() {
  final T[] a = array;
  final int len = a.length;
  final Subscriber<? super T> s = actual;
  for (int i = index; i != len; i++) {
      if (cancelled) {
         return;
      }

      T t = a[i];

      if (t == null) {
      s.onError(new NullPointerException("The " 
          + i 
          + "th array element was null")
      );
      return;
    }

    s.onNext(t);
  }
  if (cancelled) {
    return;
  }
  s.onComplete();
}

ArraySubscription 会循环数据中的所有元素,然后调用 SubscriberonNext 方法,将元素交由 Subscriber 链处理。

于是接下来由 FilterFuseableSubscriber 处理。下面是其 onNext 方法(做了一些简化)

public void onNext(T t) {
    boolean b;

    try {
        b = predicate.test(t);
    }
    catch (Throwable e) {
        Throwable e_ = 
            Operators.onNextError(t, e, this.ctx, s);
        if (e_ != null) {
            onError(e_);
        }
        else {
            s.request(1);
        }
        Operators.onDiscard(t, this.ctx);
        return;
    }
    if (b) {
        actual.onNext(t);
    }
    else {
        s.request(1);
        Operators.onDiscard(t, this.ctx);
    }
}

其最要部分是通过 predicate 进行判断,如果满足条件,则交由下一个 Subscriber 处理。

下一个要处理的是 MapFuseableSubscriber,原理类似,不再重复。

最终要处理数据的是 LambdaSubscriber,下面是它的 onNext 方法源码:

public final void onNext(T x) {
  try {
    if (consumer != null) {
      consumer.accept(x);
    }
  }
  catch (Throwable t) {
    Exceptions.throwIfFatal(t);
    this.subscription.cancel();
    onError(t);
  }
}

consumer.accept(x) 会使数据打印在命令行中。

整个流程的解释到这里基本结束。

接下来我们用一个图来回顾一下整体的流程。

Reactor Core

需要感谢下面这篇文章,帮助我很好理解了 Project Reactor 的核心处理流程。

由表及里学 ProjectReactor:http://blog.yannxia.top/2018/06/26/java/spring/projectreactor/

我的技术公众号“编走编想”

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK